This tutorial covers the steps for retrieving data from a customer success source and bringing it to Platform using the Flow Service API.
This tutorial requires you to have a working understanding of the following components of Adobe Experience Platform:
{TENANT_ID}, the concept of “containers”, and the required headers for making requests (with special attention to the Accept header and its possible values).
For information on how to successfully make calls to Platform APIs, see the guide on getting started with Platform APIs.
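As a rough illustration, the headers sent by the curl requests in this tutorial can be collected in one place. The sketch below is in Python and is not part of any Platform SDK: the function name build_headers and its parameter names are hypothetical, and only the header names are taken from the requests in this tutorial.

```python
def build_headers(access_token, api_key, org_id, sandbox_name):
    """Return the request headers used by the curl examples in this tutorial.

    The function and parameter names are hypothetical; only the header
    names come from the tutorial's requests.
    """
    values = {
        "access_token": access_token,
        "api_key": api_key,
        "org_id": org_id,
        "sandbox_name": sandbox_name,
    }
    # Fail early if any credential placeholder was left empty.
    missing = [name for name, value in values.items() if not value]
    if missing:
        raise ValueError("missing credential values: " + ", ".join(missing))
    return {
        "Authorization": "Bearer " + access_token,
        "x-api-key": api_key,
        "x-gw-ims-org-id": org_id,
        "x-sandbox-name": sandbox_name,
        "Content-Type": "application/json",
    }
```

The returned dictionary can be passed to whichever HTTP client you prefer; the Content-Type entry only matters for requests that carry a JSON body.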
You can create a source connection by making a POST request to the Flow Service API. A source connection consists of a connection ID, a path to the source data file, and a connection spec ID.
To create a source connection, you must also define an enum value for the data format attribute.
Use the following enum values for file-based connectors:
Data format | Enum value |
---|---|
Delimited | delimited |
JSON | json |
Parquet | parquet |
For all table-based connectors, set the value to tabular.
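The choice of enum value described above can be sketched as a small lookup. This is only an illustration in Python: the function name data_format and the "file" and "table" labels for the connector kind are assumptions made for this example, while the enum values themselves come from the table above.

```python
# Enum values for file-based connectors, as listed in the table above.
FILE_FORMAT_ENUMS = {
    "delimited": "delimited",
    "json": "json",
    "parquet": "parquet",
}


def data_format(connector_kind, file_format=None):
    """Return the value for the data.format attribute.

    connector_kind is a hypothetical label: "file" or "table".
    """
    if connector_kind == "table":
        # All table-based connectors use the same value.
        return "tabular"
    if connector_kind == "file":
        key = (file_format or "").lower()
        if key not in FILE_FORMAT_ENUMS:
            raise ValueError("unsupported file format: %r" % file_format)
        return FILE_FORMAT_ENUMS[key]
    raise ValueError("unknown connector kind: %r" % connector_kind)
```

Customer success sources are table-based, so the requests in this tutorial use tabular.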
API format
POST /sourceConnections
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Source connection for Customer Success",
"baseConnectionId": "f1da3694-38a9-403d-9a36-9438a9203d42",
"description": "Source connection for a Customer Success connector",
"data": {
"format": "tabular"
},
"params": {
"tableName": "Account",
"columns": [
{
"name": "Id",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Name",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "Phone",
"type": "string",
"xdm": {
"type": "string"
}
},
{
"name": "CreatedDate",
"type": "string",
"meta:xdmType": "date-time",
"xdm": {
"type": "string",
"format": "date-time"
}
}
]
},
"connectionSpec": {
"id": "cb66ab34-8619-49cb-96d1-39b37ede86ea",
"version": "1.0"
}
}'
Property | Description |
---|---|
baseConnectionId | The unique connection ID of the third-party customer success system you are accessing. |
params.path | The path of the source file. |
connectionSpec.id | The connection spec ID associated with your specific third-party customer success system. See the appendix for a list of connection spec IDs. |
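Building the request body programmatically and serializing it with a JSON library avoids hand-editing mistakes such as stray commas. The following Python sketch mirrors the shape of the request above; the helper names column and source_connection_payload are hypothetical, and the field layout is copied from the sample request rather than from a formal schema.

```python
import json


def column(name, xdm_format=None):
    """Describe one source column, following the sample request's layout."""
    col = {"name": name, "type": "string", "xdm": {"type": "string"}}
    if xdm_format:
        # The sample request adds these two keys for the date-time column.
        col["meta:xdmType"] = xdm_format
        col["xdm"]["format"] = xdm_format
    return col


def source_connection_payload(base_connection_id, connection_spec_id,
                              table_name, columns, name, description=""):
    """Assemble the body for POST /sourceConnections (table-based source)."""
    return {
        "name": name,
        "baseConnectionId": base_connection_id,
        "description": description,
        "data": {"format": "tabular"},
        "params": {"tableName": table_name, "columns": columns},
        "connectionSpec": {"id": connection_spec_id, "version": "1.0"},
    }


payload = source_connection_payload(
    base_connection_id="f1da3694-38a9-403d-9a36-9438a9203d42",
    connection_spec_id="cb66ab34-8619-49cb-96d1-39b37ede86ea",
    table_name="Account",
    columns=[column("Id"), column("Name"), column("CreatedDate", "date-time")],
    name="Source connection for Customer Success",
)
body = json.dumps(payload)  # always valid JSON, ready to send as the request body
```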
Response
A successful response returns the unique identifier (id) of the newly created source connection. This ID is required in later steps to create a target connection.
{
"id": "17faf955-2cf8-4b15-baf9-552cf88b1540",
"etag": "\"2900a761-0000-0200-0000-5ed18cea0000\""
}
In order for the source data to be used in Platform, a target schema must be created to structure the source data according to your needs. The target schema is then used to create a Platform dataset in which the source data is contained.
A target XDM schema can be created by performing a POST request to the Schema Registry API.
For detailed steps on how to create a target XDM schema, see the tutorial on creating a schema using the API.
A target dataset can be created by performing a POST request to the Catalog Service API, providing the ID of the target schema within the payload.
For detailed steps on how to create a target dataset, see the tutorial on creating a dataset using the API.
A target connection represents the connection to the destination where the ingested data lands. To create a target connection, you must provide the fixed connection spec ID associated with the data lake. This connection spec ID is: c604ff05-7f1a-43c0-8e18-33bf874cb11c.
You now have the unique identifiers for a target schema and a target dataset, as well as the connection spec ID for the data lake. Using the Flow Service API, you can create a target connection by specifying these identifiers along with the dataset that will contain the inbound source data.
API format
POST /targetConnections
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a customer success connector",
"description": "Target Connection for a customer success connector",
"data": {
"format": "parquet_xdm",
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/deb3e1096c35d8311b5d80868c4bd5b3cdfd4b3150e7345f",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5e543e8a60b15218ad44b95f"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
Property | Description |
---|---|
data.schema.id | The $id of the target XDM schema. |
data.schema.version | The version of the schema. This value must be set to application/vnd.adobe.xed-full+json;version=1, which returns the latest minor version of the schema. |
params.dataSetId | The ID of the target dataset generated in the previous step. Note: You must provide a valid dataset ID when creating a target connection. An invalid dataset ID will result in an error. |
connectionSpec.id | The connection spec ID used to connect to the data lake. This ID is: c604ff05-7f1a-43c0-8e18-33bf874cb11c. |
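Because two of the values in the table above are fixed, a small helper can hard-code them and take only the schema $id and dataset ID as input. The Python sketch below is illustrative: the function name target_connection_payload and the constant names are hypothetical, and the empty-ID check is a local convenience, not the validation that the service itself performs.

```python
# Fixed values stated in this tutorial.
DATA_LAKE_CONNECTION_SPEC_ID = "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
SCHEMA_VERSION = "application/vnd.adobe.xed-full+json;version=1"


def target_connection_payload(schema_id, dataset_id, name, description=""):
    """Assemble the body for POST /targetConnections."""
    if not schema_id or not dataset_id:
        # Catch empty values locally before a request is ever sent.
        raise ValueError("schema_id and dataset_id are both required")
    return {
        "name": name,
        "description": description,
        "data": {
            "format": "parquet_xdm",
            "schema": {"id": schema_id, "version": SCHEMA_VERSION},
        },
        "params": {"dataSetId": dataset_id},
        "connectionSpec": {"id": DATA_LAKE_CONNECTION_SPEC_ID, "version": "1.0"},
    }
```

Calling the helper with the sample schema $id and dataset ID from the request above reproduces that request body.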
Response
A successful response returns the new target connection’s unique identifier (id). This value is required in a later step to create a dataflow.
{
"id": "1f5af99c-f1ef-4076-9af9-9cf1ef507678",
"etag": "\"530013e2-0000-0200-0000-5ebc4c110000\""
}
In order for the source data to be ingested into a target dataset, it must first be mapped to the target schema that the target dataset adheres to.
To create a mapping set, make a POST request to the mappingSets endpoint of the Data Prep API while providing your target XDM schema $id and the details of the mapping sets you want to create.
API format
POST /mappingSets
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/b750bd161fef405bc324d0c8809b02c494d73e60e7ae9b3e",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.fullName",
"sourceAttribute": "Name",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "_repo.createDate",
"sourceAttribute": "CreatedDate",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
Property | Description |
---|---|
xdmSchema | The $id of the target XDM schema. |
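The mappings array in the request above repeats the same keys for every field, so it can be generated from a simple source-to-destination dictionary. The Python sketch below does this; the function name mapping_set_payload is hypothetical, and the default values (identity false, null identity group and namespace code, version 0) are copied from the sample request, not from a formal specification.

```python
def mapping_set_payload(schema_id, field_map):
    """Assemble the body for POST /mappingSets.

    field_map maps each source attribute to its destination XDM path.
    """
    mappings = [
        {
            "destinationXdmPath": destination,
            "sourceAttribute": source,
            "identity": False,
            "identityGroup": None,
            "namespaceCode": None,
            "version": 0,
        }
        for source, destination in field_map.items()
    ]
    return {
        "version": 0,
        "xdmSchema": schema_id,
        "xdmVersion": "1.0",
        "id": None,
        "mappings": mappings,
    }


payload = mapping_set_payload(
    "https://ns.adobe.com/{TENANT_ID}/schemas/b750bd161fef405bc324d0c8809b02c494d73e60e7ae9b3e",
    {
        "Id": "_id",
        "Name": "person.name.fullName",
        "CreatedDate": "_repo.createDate",
    },
)
```

Python dictionaries preserve insertion order, so the generated mappings appear in the same order as the entries in field_map.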
Response
A successful response returns details of the newly created mapping, including its unique identifier (id). This ID is required in a later step to create a dataflow.
{
"id": "7c3547d3cfc14f568a51c32b4c0ed739",
"version": 0,
"createdDate": 1590792069173,
"modifiedDate": 1590792069173,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
A dataflow is responsible for collecting data from sources and bringing it into Platform. To create a dataflow, you must first obtain the dataflow specifications by performing a GET request to the Flow Service API. Dataflow specifications are responsible for collecting data from a third-party customer success system.
API format
GET /flowSpecs?property=name=="CRMToAEP"
Request
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Response
A successful response returns the details of the dataflow specification responsible for bringing data from your source into Platform. The response includes the unique flow spec id required to create a new dataflow.
{
"id": "14518937-270c-4525-bdec-c2ba7cce3860",
"name": "CRMToAEP",
"providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
"version": "1.0",
"attributes": {
"isSourceFlow": true,
"flacValidationSupported": true,
"frequency": "batch",
"notification": {
"category": "sources",
"flowRun": {
"enabled": true
}
}
},
"sourceConnectionSpecIds": [
"3416976c-a9ca-4bba-901a-1f08f66978ff",
"38ad80fe-8b06-4938-94f4-d4ee80266b07",
"d771e9c1-4f26-40dc-8617-ce58c4b53702",
"3c9b37f8-13a6-43d8-bad3-b863b941fedd",
"cc6a4487-9e91-433e-a3a3-9cf6626c1806",
"3000eb99-cd47-43f3-827c-43caf170f015",
"26d738e0-8963-47ea-aadf-c60de735468a",
"74a1c565-4e59-48d7-9d67-7c03b8a13137",
"cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
"4f63aa36-bd48-4e33-bb83-49fbcd11c708",
"cb66ab34-8619-49cb-96d1-39b37ede86ea",
"eb13cb25-47ab-407f-ba89-c0125281c563",
"1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
"37b6bf40-d318-4655-90be-5cd6f65d334b",
"a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
"221c7626-58f6-4eec-8ee2-042b0226f03b",
"a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
"6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
"aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
"8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
"ecde33f2-c56f-46cc-bdea-ad151c16cd69",
"102706fb-a5cd-42ee-afe0-bc42f017ff43",
"09182899-b429-40c9-a15a-bf3ddbc8ced7",
"0479cc14-7651-4354-b233-7480606c2ac3",
"d6b52d86-f0f8-475f-89d4-ce54c8527328",
"a8f4d393-1a6b-43f3-931f-91a16ed857f4",
"1fe283f6-9bec-11ea-bb37-0242ac130002",
"fcad62f3-09b0-41d3-be11-449d5a621b69",
"ea1c2a08-b722-11eb-8529-0242ac130003",
"35d6c4d8-c9a9-11eb-b8bc-0242ac130003",
"ff4274f2-c9a9-11eb-b8bc-0242ac130003",
"ba5126ec-c9ac-11eb-b8bc-0242ac130003",
"b2e08744-4f1a-40ce-af30-7abac3e23cf3",
"929e4450-0237-4ed2-9404-b7e1e0a00309",
"2acf109f-9b66-4d5e-bc18-ebb2adcff8d5",
"2fa8af9c-2d1a-43ea-a253-f00a00c74412"
],
"targetConnectionSpecIds": [
"c604ff05-7f1a-43c0-8e18-33bf874cb11c"
],
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "EnterpriseSource",
"permissions": [
"write"
]
}
]
},
"optionSpec": {
"name": "OptionSpec",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"errorDiagnosticsEnabled": {
"title": "Error diagnostics.",
"description": "Flag to enable detailed and sample error diagnostics summary.",
"type": "boolean",
"default": false
},
"partialIngestionPercent": {
"title": "Partial ingestion threshold.",
"description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
"type": "number",
"exclusiveMinimum": 0
}
}
}
},
"scheduleSpec": {
"name": "PeriodicSchedule",
"type": "Periodic",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"startTime": {
"description": "epoch time",
"type": "integer"
},
"frequency": {
"type": "string",
"enum": [
"once",
"minute",
"hour",
"day",
"week"
]
},
"interval": {
"type": "integer"
},
"backfill": {
"type": "boolean",
"default": true
}
},
"required": [
"startTime",
"frequency"
],
"if": {
"properties": {
"frequency": {
"const": "once"
}
}
},
"then": {
"allOf": [
{
"not": {
"required": [
"interval"
]
}
},
{
"not": {
"required": [
"backfill"
]
}
}
]
},
"else": {
"required": [
"interval"
],
"if": {
"properties": {
"frequency": {
"const": "minute"
}
}
},
"then": {
"properties": {
"interval": {
"minimum": 15
}
}
},
"else": {
"properties": {
"interval": {
"minimum": 1
}
}
}
}
}
},
"transformationSpec": [
{
"name": "Copy",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"deltaColumn": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"deltaColumn"
]
}
},
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
},
"mappingVersion": {
"type": "string"
}
}
}
}
],
"runSpec": {
"name": "ProviderParams",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for creating flow run.",
"properties": {
"startTime": {
"type": "integer",
"description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
},
"windowStartTime": {
"type": "integer",
"description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"windowEndTime": {
"type": "integer",
"description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
},
"deltaColumn": {
"type": "object",
"description": "The delta column is required to partition the data and separate newly ingested data from historic data.",
"properties": {
"name": {
"type": "string"
},
"dateFormat": {
"type": "string"
},
"timezone": {
"type": "string"
}
},
"required": [
"name"
]
}
},
"required": [
"startTime",
"windowStartTime",
"windowEndTime",
"deltaColumn"
]
}
}
}
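Before using the flow spec ID, it can be worth confirming that the flow spec lists both your source connection spec and the data lake connection spec. The Python sketch below assumes that sourceConnectionSpecIds and targetConnectionSpecIds enumerate the connection specs the flow spec accepts, which the field names suggest but this tutorial does not state. The function name check_flow_spec is hypothetical, and the sample dictionary is a trimmed copy of the response above.

```python
def check_flow_spec(flow_spec, source_spec_id, target_spec_id):
    """Return the flow spec ID if both connection spec IDs are listed."""
    if source_spec_id not in flow_spec.get("sourceConnectionSpecIds", []):
        raise ValueError("source connection spec not listed: " + source_spec_id)
    if target_spec_id not in flow_spec.get("targetConnectionSpecIds", []):
        raise ValueError("target connection spec not listed: " + target_spec_id)
    return flow_spec["id"]


# Trimmed copy of the flow spec response shown above.
flow_spec = {
    "id": "14518937-270c-4525-bdec-c2ba7cce3860",
    "name": "CRMToAEP",
    "sourceConnectionSpecIds": [
        "cb66ab34-8619-49cb-96d1-39b37ede86ea",
        "eb13cb25-47ab-407f-ba89-c0125281c563",
    ],
    "targetConnectionSpecIds": ["c604ff05-7f1a-43c0-8e18-33bf874cb11c"],
}

flow_spec_id = check_flow_spec(
    flow_spec,
    source_spec_id="cb66ab34-8619-49cb-96d1-39b37ede86ea",
    target_spec_id="c604ff05-7f1a-43c0-8e18-33bf874cb11c",
)
```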
To create a dataflow, make a POST request to the /flows endpoint of the Flow Service API, providing the flow spec ID, source connection ID, target connection ID, and mapping ID collected in the earlier steps. The request body uses the following properties:
Property | Description |
---|---|
flowSpec.id | The flow spec ID retrieved in the previous step. |
sourceConnectionIds | The source connection ID retrieved in an earlier step. |
targetConnectionIds | The target connection ID retrieved in an earlier step. |
transformations.params.mappingId | The mapping ID retrieved in an earlier step. |
transformations.params.deltaColumn | The designated column used to differentiate between new and existing data. Incremental data is ingested based on the timestamp of the selected column. The supported date format for deltaColumn is yyyy-MM-dd HH:mm:ss. |
scheduleParams.startTime | The start time for the dataflow in epoch time. |
scheduleParams.frequency | The frequency at which the dataflow will collect data. Acceptable values are once, minute, hour, day, and week. |
scheduleParams.interval | The period between two consecutive flow runs. The value must be a non-zero integer. According to the schedule specification returned with the flow spec, the minimum accepted interval is 15 for minute and 1 for hour, day, and week. The interval must be omitted when the frequency is once. |
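The scheduling rules and the properties above can be combined into a request body along the following lines. This Python sketch rests on assumptions: the nesting of the transformations array (one entry named Copy carrying deltaColumn, one named Mapping carrying mappingId) is inferred from the property paths above and from the transformationSpec in the flow spec response, and the function names schedule_params and dataflow_payload are hypothetical. The schedule checks follow the scheduleSpec rules in the flow spec response.

```python
FREQUENCIES = ("once", "minute", "hour", "day", "week")


def schedule_params(start_time, frequency, interval=None):
    """Build scheduleParams, applying the scheduleSpec rules locally."""
    if frequency not in FREQUENCIES:
        raise ValueError("unsupported frequency: %r" % frequency)
    params = {"startTime": int(start_time), "frequency": frequency}
    if frequency == "once":
        # The spec forbids interval for one-time runs.
        if interval is not None:
            raise ValueError("interval must be omitted when frequency is once")
        return params
    minimum = 15 if frequency == "minute" else 1
    if interval is None or interval < minimum:
        raise ValueError("interval for %s must be at least %d" % (frequency, minimum))
    params["interval"] = interval
    return params


def dataflow_payload(name, flow_spec_id, source_connection_id,
                     target_connection_id, mapping_id, delta_column, schedule):
    """Assemble a dataflow request body (layout is an assumption, see above)."""
    return {
        "name": name,
        "flowSpec": {"id": flow_spec_id, "version": "1.0"},
        "sourceConnectionIds": [source_connection_id],
        "targetConnectionIds": [target_connection_id],
        "transformations": [
            {"name": "Copy", "params": {"deltaColumn": {"name": delta_column}}},
            {"name": "Mapping", "params": {"mappingId": mapping_id}},
        ],
        "scheduleParams": schedule,
    }


payload = dataflow_payload(
    name="Dataflow for a customer success source",
    flow_spec_id="14518937-270c-4525-bdec-c2ba7cce3860",
    source_connection_id="17faf955-2cf8-4b15-baf9-552cf88b1540",
    target_connection_id="1f5af99c-f1ef-4076-9af9-9cf1ef507678",
    mapping_id="7c3547d3cfc14f568a51c32b4c0ed739",
    delta_column="CreatedDate",
    schedule=schedule_params(1590792069, "minute", 15),
)
```

The IDs in the usage example are the sample values returned in the earlier steps of this tutorial; the dataflow name is a placeholder.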
Response
A successful response returns the ID (id) of the newly created dataflow.
{
"id": "e0bd8463-0913-4ca1-bd84-6309134ca1f6",
"etag": "\"04004fe9-0000-0200-0000-5ebc4c8b0000\""
}
Once your dataflow has been created, you can monitor the data that is being ingested through it to see information on flow runs, completion status, and errors. For more information on how to monitor dataflows, see the tutorial on monitoring dataflows in the API.
By following this tutorial, you have created a source connector to collect data from a customer success system on a scheduled basis. Incoming data can now be used by downstream Platform services such as Real-Time Customer Profile and Data Science Workspace. See the following documents for more details:
The following section lists the different customer success source connectors and their connection specifications.
Connector name | Connection spec |
---|---|
Salesforce Service Cloud | cb66ab34-8619-49cb-96d1-39b37ede86ea |
ServiceNow | eb13cb25-47ab-407f-ba89-c0125281c563 |
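For scripted setups, the table above can be kept as a lookup so that the connection spec ID is never typed by hand. A minimal Python sketch, where the names CONNECTION_SPEC_IDS and connection_spec_id are hypothetical and the values are copied from the table:

```python
# Connection spec IDs from the appendix table above.
CONNECTION_SPEC_IDS = {
    "Salesforce Service Cloud": "cb66ab34-8619-49cb-96d1-39b37ede86ea",
    "ServiceNow": "eb13cb25-47ab-407f-ba89-c0125281c563",
}


def connection_spec_id(connector_name):
    """Return the connection spec ID for a customer success connector."""
    try:
        return CONNECTION_SPEC_IDS[connector_name]
    except KeyError:
        known = ", ".join(sorted(CONNECTION_SPEC_IDS))
        raise ValueError(
            "unknown connector %r; expected one of: %s" % (connector_name, known)
        ) from None
```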