How to write a Kafka producer with a dynamically generated schema?

asked 2018-04-08 05:06:04 -0500

casel.chen

updated 2018-04-09 20:05:13 -0500

I am using SDC 3.1.2.0 to ingest data with a Kafka Consumer; the Avro data is registered with the Confluent Schema Registry. The pipeline then applies some filter, lookup, and pivot steps and finally writes the data to another Kafka topic in Avro format. I configured the Kafka Producer to use the Confluent Schema Registry, providing the registry URL and looking up the schema by subject, but when the pipeline runs it complains that the schema does not exist. How can I send messages with a dynamically generated schema from the Kafka Producer stage? In short: Avro schema A in, Avro schema B out.
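When the producer looks up a schema by subject, that subject must already exist in the registry. For reference, Confluent's own serializers default to `TopicNameStrategy`, which stores a topic's value schema under the subject `<topic>-value` (and its key schema under `<topic>-key`). A minimal sketch of that naming convention, assuming the default strategy; the function name and the topic `orders_pivoted` are hypothetical:

```python
def topic_name_subject(topic: str, is_key: bool = False) -> str:
    """Return the subject name Confluent's default TopicNameStrategy would use."""
    # Key and value schemas for the same topic are kept under separate subjects.
    suffix = "key" if is_key else "value"
    return f"{topic}-{suffix}"


if __name__ == "__main__":
    # Hypothetical output topic, for illustration only.
    print(topic_name_subject("orders_pivoted"))  # orders_pivoted-value
```

If a brand-new output topic has never had a schema registered, no such subject exists yet, which is consistent with the lookup failing.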

[screenshot]

[screenshot]

The error message is:

Kafka Producer 1 DATA_FORMAT_201 - Cannot create the parser factory: java.lang.RuntimeException: Could not create DataFactory instance for 'com.streamsets.pipeline.lib.generator.avro.AvroDataGeneratorFactory': com.streamsets.pipeline.lib.util.SchemaRegistryException: com.streamsets.pipeline.lib.util.SchemaRegistryException: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
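The `error code: 40401` at the end of that message comes from the Confluent Schema Registry REST API, where 40401 means "Subject not found": the registry has no subject with the name the producer asked for. A small illustrative helper for pulling the code out of such a message (the helper name is hypothetical; only a few of the registry's 404xx codes are listed):

```python
import re
from typing import Optional

# A few Confluent Schema Registry "not found" error codes.
REGISTRY_ERROR_CODES = {
    40401: "Subject not found",
    40402: "Version not found",
    40403: "Schema not found",
}

_CODE_RE = re.compile(r"error code: (\d+)")


def registry_error_code(message: str) -> Optional[int]:
    """Extract the numeric Schema Registry error code from an exception message."""
    match = _CODE_RE.search(message)
    return int(match.group(1)) if match else None


if __name__ == "__main__":
    msg = ("io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: "
           "Subject not found.; error code: 40401")
    code = registry_error_code(msg)
    print(code, REGISTRY_ERROR_CODES.get(code))  # 40401 Subject not found
```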


Comments

Can you post some more information?

tmcgrath (2018-04-09 17:00:35 -0500)

I added screenshots and the error message above. In short, I want Kafka messages with Avro schema A in and Kafka messages with Avro schema B out.

casel.chen (2018-04-09 20:07:08 -0500)

Outside of StreamSets, what happens when you access the registry directly? For example: `curl -X GET http://localhost:8081/subjects`

tmcgrath (2018-04-10 06:51:40 -0500)
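The same check can be scripted. A minimal standard-library sketch, assuming the registry listens at `http://localhost:8081` as in the `curl` example; the function names are hypothetical:

```python
import json
import urllib.request
from typing import List


def subjects_url(base_url: str) -> str:
    """Build the GET /subjects endpoint URL for a Schema Registry base URL."""
    return base_url.rstrip("/") + "/subjects"


def parse_subjects(body: bytes) -> List[str]:
    """The registry answers GET /subjects with a JSON array of subject names."""
    return json.loads(body.decode("utf-8"))


def list_subjects(base_url: str, timeout: float = 10.0) -> List[str]:
    """Fetch all registered subjects (performs a real HTTP request)."""
    with urllib.request.urlopen(subjects_url(base_url), timeout=timeout) as resp:
        return parse_subjects(resp.read())
```

For example, `"orders_pivoted-value" in list_subjects("http://localhost:8081")` (hypothetical subject name) returning `False` would match the "Subject not found" error above.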

No schema has been generated yet; the error is raised when I validate and preview the pipeline. I just want to know how to consume existing Avro Kafka messages, apply some transformations, and then write those records to another Kafka topic with an output Avro schema that SDC derives automatically (I suppose SDC can do that, right?)

casel.chen (2018-04-10 06:57:35 -0500)

I understand now. You'd like to register the schema if it doesn't exist, right?

tmcgrath (2018-04-10 07:20:18 -0500)
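Outside of StreamSets, "register the schema if it doesn't exist" maps onto the registry's REST API: POST the schema to `/subjects/{subject}/versions`. As far as we understand the Confluent registry, posting a schema that is already registered under that subject returns the existing ID instead of creating a duplicate, so the call can be repeated. A minimal standard-library sketch; the function names, subject, and schema are hypothetical:

```python
import json
import urllib.parse
import urllib.request

CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"


def build_register_request(base_url: str, subject: str, avro_schema: dict) -> urllib.request.Request:
    """Build a POST /subjects/{subject}/versions request for an Avro schema."""
    # The registry expects the schema itself as a JSON *string* inside the payload.
    payload = json.dumps({"schema": json.dumps(avro_schema)}).encode("utf-8")
    url = f"{base_url.rstrip('/')}/subjects/{urllib.parse.quote(subject, safe='')}/versions"
    return urllib.request.Request(
        url, data=payload, method="POST", headers={"Content-Type": CONTENT_TYPE}
    )


def register_schema(base_url: str, subject: str, avro_schema: dict, timeout: float = 10.0) -> int:
    """Register the schema (performs a real HTTP request) and return its global ID."""
    req = build_register_request(base_url, subject, avro_schema)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))["id"]
```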

1 Answer


answered 2018-04-10 14:09:22 -0500

tmcgrath

Try adding the Schema Generator processor, then configure your Kafka Producer to register the schema, as shown in this screenshot:

[screenshot]

Also, this is a good reference: https://streamsets.com/blog/generate-...
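The Schema Generator processor derives an Avro schema from the structure of the records after your transformations, which is what lets the output schema (B) differ from the input schema (A). The toy function below only illustrates that idea for a flat record; it is not how the processor is implemented, and the type mapping and the default record name `GeneratedRecord` are assumptions:

```python
from typing import Any, Dict, List

# Assumed mapping from Python values to Avro primitive types.
_AVRO_TYPES = [
    (bool, "boolean"),  # must precede int: bool is a subclass of int
    (int, "long"),
    (float, "double"),
    (str, "string"),
    (bytes, "bytes"),
]


def infer_avro_schema(record: Dict[str, Any], name: str = "GeneratedRecord") -> Dict[str, Any]:
    """Build an Avro record schema whose fields mirror the keys of one flat record."""
    fields: List[Dict[str, Any]] = []
    for field_name, value in record.items():
        if value is None:
            # A null sample carries no type information; fall back to a nullable string.
            avro_type: Any = ["null", "string"]
        else:
            avro_type = next((t for py, t in _AVRO_TYPES if isinstance(value, py)), None)
            if avro_type is None:
                raise TypeError(f"unsupported type for field {field_name!r}: {type(value).__name__}")
        fields.append({"name": field_name, "type": avro_type})
    return {"type": "record", "name": name, "fields": fields}
```

Because the schema is derived from each record after the pivot, a new output shape yields a new schema, which the producer can then register instead of looking up an existing subject.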



Last updated: Apr 10