Kafka extension

Plugin ID

The plugin ID for Apache Kafka is fr.cantor.c4i.KafkaDelivery.

Properties

| Property key | Description | Corresponding Kafka configuration key | Mandatory | Default value |
|---|---|---|---|---|
| debug | true to display debug logs | | No | false |
| brokerUrls | URLs of the Kafka brokers, separated by commas (,) | bootstrap.servers | Yes | |
| clientId | Kafka client ID | client.id | No | |
| configAssetId | The ID of the asset containing the YAML mappings | | Yes | |
| prefixTopicNamesWithContextId | true or false to indicate whether or not to prefix the topic/queue names with the context IDs | | Yes | |
| securityProtocol | PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL | security.protocol | No | PLAINTEXT |
| saslMechanism | PLAIN, SCRAM-SHA-256 or OAUTHBEARER | sasl.mechanism | If securityProtocol is SASL_PLAINTEXT or SASL_SSL | |
| username | The username for SASL authentication | sasl.jaas.config | No | |
| password | The password for SASL authentication | sasl.jaas.config | No | |
| jaas.extensions.${extensionName} | SASL extension to include in the JAAS configuration file (only used for OAUTHBEARER authentication, see below) | sasl.jaas.config | No | |
| serializationMode | NONE, AVRO or PROTOBUF | value.serializer (see below) | No | NONE |
| schemaPath | The path to the Avro or Protobuf schema | | No | |
| schemaRegistryUrl | The URL of the Schema Registry | schema.registry.url | No | |
| cacheConfig | The ID of the asset containing the JCS configuration | | No | |
| topic.partition.${topicName} | The partition number for the chosen topic | | No | |
| oAuth2TokenEndpoint | The endpoint to retrieve the OAuth2 token | sasl.oauthbearer.token.endpoint.url | No | |
| oAuth2Scope | The OAuth2 token scope | sasl.jaas.config | No | |
| compressionMode | NONE, GZIP, SNAPPY, LZ4 or ZSTD | compression.type | No | NONE |
| lingerMs | The delay in ms before sending a request, to allow batching | linger.ms | No | 0 |
| batchSize | The maximum batch size in bytes | batch.size | No | 0 |
| maxRequestSize | The maximum uncompressed message size in bytes | max.request.size | No | |
| bufferMemory | The total bytes of memory the producer can use to buffer records waiting to be sent to the server | buffer.memory | No | |
| topic.mapping.${targetName} | The topic mapping for the chosen target | | No | |
| autoCreateTopics | true to automatically create topics if they don't already exist, false otherwise | | No | false |

Configuration example

extensions.[0].pluginId=fr.cantor.c4i.KafkaDelivery
extensions.[0].brokerUrls=tcp://localhost:9092
extensions.[0].clientId=c4i
extensions.[0].configAssetId=C4I-Configuration
extensions.[0].prefixTopicNamesWithContextId=true
extensions.[0].username=clientId
extensions.[0].securityProtocol=SASL_SSL
extensions.[0].saslMechanism=OAUTHBEARER
extensions.[0].schemaRegistryUrl=http://my-schema-registry.url
extensions.[0].topic.partition.products=2
extensions.[0].oAuth2TokenEndpoint=https://my-oauth2-endpoint.com/auth/realms/myrealm/protocol/openid-connect/token
extensions.[0].oAuth2Scope=my-custom-scope
extensions.[0].compressionMode=GZIP
extensions.[0].lingerMs=200
extensions.[0].batchSize=10000
extensions.[0].maxRequestSize=100000

Or, as a flat properties file (without the extensions.[0]. prefix, referencing file paths instead of asset IDs):
pluginId=fr.cantor.c4i.KafkaDelivery
brokerUrls=tcp://localhost:9092
clientId=c4i
configAssetId=/Path/to/C4I/Configuration.yml
prefixTopicNamesWithContextId=true
licenseAssetId=/Path/to/C4I/License.cat
username=clientId
password=clientSecret
securityProtocol=SASL_SSL
saslMechanism=OAUTHBEARER
schemaRegistryUrl=http://my-schema-registry.url
cacheConfig=/Path/to/JCS/Config.ccf
topic.partition.products=2
oAuth2TokenEndpoint=https://my-oauth2-endpoint.com/auth/realms/myrealm/protocol/openid-connect/token
oAuth2Scope=my-custom-scope
compressionMode=GZIP
lingerMs=200
batchSize=10000
maxRequestSize=100000

Message ID

The fields identified as the message key will be sent as the Kafka Message ID (the record key), and will always be serialized as a String.

  • If only one field is mapped, the Message ID will be this single field value.
  • If multiple fields are mapped, the Message ID will be a serialized JSON document containing the key-value pairs.

Important: These fields will not be part of the produced message. If you wish to add them to the message as well, you will need to map them twice.
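
The key-serialization rule above can be sketched as follows (an illustrative Python sketch, not the extension's actual implementation; build_message_id is a hypothetical helper):

```python
import json

def build_message_id(key_fields):
    """Serialize the mapped key fields into the Kafka Message ID string.

    A single mapped field yields its raw value; several mapped fields
    yield a JSON document of key-value pairs, mirroring the rule above.
    """
    if len(key_fields) == 1:
        # Single field: the Message ID is just the field value, as a string.
        return str(next(iter(key_fields.values())))
    # Multiple fields: the Message ID is a serialized JSON document.
    return json.dumps(key_fields)

print(build_message_id({"id": "PRD1"}))                   # PRD1
print(build_message_id({"id": "PRD1", "context": "GL"}))  # {"id": "PRD1", "context": "GL"}
```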

Example

Given the following XML:

<STEP-ProductInformation ContextID="GL">
    <Product ID="PRD1">
        <Name>Test Product</Name>
    </Product>
</STEP-ProductInformation>

The following example illustrates a simple, single-field key:

Product:
  - product[id]:
      - ./@ID: id
      - Name/text(): name

Message ID:

PRD1

Message:

{
  "name": "Test Product"
}

The following example illustrates a complex, multi-field key:

Product:
  - product[id, context]:
      - ./@ID: id
      - c4i:currentContextId(): context
      - Name/text(): name

Message ID:

{"id": "PRD1", "context": "GL"}

Message:

{
  "name": "Test Product"
}

The following example illustrates how to include the key in the message by mapping the field twice:

Product:
  - product[messageId]:
      - ./@ID: messageId
      - ./@ID: id
      - Name/text(): name

Message ID:

PRD1

Message:

{
  "id": "PRD1",
  "name": "Test Product"
}

Serialization and schema registry

You can provide the URL of your schema registry along with your Avro or Protobuf schema. C4i will use the schema to serialize the data and will register the schema with the Schema Registry.

Some useful information:

  • Only values are serialized
  • The root object names must correspond to the topic names (excluding context namespaces)
  • JSON objects with key-value pairs can be represented with map structures
  • Date and datetime attributes are converted to the relevant data types, unless explicitly converted

Serializer classes

| Serialization mode | Serializer class |
|---|---|
| NONE | org.apache.kafka.common.serialization.StringSerializer |
| AVRO | io.confluent.kafka.serializers.KafkaAvroSerializer |
| PROTOBUF | org.apache.kafka.common.serialization.ByteArraySerializer |
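
For example, with serializationMode=AVRO the producer's serializer configuration corresponds roughly to the following properties (a sketch; the key serializer shown is an assumption, based on Message IDs always being serialized as strings):

```properties
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
schema.registry.url=http://my-schema-registry.url
```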

Example

The following YAML mappings:

- Product:
    product[id]:
        - ./@ID: id
        - c4i:serializeValuesToJSON(.): values
        - Name/text(): name

Can be serialized with either of the following schemas.

Avro schema:

[
    {
        "namespace": "",
        "name": "product",
        "doc": "my product",
        "type": "record",
        "fields":
        [
            {
                "name": "name",
                "type": "string"
            },
            {
                "name": "values",
                "type":
                {
                    "type": "map",
                    "values":
                    [
                        "string",
                        "double",
                        "int"
                    ]
                }
            }
        ]
    }
]
Protobuf schema:

syntax = "proto3";

import "google/protobuf/struct.proto";

message product{
  string name = 1;
  map<string, google.protobuf.Value> values = 2;
}

SASL extensions to include in JAAS configuration file

When authenticating with OAuth2, you can supply SASL extensions to include in the JAAS configuration file.

Example:

jaas.extensions.extension_logicalCluster=<Cluster ID>
jaas.extensions.extension_identityPoolId=<Pool ID>

This produces the following JAAS configuration file:

 org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId='<client ID>' \
    scope='<Requested Scope>' \
    clientSecret='<Client Secret>' \
    extension_logicalCluster='<Cluster ID>' \
    extension_identityPoolId='<Pool ID>';
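
How the configured credentials and jaas.extensions.* properties combine into that JAAS line can be sketched in Python (illustrative only; build_jaas_config is a hypothetical helper, not part of the extension):

```python
def build_jaas_config(client_id, client_secret, scope, extensions):
    """Assemble an OAUTHBEARER sasl.jaas.config value from the configured
    credentials plus any jaas.extensions.* properties."""
    options = {"clientId": client_id, "scope": scope, "clientSecret": client_secret}
    options.update(extensions)  # e.g. extension_logicalCluster, extension_identityPoolId
    rendered = " ".join(f"{key}='{value}'" for key, value in options.items())
    return ("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule "
            f"required {rendered};")

print(build_jaas_config(
    "my-client", "my-secret", "my-custom-scope",
    {"extension_logicalCluster": "lkc-123", "extension_identityPoolId": "pool-1"},
))
```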

Topics mapping

You can use the same YAML file to publish to different topics.

For example, with the following YAML file:

- Product:
  products[id]:
    - ./@ID: id
    - Name/text(): name

And the following configuration in each deployment:

topic.mapping.targetTopic=products.dev
topic.mapping.targetTopic=products.prod

The same YAML file then publishes data to two different topics, one per deployment.

When configuring a STEP extension, use the following format:

target1=topic1;target2=topic2
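
That target1=topic1;target2=topic2 format can be parsed as follows (a Python sketch, assuming pairs are ;-separated and each pair has the shape target=topic):

```python
def parse_topic_mappings(value):
    """Parse a STEP-extension topic mapping string such as
    'target1=topic1;target2=topic2' into a target -> topic dict."""
    mappings = {}
    for pair in value.split(";"):
        if not pair.strip():
            continue  # tolerate a trailing separator
        target, _, topic = pair.partition("=")
        mappings[target.strip()] = topic.strip()
    return mappings

print(parse_topic_mappings("targetTopic=products.dev;auditTarget=products.audit"))
# {'targetTopic': 'products.dev', 'auditTarget': 'products.audit'}
```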