Error while sending error records to google cloud storage

asked 2020-11-10 14:58:28 -0500

mateusleao

Hello guys, first post here. I've been using StreamSets for a while and I love it, but this problem has been bothering me for about two weeks now.

I built a test pipeline that should send some error records to GCS:

- The pipeline sends half of the records (the non-error ones) to GCS without a problem, which means the GCS bucket, authentication, and configuration are all fine.
- When it starts sending the error records to GCS, it throws a Java NullPointerException, and I have no idea why. It's strange that it can send the good records to GCS but not the error records. Even stranger, when the error occurs, the error records are still SUCCESSFULLY written to GCS before the job crashes in StreamSets.

My main question is: why does the StreamSets job crash, when the good data is sent to GCS without any problem and even the error records are correctly written to GCS before the job fails?

This is the test pipeline:

And here is the error log:

2020-11-10 20:43:08,359 USERS_STATES GCS DUP TEST 2/USERSSTATESGCSDUPTEST280e71dc1-3ee1-400d-9919-60f73394059f  WARN    Error while running: java.lang.NullPointerException ProductionPipeline  *mateus     ProductionPipelineRunnable-USERSSTATESGCSDUPTEST280e71dc1-3ee1-400d-9919-60f73394059f-USERS_STATES GCS DUP TEST 2
    at com.streamsets.datacollector.runner.StageContext.toEvent(
    at com.streamsets.pipeline.lib.event.EventCreator$EventBuilder.createAndSend(
    at com.streamsets.pipeline.stage.cloudstorage.destination.GoogleCloudStorageTarget.lambda$write$5(
    at java.lang.Iterable.forEach(
    at com.streamsets.pipeline.stage.cloudstorage.destination.GoogleCloudStorageTarget.write(
    at com.streamsets.pipeline.api.base.configurablestage.DTarget.write(
    at com.streamsets.datacollector.runner.StageRuntime.lambda$execute$2(
    at com.streamsets.datacollector.runner.StageRuntime.execute(
    at com.streamsets.datacollector.runner.StageRuntime.execute(
    at com.streamsets.datacollector.runner.production.BadRecordsHandler.handle(
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.executeRunner(
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runSourceLessBatch(
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner.runPollSource(
    at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(
    at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(
    at ...
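One reading of the trace, offered as an assumption rather than a confirmed diagnosis: the NullPointerException is thrown in StageContext.toEvent, which is reached from GoogleCloudStorageTarget.write through EventCreator$EventBuilder.createAndSend while the stage is running under BadRecordsHandler.handle. If the event is created only after the objects are uploaded, that would explain why the error records land in GCS before the job fails. The Java sketch below models only that ordering. HypotheticalContext, write, and the "file-closed:" event name are invented for illustration and are not StreamSets classes or APIs; in particular, the null event sink is an assumption about what the error-records stage looks like, not something the log confirms.

```java
import java.util.ArrayList;
import java.util.List;

public class ErrorStageEventSketch {

    // HYPOTHETICAL stand-in for a stage context (not the StreamSets StageContext).
    // Assumption: the event sink is null when the stage runs as the error-records stage.
    static final class HypotheticalContext {
        private final List<String> eventSink; // null means no event lane is wired up

        HypotheticalContext(List<String> eventSink) {
            this.eventSink = eventSink;
        }

        void toEvent(String event) {
            // Dereferencing a missing sink throws NullPointerException,
            // loosely mirroring the top frame of the reported trace.
            eventSink.add(event);
        }
    }

    // HYPOTHETICAL destination: stores every record first, then emits one event
    // per stored object inside forEach, loosely mirroring write -> forEach -> createAndSend.
    static List<String> write(List<String> records, HypotheticalContext ctx, List<String> bucket) {
        List<String> written = new ArrayList<>();
        for (String r : records) {
            bucket.add(r); // the "upload" succeeds
            written.add(r);
        }
        // Any exception here happens after the data is already stored.
        written.forEach(obj -> ctx.toEvent("file-closed:" + obj));
        return written;
    }

    public static void main(String[] args) {
        List<String> records = List.of("rec-1", "rec-2");

        // Regular destination: an event sink exists, so no exception is thrown.
        List<String> okBucket = new ArrayList<>();
        List<String> events = new ArrayList<>();
        write(records, new HypotheticalContext(events), okBucket);
        System.out.println("normal stage: bucket=" + okBucket + " events=" + events);

        // Error-records stage (assumed): no event sink, so the NPE comes after the upload.
        List<String> errorBucket = new ArrayList<>();
        try {
            write(records, new HypotheticalContext(null), errorBucket);
        } catch (NullPointerException e) {
            System.out.println("error stage: bucket=" + errorBucket + " then NullPointerException");
        }
    }
}
```

In this model the bucket contents are identical in both runs; only the event step after the upload differs, which matches the symptom of records being written and the job crashing afterwards.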