How do you use SDC Kafka Consumer/Producer with Confluent Cloud?

asked 2020-03-09 21:52:02 -0500

J3

updated 2020-03-16 22:08:14 -0500

I am trying to communicate with Confluent Cloud over SSL from the SDC Kafka Consumer/Producer stages. Thank you.


1 Answer


answered 2020-03-10 08:41:51 -0500

J3

updated 2020-03-10 18:15:41 -0500

Go to the Kafka tab of the Kafka Producer stage and click the 'Switch to bulk edit mode' link for Kafka Configuration. Then replace whatever JSON may already be there with the following:

[
    {
        "key": "ssl.endpoint.identification.algorithm",
        "value": "https"
    },
    {
        "key": "sasl.mechanism",
        "value": "PLAIN"
    },
    {
        "key": "request.timeout.ms",
        "value": "20000"
    },
    {
        "key": "bootstrap.servers",
        "value": "<bootstrap-server-url>"
    },
    {
        "key": "retry.backoff.ms",
        "value": "500"
    },
    {
        "key": "sasl.jaas.config",
        "value": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";"
    },
    {
        "key": "security.protocol",
        "value": "SASL_SSL"
    },
    {
        "key": "basic.auth.credentials.source",
        "value": "USER_INFO"
    },
    {
        "key": "schema.registry.basic.auth.user.info",
        "value": "<sr-api-key>:<sr-api-secret>"
    },
    {
        "key": "schema.registry.url",
        "value": "<schema-registry-url>"
    },
    {
        "key": "key.serializer",
        "value": "io.confluent.kafka.serializers.KafkaAvroSerializer"
    },
    {
        "key": "value.serializer",
        "value": "io.confluent.kafka.serializers.KafkaAvroSerializer"
    }
]

Then start the pipeline.
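Hand-escaping the quotes inside sasl.jaas.config is easy to get wrong, so one option is to generate the bulk-edit JSON instead of typing it. The following is only a minimal sketch covering the connection-related keys from the list above. The function name build_bulk_edit_config and its parameters are hypothetical and not part of SDC or Confluent. It assumes bulk edit mode accepts a list of key/value objects, as in the configuration shown above.

```python
import json


def build_bulk_edit_config(bootstrap_servers, api_key, api_secret):
    """Return bulk-edit JSON for the connection-related Kafka properties.

    Hypothetical helper for illustration; not part of SDC or Confluent.
    """
    # json.dumps escapes the inner double quotes, so none are escaped by hand.
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )
    properties = {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
        "ssl.endpoint.identification.algorithm": "https",
    }
    # One {"key": ..., "value": ...} object per property; strip stray
    # leading/trailing whitespace that can sneak in when pasting values.
    entries = [{"key": k, "value": v.strip()} for k, v in properties.items()]
    return json.dumps(entries, indent=4)


if __name__ == "__main__":
    # Placeholders as in the answer; substitute your own values.
    print(build_bulk_edit_config("<bootstrap-server-url>", "<api-key>", "<api-secret>"))
```

The printed output can be pasted into the bulk edit box, and the remaining keys (timeouts, schema registry, serializers) can be added to the dictionary in the same way.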

Note: during testing I got the following error when I ran my pipeline, because I was using Avro as my data format:

Error evaluating the message key expression '${avro:decode(record:attribute('avroKeySchema'),base64:decodeBytes(record:attribute('kafkaMessageKey')))}' for record '1792234_20': CTRCMN_0100 - Error evaluating expression ${avro:decode(record:attribute('avroKeySchema'),base64:decodeBytes(record:attribute('kafkaMessageKey')))}: java.lang.NullPointerException

If you hit the same error, what fixed it for me was removing the following expression from the Kafka Message Key textbox:

${avro:decode(record:attribute('avroKeySchema'),base64:decodeBytes(record:attribute('kafkaMessageKey')))}

