File Tail and Pipeline Finisher precondition error
Hi,
I'd like to read from a file and stop my pipeline when every rows has been loaded.
I got an Error in Pipeline Finisher becaouse the precondition is wrong for the File Tail event. Could you send me an example what shoud I write in the precondition of the Pipeline Finisher if I'd like to stop my pipeline when all records is loaded from the source file.
I read a static file and the the file is not modified while the pipeline is running.
I'd like to check the END event at the end of the loading process - From documentation: event Event that generated the record. Uses one of the following values: START - File Tail started processing the specified file. END - File Tail completed processing the contents of the file.
The error message/stack trace and the precondition is the following:
COMMON_0001 - Stage precondition: CONTAINER_0051 - Unsatisfied precondition(s) '${record:eventType()=='END'}'
com.streamsets.pipeline.api.base.OnRecordErrorException: COMMON_0001 - Stage precondition: CONTAINER_0051 - Unsatisfied precondition(s) '${record:eventType()=='END'}'
at com.streamsets.datacollector.runner.FilterRecordBatch.getRecords(FilterRecordBatch.java:96)
at com.streamsets.pipeline.stage.executor.finishpipeline.PipelineFinisherExecutor.write(PipelineFinisherExecutor.java:41)
at com.streamsets.pipeline.api.base.configurablestage.DExecutor.write(DExecutor.java:34)
at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(StageRuntime.java:290)
at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:235)
at com.streamsets.datacollector.runner.StageRuntime.execute(StageRuntime.java:298)
at com.streamsets.datacollector.runner.StagePipe.process(StagePipe.java:244)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.processPipe(ProductionPipelineRunner.java:824)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.lambda$executeRunner$3(ProductionPipelineRunner.java:869)
at com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:136)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.executeRunner(ProductionPipelineRunner.java:868)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runSourceLessBatch(ProductionPipelineRunner.java:846)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(ProductionPipelineRunner.java:583)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.run(ProductionPipelineRunner.java:397)
at com.streamsets.datacollector.runner.Pipeline.run(Pipeline.java:529)
at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:109)
at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:75)
at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:724)
at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:151)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask ...
Hi! Update your question with 1) the exact error message/stack trace from sdc.log file, and 2) your precondition. Out of curiosity, are you intending to read from a static file that's not being written to or a file that is being appended to while the pipeline is running?
Hi! thanks for your comment. I extended my question. I think the "Unsatisfied precondition(s) '${record:eventType()=='END'}'" condition could be the problem. Thanks.
I also tried ${record:value('/event') == 'END'} precondition and discarded errors in Pipeline Finisher (On Record Error: Discard), but the pipeline stayed in running state.
Thanks for your help. Directory origin and ${record:eventType() == 'finished-file'} precondition solved my issue. :-)