File Tail and Pipeline Finisher precondition error

asked 2019-03-01 08:03:52 -0500 by Daniel

updated 2019-03-04 10:49:14 -0500 by metadaddy

Hi,

I'd like to read from a file and stop my pipeline once every row has been loaded.

I get an error in the Pipeline Finisher because the precondition fails for the File Tail event. Could you send me an example of what I should write in the Pipeline Finisher's precondition so that the pipeline stops when all records have been loaded from the source file?

I read a static file, and the file is not modified while the pipeline is running.

I'd like to check for the END event at the end of the loading process. From the documentation: "event - Event that generated the record. Uses one of the following values: START - File Tail started processing the specified file. END - File Tail completed processing the contents of the file."

The error message/stack trace and the precondition are the following:

COMMON_0001 - Stage precondition: CONTAINER_0051 - Unsatisfied precondition(s) '${record:eventType()=='END'}'
com.streamsets.pipeline.api.base.OnRecordErrorException: COMMON_0001 - Stage precondition: CONTAINER_0051 - Unsatisfied precondition(s) '${record:eventType()=='END'}'
    at com.streamsets.datacollector.runner.FilterRecordBatch.getRecords(FilterRecordBatch.java:96)
    at com.streamsets.pipeline.stage.executor.finishpipeline.PipelineFinisherExecutor.write(PipelineFinisherExecutor.java:41)
    at com.streamsets.pipeline.api.base.configurablestage.DExecutor.write(DExecutor.java:34)
    at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:290)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:235)
    at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:298)
    at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:244)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.processPipe(ProductionPipelineRunner.java:824)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.lambda$executeRunner$3(ProductionPipelineRunner.java:869)
    at com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:136)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.executeRunner(ProductionPipelineRunner.java:868)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runSourceLessBatch(ProductionPipelineRunner.java:846)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:583)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:397)
    at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:529)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:109)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:75)
    at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:724)
    at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:151)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
    at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask ...

Comments

Hi! Update your question with 1) the exact error message/stack trace from sdc.log file, and 2) your precondition. Out of curiosity, are you intending to read from a static file that's not being written to or a file that is being appended to while the pipeline is running?

iamontheinet ( 2019-03-01 15:56:06 -0500 )

Hi! Thanks for your comment. I extended my question. I think the "Unsatisfied precondition(s) '${record:eventType()=='END'}'" condition could be the problem. Thanks.

Daniel ( 2019-03-01 20:44:25 -0500 )

I also tried the ${record:value('/event') == 'END'} precondition and discarded errors in the Pipeline Finisher (On Record Error: Discard), but the pipeline stayed in the running state.

Daniel ( 2019-03-01 20:59:36 -0500 )

Thanks for your help. The Directory origin and the ${record:eventType() == 'finished-file'} precondition solved my issue. :-)

Daniel ( 2019-03-04 10:44:00 -0500 )

1 Answer


answered 2019-03-03 11:50:19 -0500 by iamontheinet

updated 2019-03-04 10:49:45 -0500 by metadaddy

If you are processing a static file, look into using the Directory origin instead of File Tail. Then use the following precondition on the Pipeline Finisher:

${record:eventType() == 'finished-file'}
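For intuition, the precondition acts as a filter: only event records whose event type matches are allowed to reach the Pipeline Finisher, and any matching record triggers the stop. Here is a minimal plain-Python sketch of that idea (this is an analogy, not the StreamSets API; the record dictionaries and field names below are illustrative assumptions):

```python
# Sketch of how a Pipeline Finisher precondition behaves, mirroring
# ${record:eventType() == 'finished-file'} on a Directory origin.

def precondition(record):
    # Matches only the event emitted once a file has been fully read.
    return record.get("eventType") == "finished-file"

def run_finisher(records):
    """Return True if any record satisfies the precondition,
    i.e. the pipeline would be stopped."""
    for record in records:
        if precondition(record):
            return True  # finisher fires, pipeline stops
    return False

# Hypothetical event stream from a Directory-style origin:
events = [
    {"eventType": "new-file", "filepath": "/data/input.csv"},
    {"eventType": "finished-file", "filepath": "/data/input.csv"},
]
print(run_finisher(events))  # True: the finished-file event stops the pipeline
```

This is also why an ordinary data record reaching the finisher raises CONTAINER_0051: it has no matching event type, so the precondition is unsatisfied; routing only the event stream to the finisher (or discarding errors) avoids that.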

Cheers, Dash
