Kafka extension
Plugin ID
The plugin ID for Apache Kafka is fr.cantor.c4i.KafkaDelivery.
Properties
Property key | Description | Corresponding Kafka configuration key | Mandatory | Default value |
---|---|---|---|---|
debug | true to display debug logs | | No | false |
brokerUrls | URLs of the Kafka brokers, separated by commas (,) | bootstrap.servers | Yes | |
clientId | Kafka client ID | client.id | No | |
configAssetId | The ID of the asset containing the YAML mappings | | Yes | |
prefixTopicNamesWithContextId | true or false to indicate whether or not to prefix the topic/queue names with the context IDs | | Yes | |
securityProtocol | PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL | security.protocol | No | PLAINTEXT |
saslMechanism | PLAIN, SCRAM-SHA-256 or OAUTHBEARER | sasl.mechanism | If securityProtocol is SASL_PLAINTEXT or SASL_SSL | |
username | The username for SASL authentication | sasl.jaas.config | No | |
password | The password for SASL authentication | sasl.jaas.config | No | |
jaas.extensions.${extensionName} | SASL extension to include in the JAAS configuration file (only used for OAUTHBEARER authentication, see below) | sasl.jaas.config | No | |
serializationMode | NONE, AVRO or PROTOBUF | value.serializer (see below) | No | NONE |
schemaPath | The path to the Avro or Protobuf schema | | No | |
schemaRegistryUrl | The URL of the schema registry | schema.registry.url | No | |
cacheConfig | The ID of the asset containing the JCS configuration | | No | |
topic.partition.${topicName} | The partition number for the chosen topic | | No | |
oAuth2TokenEndpoint | The endpoint to retrieve the OAuth2 token | sasl.oauthbearer.token.endpoint.url | No | |
oAuth2Scope | The OAuth2 token scope | sasl.jaas.config | No | |
compressionMode | NONE, GZIP, SNAPPY, LZ4 or ZSTD | compression.type | No | NONE |
lingerMs | The delay in milliseconds before sending a request, to allow batching | linger.ms | No | 0 |
batchSize | The maximum batch size in bytes | batch.size | No | 0 |
maxRequestSize | The maximum uncompressed message size in bytes | max.request.size | No | |
bufferMemory | The total bytes of memory the producer can use to buffer records waiting to be sent to the server | buffer.memory | No | |
topic.mapping.${targetName} | The topic mapping for the chosen target | | No | |
autoCreateTopics | true to automatically create topics if they don't already exist, false otherwise | | No | false |
Configuration example
```properties
extensions.[0].pluginId=fr.cantor.c4i.KafkaDelivery
extensions.[0].brokerUrls=tcp://localhost:9092
extensions.[0].clientId=c4i
extensions.[0].configAssetId=C4I-Configuration
extensions.[0].prefixTopicNamesWithContextId=true
extensions.[0].username=clientId
extensions.[0].securityProtocol=SASL_SSL
extensions.[0].saslMechanism=OAUTHBEARER
extensions.[0].schemaRegistryUrl=http://my-schema-registry.url
extensions.[0].topic.partition.products=2
extensions.[0].oAuth2TokenEndpoint=https://my-oauth2-endpoint.com/auth/realms/myrealm/protocol/openid-connect/token
extensions.[0].oAuth2Scope=my-custom-scope
extensions.[0].compressionMode=GZIP
extensions.[0].lingerMs=200
extensions.[0].batchSize=10000
extensions.[0].maxRequestSize=100000
```
```properties
pluginId=fr.cantor.c4i.KafkaDelivery
brokerUrls=tcp://localhost:9092
clientId=c4i
configAssetId=/Path/to/C4I/Configuration.yml
prefixTopicNamesWithContextId=true
licenseAssetId=/Path/to/C4I/License.cat
username=clientId
password=clientSecret
securityProtocol=SASL_SSL
saslMechanism=OAUTHBEARER
schemaRegistryUrl=http://my-schema-registry.url
cacheConfig=/Path/to/JCS/Config.ccf
topic.partition.products=2
oAuth2TokenEndpoint=https://my-oauth2-endpoint.com/auth/realms/myrealm/protocol/openid-connect/token
oAuth2Scope=my-custom-scope
compressionMode=GZIP
lingerMs=200
batchSize=10000
maxRequestSize=100000
```
Message ID
The fields identified as the message key are sent as the Kafka Message ID and are always serialized as a String.
- If only one field is mapped, the Message ID is that single field's value.
- If multiple fields are mapped, the Message ID is a serialized JSON document containing the key-value pairs.
Important: These fields will not be part of the produced message. If you wish to include them in the message as well, you need to map them twice.
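The two rules above can be sketched as follows (illustrative Python, not the actual plugin code):

```python
import json

def build_message_id(key_fields: dict) -> str:
    """Derive the Kafka Message ID from the mapped key fields,
    following the rules above (illustrative sketch only)."""
    if len(key_fields) == 1:
        # A single mapped field: the Message ID is that field's value, as a String
        return str(next(iter(key_fields.values())))
    # Multiple mapped fields: the Message ID is a JSON document of key-value pairs
    return json.dumps(key_fields)

print(build_message_id({"id": "PRD1"}))                   # PRD1
print(build_message_id({"id": "PRD1", "context": "GL"}))  # {"id": "PRD1", "context": "GL"}
```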
Example
Given the following XML:
```xml
<STEP-ProductInformation ContextID="GL">
  <Product ID="PRD1">
    <Name>Test Product</Name>
  </Product>
</STEP-ProductInformation>
```
The following example illustrates simple keys:

```yaml
Product:
  - product[id]:
      - ./@ID: id
      - Name/text(): name
```

This produces the message key:

```
PRD1
```

and the message value:

```json
{
  "name": "Test Product"
}
```
The following example illustrates complex keys:

```yaml
Product:
  - product[id, context]:
      - ./@ID: id
      - c4i:currentContextId(): context
      - Name/text(): name
```

This produces the message key:

```json
{"id": "PRD1", "context": "GL"}
```

and the message value:

```json
{
  "name": "Test Product"
}
```
The following example illustrates how to include the key in the message:

```yaml
Product:
  - product[messageId]:
      - ./@ID: messageId
      - ./@ID: id
      - Name/text(): name
```

This produces the message key:

```
PRD1
```

and the message value:

```json
{
  "id": "PRD1",
  "name": "Test Product"
}
```
Serialization and schema registry
You can provide the URL to your schema registry and your Avro or Protobuf schema. C4i will use the schema to serialize the data and send it to the schema registry.
Some useful information:
- Only values are serialized
- The root object names must correspond to the topic names (excluding context namespaces)
- JSON objects with key-value pairs can be represented with map structures
- Date and datetime attributes are converted to the relevant data types, unless explicitly converted
Serializer classes
Serialization mode | Serializer class |
---|---|
NONE | org.apache.kafka.common.serialization.StringSerializer |
AVRO | io.confluent.kafka.serializers.KafkaAvroSerializer |
PROTOBUF | org.apache.kafka.common.serialization.ByteArraySerializer |
Example
The following YAML mapping:

```yaml
- Product:
    product[id]:
      - ./@ID: id
      - c4i:serializeValuesToJSON(.): values
      - Name/text(): name
```

can be serialized with the following schemas:
Avro:

```json
[
  {
    "namespace": "",
    "name": "product",
    "doc": "my product",
    "type": "record",
    "fields": [
      {
        "name": "name",
        "type": "string"
      },
      {
        "name": "values",
        "type": {
          "type": "map",
          "values": ["string", "double", "int"]
        }
      }
    ]
  }
]
```
Protobuf:

```protobuf
syntax = "proto3";

import "google/protobuf/struct.proto";

message product {
  string name = 1;
  map<string, google.protobuf.Value> values = 2;
}
```
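As an illustration, a message value conforming to either schema above could look like the following (the attribute names inside values are hypothetical):

```json
{
  "name": "Test Product",
  "values": {
    "color": "red",
    "weight": 1.5,
    "stock": 42
  }
}
```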
SASL extensions to include in JAAS configuration file
When authenticating with OAuth2, you can supply SASL extensions to include in the JAAS configuration file.
Example:

```properties
jaas.extensions.extension_logicalCluster=<Cluster ID>
jaas.extensions.extension_identityPoolId=<Pool ID>
```

This will produce the following JAAS configuration file:

```
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId='<Client ID>' \
  scope='<Requested Scope>' \
  clientSecret='<Client Secret>' \
  extension_logicalCluster='<Cluster ID>' \
  extension_identityPoolId='<Pool ID>';
```
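How the jaas.extensions.* properties end up in the JAAS configuration can be sketched like this (illustrative Python, not the actual plugin code; the sample cluster ID is hypothetical):

```python
def build_jaas_config(client_id: str, scope: str, client_secret: str,
                      extensions: dict) -> str:
    """Assemble a JAAS configuration line for OAUTHBEARER, appending
    each jaas.extensions.${name} property as a ${name}='...' option
    (illustrative sketch only)."""
    options = {
        "clientId": client_id,
        "scope": scope,
        "clientSecret": client_secret,
        **extensions,  # each extension becomes an extra key='value' option
    }
    opts = " ".join(f"{key}='{value}'" for key, value in options.items())
    return ("org.apache.kafka.common.security.oauthbearer."
            f"OAuthBearerLoginModule required {opts};")

jaas = build_jaas_config("c4i", "my-custom-scope", "secret",
                         {"extension_logicalCluster": "lkc-123"})
print(jaas)
```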
Topics mapping
You can use the same YAML file to publish to different topics.
For example, with the following YAML file:

```yaml
- Product:
    products[id]:
      - ./@ID: id
      - Name/text(): name
```

and the following configurations (for example, one per environment):

```properties
# Development environment
topic.mapping.targetTopic=products.dev

# Production environment
topic.mapping.targetTopic=products.prod
```

you can publish the same data to two different topics.
When configuring a STEP extension, use the following format:

```
target1=topic1;target2=topic2
```
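Parsing that format is straightforward; the sketch below (illustrative Python, not the actual plugin code) shows how the semicolon-separated target-to-topic pairs break down:

```python
def parse_topic_mappings(mapping: str) -> dict:
    """Parse the 'target1=topic1;target2=topic2' mapping format
    shown above (illustrative sketch only)."""
    result = {}
    for entry in mapping.split(";"):
        if not entry:
            continue  # tolerate a trailing semicolon
        target, topic = entry.split("=", 1)
        result[target] = topic
    return result

print(parse_topic_mappings("target1=topic1;target2=topic2"))
# {'target1': 'topic1', 'target2': 'topic2'}
```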