Kafka extension
Plugin ID
The plugin ID for Apache Kafka is fr.cantor.c4i.KafkaDelivery.
Properties
Environment independent parameters (set in extension configuration or properties file)
| Property key | Description | Mandatory | Default value |
|---|---|---|---|
| configAssetId | The ID of the asset containing the YAML mappings | Yes | |
| connection | The name of the properties file to use | Yes | |
| prefixTopicNamesWithContextId | true or false to indicate whether or not to prefix the topic/queue names with the context IDs | Yes | |
| outputFormat | CSV, EXCEL, JSON, JSONL, XML or XMLL | Yes | |
| autoCreateTopics | true to automatically create topics if they don't already exist, false otherwise | No | |
| serializationMode | NONE, AVRO or PROTOBUF | No | |
| schemaPath | The path to the Avro or Protobuf schema | No | |
| topic.mapping.${targetName} | The topic mapping for the chosen target | No | |
| cacheConfig | The ID of the asset containing the JCS configuration | No | |
| displayPerfStat | true to display performance statistics (see the dedicated Performance statistics page) | No | |
Environment dependent parameters
To make the Kafka connector work, you need to connect to the Stibo SFTP for your STEP environment and create the folder /shared/customer-config/c4i/credentials/kafka (for STEP usage), or create the folder ~/.c4i/credentials/kafka on your file system (for Standalone usage).
You can then store your Kafka connection properties in a file such as myConnection.properties and reference it in the extension configuration parameter "Connection name", or through the connection property for Composite or Standalone.
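For illustration, a minimal myConnection.properties for a SASL-secured cluster might look like the following. The property names are standard Kafka client settings; the broker addresses and credentials are placeholders to replace with your own values:

```properties
# Standard Kafka client connection settings (placeholder values)
bootstrap.servers=broker1.example.com:9092,broker2.example.com:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API key>' password='<API secret>';
```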
Configuration example
STEP extension configuration:

```properties
extensions.[0].pluginId=fr.cantor.c4i.KafkaDelivery
extensions.[0].configAssetId=C4I-Configuration
extensions.[0].prefixTopicNamesWithContextId=true
extensions.[0].topic.partition.products=2
extensions.[0].outputFormat=JSON
```

Composite or Standalone configuration:

```properties
pluginId=fr.cantor.c4i.KafkaDelivery
configAssetId=/Path/to/C4I/Configuration.yml
prefixTopicNamesWithContextId=true
licenseAssetId=/Path/to/C4I/License.cat
cacheConfig=/Path/to/JCS/Config.ccf
topic.partition.products=2
outputFormat=JSON
```
Message ID
The fields identified as the message key will be sent as the Kafka Message ID, and will always be serialized as a String.
- If only one field is mapped, the Message ID will be this single field value.
- If multiple fields are mapped, the Message ID will be a serialized JSON document containing the key-values pairs.
Important: These fields will not be part of the produced message. If you wish to add them to the message as well, you will need to map them twice.
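The key-derivation rules above can be sketched in Python. This is a hypothetical illustration of the behavior, not C4i's actual implementation:

```python
import json

def make_message_id(key_fields: dict) -> str:
    """Derive the Kafka Message ID from the mapped key fields.

    A single mapped field yields its plain value as a string; multiple
    fields yield a serialized JSON document of key-value pairs.
    """
    if len(key_fields) == 1:
        # Single field: the Message ID is that field's value, as a string.
        return str(next(iter(key_fields.values())))
    # Multiple fields: the Message ID is a serialized JSON document.
    return json.dumps(key_fields)

print(make_message_id({"id": "PRD1"}))                   # PRD1
print(make_message_id({"id": "PRD1", "context": "GL"}))  # {"id": "PRD1", "context": "GL"}
```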
Example
Given the following XML:

```xml
<STEP-ProductInformation ContextID="GL">
  <Product ID="PRD1">
    <Name>Test Product</Name>
  </Product>
</STEP-ProductInformation>
```
The following example illustrates simple keys:

```yaml
Product:
- product[id]:
  - ./@ID: id
  - Name/text(): name
```

Resulting Message ID:

```
PRD1
```

Resulting message:

```json
{
  "name": "Test Product"
}
```
The following example illustrates complex keys:

```yaml
Product:
- product[id, context]:
  - ./@ID: id
  - c4i:currentContextId(): context
  - Name/text(): name
```

Resulting Message ID:

```json
{"id": "PRD1", "context": "GL"}
```

Resulting message:

```json
{
  "name": "Test Product"
}
```
The following example illustrates how to include the key in the message:

```yaml
Product:
- product[messageId]:
  - ./@ID: messageId
  - ./@ID: id
  - Name/text(): name
```

Resulting Message ID:

```
PRD1
```

Resulting message:

```json
{
  "id": "PRD1",
  "name": "Test Product"
}
```
Serialization and schema registry
You can provide the URL to your schema registry and your Avro or Protobuf schema. C4i will use the schema to serialize the data and send it to the schema registry.
Some useful information:
- Only values are serialized
- The root object names must correspond to the topic names (excluding context namespaces)
- JSON objects with key-value pairs can be represented with map structures
- Date and datetime attributes are converted to the relevant data types, unless explicitly converted
Serializer classes
| Serialization mode | Serializer class |
|---|---|
| NONE | org.apache.kafka.common.serialization.StringSerializer |
| AVRO | io.confluent.kafka.serializers.KafkaAvroSerializer |
| PROTOBUF | org.apache.kafka.common.serialization.ByteArraySerializer |
Example
The following YAML mappings:

```yaml
Product:
- product[id]:
  - ./@ID: id
  - c4i:serializeValuesToJSON(.): values
  - Name/text(): name
```
Can be serialized with the following schemas.

Avro schema:

```json
[
  {
    "namespace": "",
    "name": "product",
    "doc": "my product",
    "type": "record",
    "fields": [
      {
        "name": "name",
        "type": "string"
      },
      {
        "name": "values",
        "type": {
          "type": "map",
          "values": ["string", "double", "int"]
        }
      }
    ]
  }
]
```

Protobuf schema:

```protobuf
syntax = "proto3";
import "google/protobuf/struct.proto";

message product {
  string name = 1;
  map<string, google.protobuf.Value> values = 2;
}
```
SASL extensions to include in JAAS configuration file
When authenticating with OAuth2, you can supply SASL extensions to include in the JAAS configuration file.
Example:

```properties
jaas.extensions.extension_logicalCluster=<Cluster ID>
jaas.extensions.extension_identityPoolId=<Pool ID>
```

This will produce the following JAAS configuration file:

```
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId='<client ID>' \
  scope='<Requested Scope>' \
  clientSecret='<Client Secret>' \
  extension_logicalCluster='<Cluster ID>' \
  extension_identityPoolId='<Pool ID>';
```
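The expansion of jaas.extensions.* properties into the JAAS file can be sketched as follows. This is an illustrative Python sketch, not C4i's actual code; the base login-module options passed in are placeholders:

```python
def build_jaas_config(base_options: dict, properties: dict) -> str:
    """Render an OAuth2 JAAS configuration, appending every
    jaas.extensions.* property as an extra login-module option."""
    prefix = "jaas.extensions."
    options = dict(base_options)
    # Each jaas.extensions.<name> property becomes a <name>='<value>' option.
    for key, value in properties.items():
        if key.startswith(prefix):
            options[key[len(prefix):]] = value
    header = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \\"
    entries = [f"  {name}='{value}'" for name, value in options.items()]
    return header + "\n" + " \\\n".join(entries) + ";"

print(build_jaas_config(
    {"clientId": "<client ID>", "clientSecret": "<Client Secret>"},
    {"jaas.extensions.extension_logicalCluster": "<Cluster ID>"},
))
```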
Topics mapping
You can use the same YAML file to publish to different topics.
For example, with the following YAML file:

```yaml
Product:
- products[id]:
  - ./@ID: id
  - Name/text(): name
```
And the following configurations (each in a different environment's properties file):

```properties
topic.mapping.targetTopic=products.dev
```

```properties
topic.mapping.targetTopic=products.prod
```

you can publish data from the same YAML file to two different topics.
When configuring a STEP extension, use the following format:

```
target1=topic1;target2=topic2
```
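The STEP extension format above could be parsed along these lines. This is an illustrative sketch, not C4i's actual parser, and the target/topic names in the usage line are hypothetical:

```python
def parse_topic_mappings(value: str) -> dict:
    """Parse a 'target1=topic1;target2=topic2' string into a
    target-name -> topic-name dictionary."""
    mappings = {}
    for entry in value.split(";"):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing semicolon
        target, _, topic = entry.partition("=")
        mappings[target.strip()] = topic.strip()
    return mappings

print(parse_topic_mappings("products=products.dev;offers=offers.dev"))
# {'products': 'products.dev', 'offers': 'offers.dev'}
```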