This tutorial covers the steps for retrieving raw data from a streaming source connector and bringing them to Experience Platform using the Flow Service API.
This tutorial requires a working understanding of the relevant components of Adobe Experience Platform, including your {TENANT_ID}, the concept of “containers”, and the required headers for making requests (with special attention to the Accept header and its possible values). For information on how to successfully make calls to Platform APIs, see the guide on getting started with Platform APIs.
This tutorial also requires you to have a valid source connection ID for a streaming connector. If you do not have this information, see the tutorials on creating a streaming source connection before attempting this tutorial.
In order for the source data to be used in Platform, a target schema must be created to structure the source data according to your needs. The target schema is then used to create a Platform dataset in which the source data is contained. This target XDM schema also extends the XDM Individual Profile class.
To create a target XDM schema, make a POST request to the /schemas
endpoint of the Schema Registry API.
API format
POST /tenant/schemas
Request
The following example request creates an XDM schema that extends the XDM Individual Profile class.
curl -X POST \
'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"type": "object",
"title": "Sample schema for a streaming connector",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
}
],
"meta:containerId": "tenant",
"meta:resourceType": "schemas",
"meta:xdmType": "object",
"meta:class": "https://ns.adobe.com/xdm/context/profile"
}'
Response
A successful response returns details of the newly created schema including its unique identifier ($id
). This ID is required in later steps to create a target dataset, mapping, and dataflow.
{
"$id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:altId": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"meta:resourceType": "schemas",
"version": "1.0",
"title": "Sample schema for a streaming connector",
"type": "object",
"description": "Sample schema for a streaming connector",
"allOf": [
{
"$ref": "https://ns.adobe.com/xdm/context/profile",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
"type": "object",
"meta:xdmType": "object"
},
{
"$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
"type": "object",
"meta:xdmType": "object"
}
],
"refs": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/context/profile"
],
"imsOrg": "{ORG_ID}",
"meta:extensible": false,
"meta:abstract": false,
"meta:extends": [
"https://ns.adobe.com/xdm/context/profile-person-details",
"https://ns.adobe.com/xdm/context/profile-personal-details",
"https://ns.adobe.com/xdm/common/auditable",
"https://ns.adobe.com/xdm/data/record",
"https://ns.adobe.com/xdm/context/profile"
],
"meta:xdmType": "object",
"meta:registryMetadata": {
"repo:createdDate": 1604960074752,
"repo:lastModifiedDate": 1604960074752,
"xdm:createdClientId": "{CREATED_CLIENT_ID}",
"xdm:lastModifiedClientId": "{MODIFIED_CLIENT_ID}",
"xdm:createdUserId": "{CREATED_USER_ID}",
"xdm:lastModifiedUserId": "{MODIFIED_USER_ID}",
"eTag": "8522a151effd974429518ed90c3eaf6efc9bf6ffb6644087a85c6d4455dcd045",
"meta:globalLibVersion": "1.16.1"
},
"meta:class": "https://ns.adobe.com/xdm/context/profile",
"meta:containerId": "tenant",
"meta:sandboxId": "{SANDBOX_ID}",
"meta:sandboxType": "production",
"meta:tenantNamespace": "_{TENANT_ID}"
}
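If you are scripting this tutorial, it can be convenient to capture both identifiers from the response above, since later steps use them in different forms: the $id value is used when creating the dataset and target connection, while the meta:altId value is used as the xdmSchema when creating the mapping set. The following is a minimal shell sketch, assuming the response was saved to schema-response.json (a hypothetical filename) and that jq is installed:

# Capture the schema's $id and meta:altId for use in later requests.
SCHEMA_ID=$(jq -r '.["$id"]' schema-response.json)
SCHEMA_ALT_ID=$(jq -r '.["meta:altId"]' schema-response.json)
echo "$SCHEMA_ID"      # https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983...
echo "$SCHEMA_ALT_ID"  # _{TENANT_ID}.schemas.e45dd983...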
With a target XDM schema created and its unique $id obtained, you can now create a target dataset to contain your source data. To create a target dataset, make a POST request to the dataSets
endpoint of the Catalog Service API, while providing the ID of the target schema within the payload.
API format
POST /catalog/dataSets
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Test streaming dataset",
"schemaRef": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
},
"tags": {
"identity": [
"enabled:true"
],
"profile": [
"enabled:true"
]
}
}'
| Property | Description |
| --- | --- |
| `name` | The name of the dataset to be created. |
| `schemaRef.id` | The URI `$id` for the XDM schema the dataset will be based on. |
| `schemaRef.contentType` | The version of the schema. This value must be set to `application/vnd.adobe.xed-full-notext+json;version=1`, which returns the latest minor version of the schema. See the section on schema versioning in the XDM API guide for more information. |
Response
A successful response returns an array containing the ID of the newly created dataset in the format "@/dataSets/{DATASET_ID}". The dataset ID is a read-only, system-generated string that is used to reference the dataset in API calls. The target dataset ID is required in later steps to create a target connection and a dataflow.
[
"@/dataSets/5f7187bac6d00f194fb937c0"
]
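Note that the response is a JSON array containing a single "@/dataSets/{DATASET_ID}" string, while the next step needs only the raw dataset ID for params.dataSetId. If you are scripting these calls, the following minimal shell sketch strips the prefix, assuming the response was saved to dataset-response.json (a hypothetical filename) and that jq is installed:

# Extract the "@/dataSets/{DATASET_ID}" entry and strip its prefix to get the raw dataset ID.
DATASET_REF=$(jq -r '.[0]' dataset-response.json)
DATASET_ID=${DATASET_REF#@/dataSets/}
echo "$DATASET_ID"  # 5f7187bac6d00f194fb937c0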
A target connection creates and manages the destination for your data, whether that is Platform or any other location where the transferred data will land. Target connections contain information regarding the data destination, the data format, and the target connection ID required to create a dataflow. Target connection instances are specific to a tenant and organization.
To create a target connection, make a POST request to the /targetConnections
endpoint of the Flow Service API. As part of the request, you must provide the data format, the dataSetId
retrieved in the previous step, and the fixed connection specification ID tied to Data Lake. This ID is c604ff05-7f1a-43c0-8e18-33bf874cb11c
.
API format
POST /targetConnections
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming target connection",
"description": "Streaming target connection",
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
},
"data": {
"format": "parquet_xdm",
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f7187bac6d00f194fb937c0"
}
}'
| Property | Description |
| --- | --- |
| `data.format` | The specified format of the data you are bringing to the data lake. |
| `params.dataSetId` | The ID of the target dataset generated in the previous step. Note: You must provide a valid dataset ID when creating a target connection. An invalid dataset ID will result in an error. |
| `connectionSpec.id` | The connection spec ID used to connect to the data lake. This ID is `c604ff05-7f1a-43c0-8e18-33bf874cb11c`. |
Response
A successful response returns the new target connection’s unique identifier (id
). This ID is required in later steps.
{
"id": "d9300194-6a82-4163-b001-946a821163b8",
"etag": "\"4006d3e4-0000-0200-0000-5f7189220000\""
}
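If you want to verify the target connection before continuing, you can look it up by its ID with a GET request to the /targetConnections endpoint of the Flow Service API. The following sketch uses the ID returned above and the same placeholder headers as the other requests in this tutorial:

curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections/d9300194-6a82-4163-b001-946a821163b8' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'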
In order for the source data to be ingested into a target dataset, it must first be mapped to the target schema that the target dataset adheres to.
To create a mapping set, make a POST request to the mappingSets
endpoint of the Data Prep API while providing your target XDM schema $id
and the details of the mapping sets you want to create.
API format
POST /mappingSets
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "_{TENANT_ID}.schemas.e45dd983026ce0daec5185cfddd48cbc0509015d880d6186",
"xdmVersion": "1.0",
"mappings": [
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "firstName",
"identity": false,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "lastName",
"identity": false,
"version": 0
}
]
}'
| Property | Description |
| --- | --- |
| `xdmSchema` | The ID of the target XDM schema, provided here in its `meta:altId` form. |
Response
A successful response returns details of the newly created mapping including its unique identifier (id
). This ID is required in a later step to create a dataflow.
{
"id": "380b032b445a46008e77585e046efe5e",
"version": 0,
"createdDate": 1604960750613,
"modifiedDate": 1604960750613,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
A dataflow is responsible for collecting data from sources and bringing it into Platform. In order to create a dataflow, you must first obtain the dataflow specifications by performing a GET request to the Flow Service API.
API format
GET /flowSpecs
Request
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
Response
A successful response returns a list of dataflow specifications. The dataflow specification ID required to create a dataflow using Amazon Kinesis, Azure Event Hubs, or Google PubSub is d69717ba-71b4-4313-b654-49f9cf126d7a.
{
"items": [
{
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"name": "Stream data with optional transformation",
"providerId": "521eee4d-8cbe-4906-bb48-fb6bd4450033",
"version": "1.0",
"sourceConnectionSpecIds": [
"bc7b00d6-623a-4dfc-9fdb-f1240aeadaeb",
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"86043421-563b-46ec-8e6c-e23184711bf6",
"70116022-a743-464a-bbfe-e226a7f8210c"
],
"targetConnectionSpecIds": [
"bf9f5905-92b7-48bf-bf20-455bc6b60a4e",
"c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"db4fe783-ef79-4a12-bda9-32b2b1bc3b2c"
],
"transformationSpecs": [
{
"name": "Mapping",
"spec": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"description": "defines various params required for different mapping from source to target",
"properties": {
"mappingId": {
"type": "string"
}
}
}
}
],
"attributes": {
"uiAttributes": {
"apiFeatures": {
"flowRunsSupported": false
}
}
},
"permissionsInfo": {
"view": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"read"
]
}
],
"manage": [
{
"@type": "lowLevel",
"name": "StreamingSource",
"permissions": [
"write"
]
}
]
}
}
]
}
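If you prefer to extract the flow spec ID programmatically rather than copying it from the listing, you can filter the response by the spec's name. A minimal sketch, assuming the GET response above was saved to flow-specs.json (a hypothetical filename) and that jq is installed:

# Select the streaming flow spec by name and print its ID.
jq -r '.items[] | select(.name == "Stream data with optional transformation") | .id' flow-specs.json
# Expected output: d69717ba-71b4-4313-b654-49f9cf126d7a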
The last step towards collecting streaming data is to create a dataflow. By now, you have the following required values prepared:

- Flow spec ID: d69717ba-71b4-4313-b654-49f9cf126d7a
- Source connection ID: e96d6135-4b50-446e-922c-6dd66672b6b2
- Target connection ID: d9300194-6a82-4163-b001-946a821163b8
- Mapping ID: 380b032b445a46008e77585e046efe5e

A dataflow is responsible for scheduling and collecting data from a source. You can create a dataflow by performing a POST request to the /flows endpoint while providing the previously mentioned values within the payload.
API format
POST /flows
Request
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Streaming dataflow",
"description": "Streaming dataflow",
"flowSpec": {
"id": "d69717ba-71b4-4313-b654-49f9cf126d7a",
"version": "1.0"
},
"sourceConnectionIds": [
"e96d6135-4b50-446e-922c-6dd66672b6b2"
],
"targetConnectionIds": [
"723222e2-6ab9-4b0b-b222-e26ab9bb0bc2"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "380b032b445a46008e77585e046efe5e",
"mappingVersion": 0
}
}
]
}'
| Property | Description |
| --- | --- |
| `flowSpec.id` | The flow spec ID retrieved in the previous step. |
| `sourceConnectionIds` | The source connection ID retrieved in an earlier step. |
| `targetConnectionIds` | The target connection ID retrieved in an earlier step. |
| `transformations.params.mappingId` | The mapping ID retrieved in an earlier step. |
Response
A successful response returns the ID (id
) of the newly created dataflow.
{
"id": "1f086c23-2ea8-4d06-886c-232ea8bd061d",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
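If you want to confirm the dataflow exists before sending any data, you can retrieve it by its ID with a GET request to the /flows endpoint of the Flow Service API, using the same placeholder headers as the other requests in this tutorial:

curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flows/1f086c23-2ea8-4d06-886c-232ea8bd061d' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'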
View the sample payloads below for examples of raw and XDM-compliant JSON that you can send for ingestion.
You must wait at least five minutes between creating the dataflow and ingesting any streaming data; this allows the dataflow to become fully enabled before any data is ingested.
The following examples apply to Amazon Kinesis, Azure Event Hubs, and Google PubSub. The first example shows a raw JSON record; the second shows a record formatted as XDM-compliant JSON.
{
"name": "Johnson Smith",
"location": {
"city": "Seattle",
"country": "United State of America",
"address": "3692 Main Street"
},
"gender": "Male",
"birthday": {
"year": 1984,
"month": 6,
"day": 9
}
}
{
"header": {
"schemaRef": {
"id": "https://ns.adobe.com/aepstreamingservicesint/schemas/73cae7e6db06ebca535cd973e3ece85e66253962f504e7d8",
"contentType": "application/vnd.adobe.xed-full-notext+json; version=1.0"
}
},
"body": {
"xdmMeta": {
"schemaRef": {
"id": "https://ns.adobe.com/aepstreamingservicesint/schemas/73cae7e6db06ebca535cd973e3ece85e66253962f504e7d8",
"contentType": "application/vnd.adobe.xed-full-notext+json; version=1.0"
}
},
"xdmEntity": {
"_id": "acme",
"workEmail": {
"address": "mike@acme.com",
"primary": true,
"type": "work",
"status": "active"
},
"person": {
"gender": "male",
"name": {
"firstName": "Mike",
"lastName": "Wazowski"
},
"birthDate": "1985-01-01"
},
"identityMap": {
"ecid": [
{
"id": "01262118050522051420082102000000000000"
}
]
}
}
}
}
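How a record reaches Platform depends on your streaming source: you publish it to the stream that your source connection reads from rather than to a Platform endpoint. As a purely illustrative sketch, the following hypothetical AWS CLI command publishes the raw record shown earlier to an Amazon Kinesis stream; the stream name and partition key are placeholders, the --cli-binary-format flag assumes AWS CLI v2, and Azure Event Hubs and Google PubSub have their own producer tools:

# Hypothetical example: publish the raw record to the Kinesis stream backing your source connection.
aws kinesis put-record \
--stream-name my-aep-source-stream \
--partition-key sample-partition-key \
--cli-binary-format raw-in-base64-out \
--data '{"name":"Johnson Smith","location":{"city":"Seattle","country":"United States of America","address":"3692 Main Street"},"gender":"Male","birthday":{"year":1984,"month":6,"day":9}}'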
By following this tutorial, you have created a dataflow to collect streaming data from your streaming connector. Incoming data can now be used by downstream Platform services such as Real-Time Customer Profile and Data Science Workspace. See the documentation for those services for more details.