Kafka extension

Plugin ID

The plugin ID for Apache Kafka is fr.cantor.c4i.KafkaDelivery.

Properties

Environment independent parameters (set in extension configuration or properties file)

  • configAssetId (mandatory): The ID of the asset containing the YAML mappings
  • connection (mandatory): The name of the properties file to use
  • prefixTopicNamesWithContextId (mandatory): true or false, indicating whether to prefix topic/queue names with the context IDs
  • outputFormat (mandatory): CSV, EXCEL, JSON, JSONL, XML or XMLL
  • autoCreateTopics (optional): true to automatically create topics if they do not already exist, false otherwise
  • serializationMode (optional): NONE, AVRO or PROTOBUF
  • schemaPath (optional): The path to the Avro or Protobuf schema
  • topic.mapping.${targetName} (optional): The topic mapping for the chosen target
  • cacheConfig (optional): The ID of the asset containing the JCS configuration
  • displayPerfStat (optional): true to display performance statistics (see the dedicated Performance statistics page)

Environment dependent parameters

To make the Kafka connector work, connect to the Stibo SFTP for your STEP environment and create the folder /shared/customer-config/c4i/credentials/kafka (for STEP usage), or create the folder ~/.c4i/credentials/kafka on your local file system (for Standalone usage).

Then you can store your Kafka connection properties in a file such as myConnection.properties and reference it through the "Connection name" extension configuration parameter, or through the connection property for Composite or Standalone.
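
As an illustration, a minimal myConnection.properties could look like the following. The keys shown are standard Apache Kafka client properties; the exact set of keys that C4i passes through to the Kafka client is an assumption here, so check your environment's requirements.

```properties
# Illustrative Kafka client connection properties (values are placeholders)
bootstrap.servers=broker1.example.com:9092,broker2.example.com:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username='<user>' \
  password='<password>';
```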

Configuration example

For a STEP extension:

extensions.[0].pluginId=fr.cantor.c4i.KafkaDelivery
extensions.[0].configAssetId=C4I-Configuration
extensions.[0].prefixTopicNamesWithContextId=true
extensions.[0].topic.partition.products=2
extensions.[0].outputFormat=JSON

For Standalone:

pluginId=fr.cantor.c4i.KafkaDelivery
configAssetId=/Path/to/C4I/Configuration.yml
prefixTopicNamesWithContextId=true
licenseAssetId=/Path/to/C4I/License.cat
cacheConfig=/Path/to/JCS/Config.ccf
topic.partition.products=2
outputFormat=JSON

Message ID

The fields identified as the message key will be sent as the Kafka Message ID, and will always be serialized as a String.

  • If only one field is mapped, the Message ID will be this single field value.
  • If multiple fields are mapped, the Message ID will be a serialized JSON document containing the key-values pairs.

Important: These fields will not be part of the produced message. If you wish to add them to the message as well, you will need to map them twice.

Example

Given the following XML:

<STEP-ProductInformation ContextID="GL">
    <Product ID="PRD1">
        <Name>Test Product</Name>
    </Product>
</STEP-ProductInformation>

The following example illustrates simple keys. The mapping:

Product:
  - product[id]:
      - ./@ID: id
      - Name/text(): name

produces the Message ID PRD1 and the following message:

{
  "name": "Test Product"
}

The following example illustrates complex keys. The mapping:

Product:
  - product[id, context]:
      - ./@ID: id
      - c4i:currentContextId(): context
      - Name/text(): name

produces the Message ID {"id": "PRD1", "context": "GL"} and the following message:

{
  "name": "Test Product"
}

The following example illustrates how to include the key in the message. The mapping:

Product:
  - product[messageId]:
      - ./@ID: messageId
      - ./@ID: id
      - Name/text(): name

produces the Message ID PRD1 and the following message:

{
  "id": "PRD1",
  "name": "Test Product"
}

Serialization and schema registry

You can provide the URL to your schema registry and your Avro or Protobuf schema. C4i will use the schema to serialize the data and send it to the schema registry.

Some useful information:

  • Only values are serialized
  • The root object names must correspond to the topic names (excluding context namespaces)
  • JSON objects with key-value pairs can be represented with map structures
  • Date and datetime attributes are converted to the relevant data types, unless they are explicitly converted in the mapping
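
A minimal configuration sketch for Avro serialization follows. serializationMode and schemaPath are the extension properties documented above; schema.registry.url is the standard Confluent serializer property, and its placement in the connection properties file is an assumption to adapt to your setup.

```properties
# Extension configuration (illustrative)
serializationMode=AVRO
schemaPath=/Path/to/schemas/product.avsc

# Connection properties file: standard Confluent serializer property,
# assumed to be passed through to the serializer by C4i
schema.registry.url=https://schema-registry.example.com:8081
```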

Serializer classes

Serialization mode    Serializer class
NONE                  org.apache.kafka.common.serialization.StringSerializer
AVRO                  io.confluent.kafka.serializers.KafkaAvroSerializer
PROTOBUF              org.apache.kafka.common.serialization.ByteArraySerializer

Example

The following YAML mappings:

- Product:
    product[id]:
        - ./@ID: id
        - c4i:serializeValuesToJSON(.): values
        - Name/text(): name

Can be serialized with the following schemas.

With Avro:

[
    {
        "namespace": "",
        "name": "product",
        "doc": "my product",
        "type": "record",
        "fields":
        [
            {
                "name": "name",
                "type": "string"
            },
            {
                "name": "values",
                "type":
                {
                    "type": "map",
                    "values":
                    [
                        "string",
                        "double",
                        "int"
                    ]
                }
            }
        ]
    }
]
With Protobuf:

syntax = "proto3";

import "google/protobuf/struct.proto";

message product{
  string name = 1;
  map<string, google.protobuf.Value> values = 2;
}

SASL extensions to include in JAAS configuration file

When authenticating with OAuth2, you can supply SASL extensions to include in the JAAS configuration file.

Example:

jaas.extensions.extension_logicalCluster=<Cluster ID>
jaas.extensions.extension_identityPoolId=<Pool ID>

This produces the following JAAS configuration file:

 org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId='<client ID>' \
    scope='<Requested Scope>' \
    clientSecret='<Client Secret>' \
    extension_logicalCluster='<Cluster ID>' \
    extension_identityPoolId='<Pool ID>';
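
For context, the jaas.extensions.* entries would typically sit alongside the rest of the OAuth2 connection properties, as sketched below. Apart from the jaas.extensions.* keys shown above, the key names here are standard Kafka OAUTHBEARER client properties and are an assumption, not a confirmed C4i key list.

```properties
# Illustrative OAuth2 connection properties (values are placeholders)
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://auth.example.com/oauth2/token
jaas.extensions.extension_logicalCluster=<Cluster ID>
jaas.extensions.extension_identityPoolId=<Pool ID>
```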

Topics mapping

You can use the same YAML file to publish to different topics.

For example, with the following YAML file:

- Product:
    products[id]:
        - ./@ID: id
        - Name/text(): name

And the following configurations, one per deployment:

# dev deployment
topic.mapping.targetTopic=products.dev

# prod deployment
topic.mapping.targetTopic=products.prod

the same YAML file publishes to products.dev in one deployment and to products.prod in the other.

When configuring a STEP extension, use the following format:

target1=topic1;target2=topic2
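
For instance, reusing the targetTopic target from the example above and a hypothetical second target auditTopic, the extension parameter value would look like this (both target names besides targetTopic are illustrative):

```properties
targetTopic=products.dev;auditTopic=products.audit
```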