How to write a Kafka producer with a dynamically generated schema?

I am using SDC 3.1.2.0 to ingest data with a Kafka Consumer origin, where the Avro data is registered in the Confluent Schema Registry. The pipeline then performs some filter, lookup, and pivot processing and finally sinks the data into another Kafka topic in Avro format. I configured the Kafka Producer destination to use the Confluent Schema Registry, providing the registry URL and looking up the schema by subject, but when the pipeline runs it complains that the schema does not exist. How can I send Kafka messages with a dynamically generated schema from the Kafka Producer component? In short: Avro schema A in, Avro schema B out.
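
For context, looking a schema up by subject in the Confluent Schema Registry amounts to something like the following (a minimal sketch with the Confluent Java client; the registry URL and the subject name out-topic-value are placeholders, not my real values). Because schema B is generated inside the pipeline, no such subject has ever been registered, which seems to be why the lookup fails:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public class LookupBySubject {
    public static void main(String[] args) throws Exception {
        // Placeholders: replace with the real registry URL and output subject name.
        String registryUrl = "http://schema-registry:8081";
        String subject = "out-topic-value";

        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);

        // List the subjects the registry currently knows about.
        System.out.println("Registered subjects: " + client.getAllSubjects());

        try {
            // This is the lookup-by-subject step; it fails if the subject was never registered.
            SchemaMetadata latest = client.getLatestSchemaMetadata(subject);
            System.out.println("Latest schema for " + subject + ": " + latest.getSchema());
        } catch (RestClientException e) {
            // Error code 40401 corresponds to "Subject not found".
            System.out.println("Lookup failed: " + e.getErrorCode() + " - " + e.getMessage());
        }
    }
}
```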

The error message is:

Kafka Producer 1 DATA_FORMAT_201 - Cannot create the parser factory: java.lang.RuntimeException: Could not create DataFactory instance for 'com.streamsets.pipeline.lib.generator.avro.AvroDataGeneratorFactory': com.streamsets.pipeline.lib.util.SchemaRegistryException: com.streamsets.pipeline.lib.util.SchemaRegistryException: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
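
One thing I am considering, in case it helps frame an answer: pre-registering the generated schema B under the expected subject before starting the pipeline, so the subject exists when the producer looks it up. Below is a rough sketch with the Confluent Java client; the registry URL, subject name, and inline schema are made-up placeholders, not my actual values:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;

public class PreRegisterSchemaB {
    public static void main(String[] args) throws Exception {
        // Placeholders: replace with the real registry URL and output subject name.
        String registryUrl = "http://schema-registry:8081";
        String subject = "out-topic-value";

        // Made-up stand-in for the dynamically generated output schema (schema B).
        String schemaB = "{\"type\":\"record\",\"name\":\"OutputRecord\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"string\"},"
                + "{\"name\":\"amount\",\"type\":\"double\"}]}";

        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);

        // Register the schema so the subject exists before the producer looks it up.
        int schemaId = client.register(subject, new Schema.Parser().parse(schemaB));
        System.out.println("Registered " + subject + " with schema id " + schemaId);
    }
}
```

That said, I would prefer a way for the Kafka Producer stage itself to handle the dynamically generated schema, which is what I am asking about.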