Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

Figured it out!

Go to the Kafka Producer stage Kafka tab, and then click on the 'Switch to bulk edit mode' link for Kafka Configuration. Then replace whatever JSON object may exist with the following:

[
    {
        "key": "ssl.endpoint.identification.algorithm",
        "value": "https"
    },
    {
        "key": "sasl.mechanism",
        "value": "PLAIN"
    },
    {
        "key": "request.timeout.ms",
        "value": "20000"
    },
    {
        "key": "bootstrap.servers",
        "value": "<bootstrap-server-url>"
    },
    {
        "key": "retry.backoff.ms",
        "value": "500"
    },
    {
        "key": "sasl.jaas.config",
        "value": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";"
    },
    {
        "key": "security.protocol",
        "value": "SASL_SSL"
    }
]

Then start the pipeline.

Figured it out!

Go to the Kafka Producer stage Kafka tab, and then click on the 'Switch to bulk edit mode' link for Kafka Configuration. Then replace whatever JSON object may exist with the following: following:

[
    {
        "key": "ssl.endpoint.identification.algorithm",
        "value": "https"
    },
    {
        "key": "sasl.mechanism",
        "value": "PLAIN"
    },
    {
        "key": "request.timeout.ms",
        "value": "20000"
    },
    {
        "key": "bootstrap.servers",
        "value": "<bootstrap-server-url>"
    },
    {
        "key": "retry.backoff.ms",
        "value": "500"
    },
    {
        "key": "sasl.jaas.config",
        "value": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";"
    },
    {
        "key": "security.protocol",
        "value": "SASL_SSL"
    },
    {
        "key": "basic.auth.credentials.source",
        "value": "USER_INFO"
    },
    {
        "key": "schema.registry.basic.auth.user.info",
        "value": \"<sr-api-key>:< sr-api-secret>\"
    },
    {
        "key": "schema.registry.url",
        "value": "<schema-registry-url> "
    },
    {
        "key": "key.serializer",
        "value": "io.confluent.kafka.serializers.KafkaAvroSerializer"
    },
    {
        "key": "value.serializer",
        "value": "io.confluent.kafka.serializers.KafkaAvroSerializer"
    }
]

Then start the pipeline.

Note, I got the following error when I ran my pipeline during testing because I was using AVRO as my data format:

Error evaluating the message key expression '${avro:decode(record:attribute('avroKeySchema'),base64:decodeBytes(record:attribute('kafkaMessageKey')))}' for record '1792234_20': CTRCMN_0100 - Error evaluating expression ${avro:decode(record:attribute('avroKeySchema'),base64:decodeBytes(record:attribute('kafkaMessageKey')))}: java.lang.NullPointerException

If you do as well, what did the trick for me was getting rid of:

${avro:decode(record:attribute('avroKeySchema'),base64:decodeBytes(record:attribute('kafkaMessageKey')))}

in the Kafka Message Key textbox.