Is it possible to add the Kafka message timestamp to the headers generated by the Kafka Multi-topic Consumer?

asked 2019-12-12 10:35:10 -0500

tmotters

The Kafka Consumer exposes the Kafka message timestamp in the record header, and we use that value further down the pipeline. We need to use the Multi-topic Consumer so that we can read from multiple topics and run multiple threads, but with that origin the message timestamp is not available in the header. Is there a way to get at the Kafka message timestamp using the Kafka Multi-topic Consumer? If not, can this be added as a feature request?

Many thanks,

Tim
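To make the request concrete, here is a minimal sketch of what "timestamp in the header" means, written outside SDC. The `KafkaMessage` type, the `header_attributes` function, and the attribute names `kafkaTimestamp` and `kafkaTimestampType` are all hypothetical, chosen for illustration; they are not taken from SDC's source. The only Kafka facts assumed are that a message timestamp is expressed in epoch milliseconds and that its type is either CreateTime or LogAppendTime.

```python
from typing import Dict, NamedTuple


class KafkaMessage(NamedTuple):
    """Stand-in for a consumed Kafka message (hypothetical, not an SDC or Kafka client class)."""
    topic: str
    partition: int
    offset: int
    timestamp_ms: int    # message timestamp in epoch milliseconds
    timestamp_type: str  # "CreateTime" or "LogAppendTime"
    value: bytes


def header_attributes(msg: KafkaMessage) -> Dict[str, str]:
    """Build string-valued header attributes that include the message timestamp.

    The attribute names below are illustrative assumptions only.
    """
    return {
        "topic": msg.topic,
        "partition": str(msg.partition),
        "offset": str(msg.offset),
        "kafkaTimestamp": str(msg.timestamp_ms),
        "kafkaTimestampType": msg.timestamp_type,
    }


if __name__ == "__main__":
    sample = KafkaMessage("logs", 0, 42, 1576164910000, "CreateTime", b"payload")
    print(header_attributes(sample))
```

A downstream stage could then read the timestamp from the header regardless of which topic or thread produced the record.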

2 Answers

answered 2019-12-12 15:49:13 -0500

metadaddy

There is an outstanding issue for this, SDC-11073, and it looks like we already have a pull request for it. Hopefully we can merge it into the product in the next release cycle.

answered 2019-12-13 05:44:18 -0500

tmotters

Thanks for the update @metadaddy.

We have noticed that the single-topic and multi-topic Kafka consumers behave somewhat differently. For example, if a pipeline with a single-topic Kafka consumer is stopped for a number of hours (say, overnight) and then restarted, the consumer doesn't consume any records from Kafka, and we don't see any values for CURRENT-OFFSET when we describe the consumer group with the kafka-consumer-groups ...-group streamsetsXYZLogsCollector --describe command on the broker.

When we use the Multi-topic Kafka consumer, everything seems to work as expected, i.e. the consumer picks up where it left off and CURRENT-OFFSET is displayed when running the kafka-consumer-groups command.
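For anyone wanting to check this across many partitions, here is a rough sketch that scans the text printed by the describe command and lists partitions with no committed offset. It assumes the output is a whitespace-separated table with a header row containing CURRENT-OFFSET, and that a missing offset is printed as a non-numeric placeholder such as `-`; the exact columns vary between Kafka versions, so columns are looked up by header name. The function names and the sample table are made up for illustration.

```python
from typing import Dict, List, Tuple


def parse_describe(output: str) -> List[Dict[str, str]]:
    """Parse the table printed by `kafka-consumer-groups ... --describe`.

    Rows are matched to the header by position; lines whose field count
    differs from the header (warnings, blank lines) are skipped.
    """
    rows: List[Dict[str, str]] = []
    header: List[str] = []
    for line in output.splitlines():
        fields = line.split()
        if not fields:
            continue
        if "CURRENT-OFFSET" in fields:
            header = fields
            continue
        if header and len(fields) == len(header):
            rows.append(dict(zip(header, fields)))
    return rows


def partitions_without_offset(rows: List[Dict[str, str]]) -> List[Tuple[str, str]]:
    """Return (topic, partition) pairs whose CURRENT-OFFSET is not a number."""
    return [
        (r["TOPIC"], r["PARTITION"])
        for r in rows
        if not r["CURRENT-OFFSET"].isdigit()
    ]


# Made-up sample output, for illustration only.
SAMPLE = """\
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
logs   0          1500            1800            300  -            -     -
logs   1          -               950             -    -            -     -
"""

if __name__ == "__main__":
    print(partitions_without_offset(parse_describe(SAMPLE)))
```

Running this against the output captured before stopping and after restarting a pipeline would show whether the group ever had committed offsets for each partition.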

If we run jtrace against SDC we notice that the single topic consumer mentions:

com.streamsets.pipeline.kafka.impl.BaseKafkaConsumer09$KafkaConsumerRunner.run(BaseKafkaConsumer09.java:350)

whereas the multi-topic consumer mentions:

com.streamsets.pipeline.stage.origin.multikafka.v0_10.loader.Kafka0_10ConsumerLoader$WrapperKafkaConsumer.poll(Kafka0_10ConsumerLoader.java:165)

Is there a significant difference in the SDC implementation of these two origins, e.g. in the version of Kafka they are implemented against?

