
Unique Record ID or Index from Processor Output

asked 2019-07-12 13:39:50 -0600

DataAnalyst1029

I have searched the StreamSets documentation for an answer to this, but have had no luck.

I have an example processor that outputs 100 records with similar data (in my case, the records have no unique data or fields to filter by), but I only need the first 50 records. Would it be possible to write a conditional statement to split the first 50 records to one output stream, sending the rest to another (trash) stream? Or utilize another processor to perform this action?

Another workaround that would suit me would be the ability for my processors to read records in batches, say 50 records at a time.

To summarize: When a processor (or origin) outputs records, does each record have a unique ID or row number associated with it that I can use to add more logic to my processing?


1 Answer


answered 2019-07-12 13:53:55 -0600

metadaddy

Each record does have a unique ID, which you can access via the EL ${record:id()}, but I'm not sure it helps you here: it uniquely identifies each record, but it's not a sequence number.

One way to solve this would be to write a script for one of the scripting evaluators that only passes on the first 50 records of each batch. This is quite straightforward; here it is in Groovy:

// Don't run off the end of the list if there are fewer than 50 records!
count = Math.min(50, records.size())

for (i = 0; i < count; i++) {
  try {
    // Write a record to the processor output
    output.write(records[i])
  } catch (e) {
    // Write a record to the error pipeline
    log.error(e.toString(), e)
    error.write(records[i], e.toString())
  }
}
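
Note that the script above limits each batch to 50 records, not the whole pipeline run. If you need only the first 50 records overall, the scripting evaluators expose a state map that persists across batches, so you can keep a running count there. The following is a minimal sketch along those lines; the 'passed' key and the limit of 50 are illustrative choices, not anything prescribed by StreamSets:

// Initialize a running counter the first time the script runs
if (state['passed'] == null) {
  state['passed'] = 0
}

for (record in records) {
  try {
    if (state['passed'] < 50) {
      // Only the first 50 records of the entire run are passed downstream
      output.write(record)
      state['passed'] = state['passed'] + 1
    }
    // Records beyond the first 50 are simply dropped
  } catch (e) {
    // Write a record to the error pipeline
    log.error(e.toString(), e)
    error.write(record, e.toString())
  }
}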

Comments


I will try this approach. You also mentioned what I was afraid of: the 'id' each record has is not a sequential number.

DataAnalyst1029 (2019-07-12 14:11:42 -0600)

This worked for me; the Groovy Evaluator processor actually gave me much more flexibility in restricting which records continue through the pipeline.

DataAnalyst1029 (2019-07-12 15:06:35 -0600)