Ask Your Question
1

Has anyone tested Postgres CDC origin from latest release v3.4?

asked 2018-08-09 11:43:35 -0500

sheraz Khan gravatar image

updated 2018-08-15 13:24:45 -0500

metadaddy gravatar image

Hi, I have been trying to use Postgres CDC origin with Postgres DB (ver: 9.6.9), all prerequisite has been configured as per the documentation (includes jdbc driver, WAL setup). I have been able to connect with same Postgres DB using JDBC consumer origin but with Postgres CDC origin I'm getting following error.

Preview Not Working:

2018-08-15 13:37:34,400 Postgres CDC JSON/PostGresCDCtestd36d9642-bb39-40d4-9cb4-f05142be87f5   WARN    Could not return a private ClassLoader for 'streamsets-datacollector-cdh_5_14-lib', active private ClassLoaders='2'     ClassLoaderStageLibraryTask     *admin@admin    0       preview-pool-1-thread-1
2018-08-15 13:37:34,401 Postgres CDC JSON/PostGresCDCtestd36d9642-bb39-40d4-9cb4-f05142be87f5   ERROR   Uncaught throwable from com.streamsets.datacollector.execution.preview.async.AsyncPreviewer$$Lambda$199/423610108@518e4746: java.lang.RuntimeException: java.lang.IllegalStateException: Object has already been returned to this pool or is invalid    SafeScheduledExecutorService    *admin@admin            preview-pool-1-thread-1
java.lang.RuntimeException: java.lang.IllegalStateException: Object has already been returned to this pool or is invalid
        at com.streamsets.datacollector.stagelibrary.ClassLoaderStageLibraryTask.releaseStageClassLoader(ClassLoaderStageLibraryTask.java:860)
        at com.streamsets.datacollector.runner.preview.PreviewStageLibraryTask.releaseStageClassLoader(PreviewStageLibraryTask.java:184)
        at com.streamsets.datacollector.creation.StageBean.releaseClassLoader(StageBean.java:95)
        at com.streamsets.datacollector.runner.StageRuntime.destroy(StageRuntime.java:336)
        at com.streamsets.datacollector.runner.StagePipe.destroy(StagePipe.java:357)
        at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.lambda$destroy$1(PreviewPipelineRunner.java:394)
        at com.streamsets.datacollector.runner.PipeRunner.executeBatch(PipeRunner.java:136)
        at com.streamsets.datacollector.runner.preview.PreviewPipelineRunner.destroy(PreviewPipelineRunner.java:394)
        at com.streamsets.datacollector.runner.Pipeline.destroy(Pipeline.java:430)
        at com.streamsets.datacollector.runner.preview.PreviewPipeline.destroy(PreviewPipeline.java:64)
        at com.streamsets.datacollector.execution.preview.sync.SyncPreviewer.start(SyncPreviewer.java:253)
        at com.streamsets.datacollector.execution.preview.async.AsyncPreviewer.lambda$start$0(AsyncPreviewer.java:95)
        at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
        at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
        at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
        at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.lambda$call$0(SafeScheduledExecutorService.java:226)
        at com.streamsets.datacollector.security.GroupsInScope.execute(GroupsInScope.java:33)
        at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:222)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at com.streamsets.datacollector.metrics.MetricSafeScheduledExecutorService$MetricsTask.run(MetricSafeScheduledExecutorService.java:100)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Object has already been returned to this pool or is invalid
        at org.apache.commons.pool2.impl.GenericKeyedObjectPool.returnObject(GenericKeyedObjectPool.java:484)
        at com.streamsets.datacollector.stagelibrary.ClassLoaderStageLibraryTask.releaseStageClassLoader(ClassLoaderStageLibraryTask.java:854)
        ... 24 more
2018-08-15 13:37:34,405 Postgres CDC JSON/PostGresCDCtestd36d9642-bb39-40d4-9cb4-f05142be87f5   INFO    Destroy HdfsTargetConfigBean    *admin@admin    0       preview-pool-1-thread-2
2018-08-15 13:37:34,406 Postgres CDC JSON/PostGresCDCtestd36d9642-bb39-40d4-9cb4-f05142be87f5   INFO    HikariPool-33 - is closing down.        HikariPool      *admin@admin    0       preview-pool-1-thread-2
2018-08-15 13:37:34,409 Postgres CDC ...
(more)
edit retag flag offensive close merge delete

Comments

Can you edit your question to include the full stack trace from sdc.log?

metadaddy gravatar imagemetadaddy ( 2018-08-09 11:45:16 -0500 )edit

i just added a log image, i don't know how to upload a log file

sheraz Khan gravatar imagesheraz Khan ( 2018-08-09 13:02:50 -0500 )edit

Just copy the stack trace lines from sdc.log and paste them into your question.

metadaddy gravatar imagemetadaddy ( 2018-08-09 13:42:49 -0500 )edit

2 Answers

Sort by ยป oldest newest most voted
0

answered 2018-08-13 12:25:29 -0500

sheraz Khan gravatar image

updated 2018-08-15 13:24:23 -0500

metadaddy gravatar image

Thanks for your directions, here is a quick update on the progress Following are the three commands used to create a test_slot on Postgres side

  1. SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json'); (ran at Postgres manually)
  2. SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1'); (this triggers from SDC)
  3. SELECT 'stop' FROM pg_drop_replication_slot('test_slot'); (to stop the test_slot from Postgres)

I'm attaching the output in an image file, need your assistance on how to read these fields, I tried to used field flattener but the column name requires regex to remove MAP hierarchies, so I used a Field Renamer, is there is a standard way to read the CDC response to insert the rows at Impala using update/delete/insert action coming under "change.0.kind". It seems I have to parse the entire thing in a Jython component and create an Insert command for Impala. please advise.

C:\fakepath\SDC Postgres CDC response tree.jpg

edit flag offensive delete link more
0

answered 2018-08-09 14:21:02 -0500

metadaddy gravatar image

Looking in the code, it seems to be failing when it tries to execute:

SELECT * FROM pg_create_logical_replication_slot(?, ?)

Where the first parameter is the configured replication slot and the second is the output plugin name, wal2json. Check that the StreamSets-provided wal2json plugin is correctly installed.

Unfortunately, the details of the error are not logged (I filed SDC-9741 to fix this), but you might be able to get more clues by connecting to PostgreSQL via psql and executing:

SELECT * FROM pg_create_logical_replication_slot('<slot name>', 'wal2json');
edit flag offensive delete link more

Comments

need your advise on my answer, I will remove my response later as this is not the complete solution, due to space limitation I put my comments as answer.

sheraz Khan gravatar imagesheraz Khan ( 2018-08-13 12:26:52 -0500 )edit

Hi, I cannot run Preview while using Postgres CDC Origin, is this a bug? Just copied logs with actual post under Preview not working Tag

sheraz Khan gravatar imagesheraz Khan ( 2018-08-15 12:42:06 -0500 )edit
Login/Signup to Answer

Question Tools

1 follower

Stats

Asked: 2018-08-09 11:43:35 -0500

Seen: 72 times

Last updated: Aug 15