Hi!

Here's one way of handling it. Add a Jython (or Groovy, or JavaScript) Evaluator to your pipeline and use the state object to track two things: whether the current batch is the first one, and whether it contains any records. If it's not the first batch and the batch is empty, generate a custom event and route it to a Pipeline Finisher.

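Note: You will need to tune Batch Wait Time (ms) and Max Batch Size (records) for your use case and for how quickly messages are being produced. If the wait time is too short for the message rate, an empty batch can arrive while messages are still being produced and the pipeline will stop early. Once you have those values nailed, this will work.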


Init Script: sdc.state['first_batch'] = "true"

Script:

# An empty batch after at least one non-empty batch means there is nothing left to consume
if (sdc.state['first_batch'] == "false" and len(sdc.records) == 0):
  sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
  sdc.toEvent(sdc.createEvent("no-more-messages", 0))

# Pass every record through unchanged
for record in sdc.records:
  try:
    sdc.output.write(record)
  except Exception as e:
    # Send record to error
    sdc.error.write(record, str(e))

# Flip the flag once the first non-empty batch has been seen
if (sdc.state['first_batch'] == "true" and len(sdc.records) > 0):
  sdc.state['first_batch'] = "false"
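
If you want to sanity-check the flag logic outside of Data Collector, here is a minimal sketch in plain Python. It only simulates the decision made in the script above; the should_stop helper, the plain dict standing in for the state object, and the sample batches are all hypothetical and not part of any StreamSets API.

```python
def should_stop(state, batch):
    """Return True when an empty batch arrives after at least one non-empty batch."""
    # Same condition the evaluator script uses to fire the event
    stop = state["first_batch"] == "false" and len(batch) == 0
    # Flip the flag once the first non-empty batch has been seen
    if state["first_batch"] == "true" and len(batch) > 0:
        state["first_batch"] = "false"
    return stop


# Mirrors the Init Script
state = {"first_batch": "true"}

# Hypothetical sequence: an empty batch at startup, two batches with data, then an empty one
batches = [[], ["m1", "m2"], ["m3"], []]

decisions = [should_stop(state, batch) for batch in batches]
print(decisions)  # [False, False, False, True]
```

Notice that the empty batch at startup does not trigger a stop; only an empty batch after data has been seen does. That is why the batch settings matter: a gap in the incoming messages longer than the wait time looks the same as the end of the data.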


Cheers, Dash