Ask Your Question

How to transition a StreamSets pipeline to Finished state, after it has processed all records, if the Origin does not 'Produce Events'?

asked 2020-05-14 16:24:49 -0500 by SB

I have created a StreamSets pipeline where the Origin is 'Kafka Consumer' and the destination is 'JDBC Producer'. To run this pipeline, I have created a StreamSets Job.

After I click on 'Start Job' to run the pipeline, the Job status turns to 'Active' and remains in 'Active' state indefinitely even after all the data from the origin Kafka Topic is consumed and processed by inserting in the destination database.

I am trying to get the StreamSets Job to 'InActive' state once it finishes processing all the data in the Kafka topic.

For my other pipelines (whose origins have the option to 'Produce Events' when there is no more data), I have used a 'Pipeline Finisher Executor'. But the 'Kafka Consumer' origin does not produce events by default.

To transition this pipeline to Finished state, I have tried the following options one by one, without success:

1. Set the 'Batch Wait Time' in 'Kafka Consumer' to a lower value.
2. Set 'Runner Idle Time (sec)' to -1 in the General tab of the pipeline.
3. Set the 'Pipeline Force Stop' timeout in the 'Job Status' tab of the StreamSets Job.

Please advise how I can transition the pipeline to Finished state instead of having it stream continuously.


1 Answer


answered 2020-05-14 18:06:19 -0500 by iamontheinet

Hi!

Here's one way of handling it. Add a Jython (or Groovy, or JavaScript) Evaluator to your pipeline and use the state object to track whether this is the first or a subsequent batch, and whether the batch contains any records. If it's not the first batch and there are no records in it, generate a custom event and send it to the Pipeline Finisher.

Note: You will need to tune Batch Wait Time (ms) and Max Batch Size (records) for your use case and for how quickly messages are being produced, but once you get those right, this approach works.


Init Script: state['first_batch'] = "true"

Script:

# Fire a custom event once an empty batch arrives after data has already flowed
if (state['first_batch'] == "false" and len(sdc.records) == 0):
  sdc.log.info("No more Kafka messages to consume. Stopping pipeline. See ya!")
  sdc.toEvent(sdc.createEvent("no-more-messages", 0))

# Pass every record through unchanged
for record in sdc.records:
  try:
    sdc.output.write(record)
  except Exception as e:
    # Send record to error
    sdc.error.write(record, str(e))

# After the first non-empty batch, clear the first-batch flag
if (state['first_batch'] == "true" and len(sdc.records) > 0):
  state['first_batch'] = "false"
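To see why the flag matters, here is a minimal stand-alone Python sketch of the same batch-tracking logic, using a mock object in place of the SDC scripting context (`MockSdc` and `process_batch` are illustrative names, not part of the StreamSets API):

```python
class MockSdc:
    """Stand-in for the SDC scripting context; only records emitted events."""
    def __init__(self):
        self.events = []

    def createEvent(self, event_type, version):
        return {"type": event_type, "version": version}

    def toEvent(self, event):
        self.events.append(event)


def process_batch(sdc, state, records):
    """Return True once an empty batch follows at least one non-empty batch."""
    # Empty batch after data has flowed: signal "no more messages"
    if state['first_batch'] == "false" and len(records) == 0:
        sdc.toEvent(sdc.createEvent("no-more-messages", 0))
        return True
    # First batch that actually carries data: clear the flag
    if state['first_batch'] == "true" and len(records) > 0:
        state['first_batch'] = "false"
    return False


sdc = MockSdc()
state = {'first_batch': "true"}

assert process_batch(sdc, state, ["rec1", "rec2"]) is False  # data flows, flag cleared
assert process_batch(sdc, state, []) is True                 # empty batch -> event fired
assert sdc.events[0]["type"] == "no-more-messages"
```

The flag prevents the pipeline from finishing on an empty first batch, which can happen when the consumer starts before any messages have been produced.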



Cheers, Dash


Comments

Thanks, Dash! Let me try this out.

SB ( 2020-05-19 03:49:31 -0500 )

You're welcome!

iamontheinet ( 2020-05-19 07:48:14 -0500 )