
How do I write a Kafka producer with a dynamically generated schema?

asked 2018-04-08 05:06:04 -0500 by casel.chen

updated 2018-04-09 20:05:13 -0500

I am using SDC to ingest Avro data from a Kafka consumer, with the schema registered in the Confluent schema registry, then perform some filter, lookup, and pivot actions, and finally sink the data into another Kafka topic, also as Avro. I configured the Kafka producer to use the Confluent schema registry, providing the registry URL and looking up the schema by subject, but at runtime it complains that the schema does not exist. How can I have the Kafka producer send messages with a dynamically generated schema? In short: Avro schema A in, Avro schema B out.

[screenshot 1]

[screenshot 2]

The error message is:

Kafka Producer 1 DATA_FORMAT_201 - Cannot create the parser factory: java.lang.RuntimeException: Could not create DataFactory instance for 'com.streamsets.pipeline.lib.generator.avro.AvroDataGeneratorFactory': com.streamsets.pipeline.lib.util.SchemaRegistryException: com.streamsets.pipeline.lib.util.SchemaRegistryException: Subject not found.; error code: 40401



Can you post some more information?

todd ( 2018-04-09 17:00:35 -0500 )

I added screenshots and the error message above. In short, I want Kafka messages with Avro schema A in and Kafka messages with Avro schema B out.

casel.chen ( 2018-04-09 20:07:08 -0500 )

Outside of StreamSets, what are the results of attempting to access the registry directly? For example, `curl -X GET http://localhost:8081/subjects`

todd ( 2018-04-10 06:51:40 -0500 )
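For reference, the registry lookup the Kafka producer performs boils down to a REST call against `/subjects/<subject>/versions/latest`; a 404 with error code 40401 ("Subject not found") means no schema was ever registered under that subject. A minimal Python sketch of that check, assuming the registry's default address `http://localhost:8081` (adjust for your environment):

```python
import json
import urllib.error
import urllib.request

# Assumption: registry at the Confluent default host/port.
REGISTRY_URL = "http://localhost:8081"

def subject_url(registry, subject, version="latest"):
    """Build the REST path the registry uses to look up a schema by subject."""
    return f"{registry}/subjects/{subject}/versions/{version}"

def fetch_schema(registry, subject):
    """Return the registered Avro schema for a subject, or None if not found."""
    try:
        with urllib.request.urlopen(subject_url(registry, subject)) as resp:
            return json.loads(resp.read())["schema"]
    except urllib.error.HTTPError as e:
        if e.code == 404:  # registry reports error code 40401: Subject not found
            return None
        raise

# Example (requires a running registry; "mytopic-value" is a hypothetical subject):
# schema = fetch_schema(REGISTRY_URL, "mytopic-value")
```

This mirrors the `curl` check above: if the subject list is empty or the subject is missing, SDC's producer will raise the same 40401 error during validation.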

There is no schema generated yet; the error is raised when I check & preview. I just want to know how to consume existing Avro Kafka messages, do some transformations, and then sink those records to another Kafka topic with a derived Avro schema (I suppose the sink schema can be derived by SDC, right?).

casel.chen ( 2018-04-10 06:57:35 -0500 )

I understand now. You'd like to register the schema if it doesn't exist, right?

todd ( 2018-04-10 07:20:18 -0500 )

1 Answer


answered 2018-04-10 14:09:22 -0500 by todd

Try adding the Schema Generator processor, then update your Kafka producer to register the schema, as shown in this screenshot:

[screenshot]

Also, a good reference.
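Conceptually, the Schema Generator walks each record's fields and emits an Avro schema for it, which the producer can then register under the output subject. A rough Python sketch of that idea (simplified type mapping; not SDC's actual implementation):

```python
import json

# Assumption: a drastically simplified field-type -> Avro-type mapping;
# SDC's Schema Generator supports many more types and options.
AVRO_TYPES = {str: "string", int: "long", float: "double", bool: "boolean"}

def infer_avro_schema(record, name="GeneratedRecord"):
    """Derive an Avro record schema from a flat dict of field values."""
    fields = [
        {"name": key, "type": AVRO_TYPES.get(type(value), "string")}
        for key, value in record.items()
    ]
    return {"type": "record", "name": name, "fields": fields}

# A transformed record whose shape (schema B) differs from the input schema A:
record = {"id": 42, "price": 9.99, "sku": "A-100"}
schema_json = json.dumps(infer_avro_schema(record))
```

With "Register Schema" enabled on the Kafka producer, the generated schema is registered under the configured subject instead of being looked up, which avoids the 40401 "Subject not found" error for a schema that does not exist yet.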

