
How do I write a Kafka producer with a dynamically generated schema?

asked 2018-04-08 05:06:04 -0500 by casel.chen

updated 2018-04-09 20:05:13 -0500

I am using SDC to ingest Avro data from a Kafka consumer, with the schema registered in the Confluent schema registry, then perform some filter, lookup, and pivot actions, and finally sink the data into another Kafka topic, also as Avro. I configured the Kafka producer to use the Confluent schema registry, providing the registry URL and looking up the schema by subject, but at runtime it complains that the schema does not exist. How can I have the Kafka producer send messages with a dynamically generated schema? In short: Avro schema A in, Avro schema B out.

[screenshot 1]

[screenshot 2]

The error message is:

Kafka Producer 1 DATA_FORMAT_201 - Cannot create the parser factory: java.lang.RuntimeException: Could not create DataFactory instance for 'com.streamsets.pipeline.lib.generator.avro.AvroDataGeneratorFactory': com.streamsets.pipeline.lib.util.SchemaRegistryException: com.streamsets.pipeline.lib.util.SchemaRegistryException: Subject not found.; error code: 40401



Can you post some more information?

todd ( 2018-04-09 17:00:35 -0500 )

I added screenshots and the error message above. In short, I want Kafka messages with Avro schema A in and Kafka messages with Avro schema B out.

casel.chen ( 2018-04-09 20:07:08 -0500 )

Outside of StreamSets, what are the results of attempting to access the registry directly? For example, `curl -X GET http://localhost:8081/subjects`

todd ( 2018-04-10 06:51:40 -0500 )
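For reference, the registry lookup the Kafka producer performs boils down to a REST call against `/subjects/<subject>/versions/latest`; a 404 with error code 40401 ("Subject not found") means no schema was ever registered under that subject. A minimal Python sketch of that check, assuming the registry's default address `http://localhost:8081` (adjust for your environment):

```python
import json
import urllib.error
import urllib.request

# Assumption: registry at the Confluent default host/port.
REGISTRY_URL = "http://localhost:8081"

def subject_url(registry, subject, version="latest"):
    """Build the REST path the registry uses to look up a schema by subject."""
    return f"{registry}/subjects/{subject}/versions/{version}"

def fetch_schema(registry, subject):
    """Return the registered Avro schema for a subject, or None if not found."""
    try:
        with urllib.request.urlopen(subject_url(registry, subject)) as resp:
            return json.loads(resp.read())["schema"]
    except urllib.error.HTTPError as e:
        if e.code == 404:  # registry reports error code 40401: Subject not found
            return None
        raise

# Example (requires a running registry; "mytopic-value" is a hypothetical subject):
# schema = fetch_schema(REGISTRY_URL, "mytopic-value")
```

This mirrors the `curl` check above: if the subject list is empty or the subject is missing, SDC's producer will raise the same 40401 error during validation.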

There is no schema generated yet; the error is raised when I check & preview. I just want to know how to consume existing Avro Kafka messages, do some transformations, and then sink those records to another Kafka topic with a derived Avro schema (I suppose the sink schema can be derived by SDC, right?).

casel.chen ( 2018-04-10 06:57:35 -0500 )

I understand now. You'd like to register the schema if it doesn't exist, right?

todd ( 2018-04-10 07:20:18 -0500 )

1 Answer


answered 2018-04-10 14:09:22 -0500 by todd

Try adding the Schema Generator processor, then update your Kafka producer to register the schema, as shown in this screenshot:

[screenshot]

Also, a good reference.
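Conceptually, the Schema Generator walks each record's fields and emits an Avro schema for it, which the producer can then register under the output subject. A rough Python sketch of that idea (simplified type mapping; not SDC's actual implementation):

```python
import json

# Assumption: a drastically simplified field-type -> Avro-type mapping;
# SDC's Schema Generator supports many more types and options.
AVRO_TYPES = {str: "string", int: "long", float: "double", bool: "boolean"}

def infer_avro_schema(record, name="GeneratedRecord"):
    """Derive an Avro record schema from a flat dict of field values."""
    fields = [
        {"name": key, "type": AVRO_TYPES.get(type(value), "string")}
        for key, value in record.items()
    ]
    return {"type": "record", "name": name, "fields": fields}

# A transformed record whose shape (schema B) differs from the input schema A:
record = {"id": 42, "price": 9.99, "sku": "A-100"}
schema_json = json.dumps(infer_avro_schema(record))
```

With "Register Schema" enabled on the Kafka producer, the generated schema is registered under the configured subject instead of being looked up, which avoids the 40401 "Subject not found" error for a schema that does not exist yet.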

