
Kafka to MongoDB destination [closed]

asked 2018-11-30 15:54:45 -0600

srinath_222

updated 2018-12-03 12:13:20 -0600

I am trying to load data from Kafka into a MongoDB destination. The data in Kafka is in the following format:

{ "xid": 1948, "nextlsn": "0/286FB420", "timestamp": "2018-11-29 16:41:21.217215-05", "change": [ { "kind": "insert", "schema": "public", "table": "wal_test3", "columnnames": ["col1", "description"], "columntypes": ["integer", "character varying"], "columnvalues": [5, "wal_test5"] } ] }

I am able to read the data from the Kafka Consumer if the data format is Text, but it throws an error if I set the data format to JSON. I initially used a JSON Generator to convert the text to JSON, and in the data preview I see:

Record1-Output Record1 : STRING {"xid":1948,"nextlsn":"0/286FB420","timestamp":"2018-11-29 16:41:21.217215-05","change":[{"kind":"insert","schema":"public","table":"wal_test3","columnnames":["col1","description"],"columntypes":["integer","character varying"],"columnvalues":[5,"wal_test5"]}]}
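For reference, the payload itself is valid JSON once it arrives as a single message; a minimal sketch in plain Python (outside SDC, names are illustrative only) that parses the sample and pairs column names with values:

```python
import json

# The sample wal2json-style change message, delivered as one string
message = (
    '{"xid": 1948, "nextlsn": "0/286FB420", '
    '"timestamp": "2018-11-29 16:41:21.217215-05", '
    '"change": [{"kind": "insert", "schema": "public", "table": "wal_test3", '
    '"columnnames": ["col1", "description"], '
    '"columntypes": ["integer", "character varying"], '
    '"columnvalues": [5, "wal_test5"]}]}'
)

record = json.loads(message)          # parses cleanly as a single document
change = record["change"][0]

# Zip column names with their values to get a row-like dict
row = dict(zip(change["columnnames"], change["columnvalues"]))
print(row)  # {'col1': 5, 'description': 'wal_test5'}
```

This only works when the whole document is one message; if the producer pretty-prints it across multiple lines and each line becomes a separate Kafka message, no single message is parseable JSON.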

I need to parse the data and load it into a MongoDB destination configured with a database name and collection name (I did no further configuration). My pipeline has a Kafka Consumer origin, a JSON Generator, and a MongoDB destination.

When I try to start the pipeline, I get the error below:

Pipeline Status: RUNNING_ERROR: org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is STRING.

Please suggest how to parse the incoming data and load it into the MongoDB destination.


Closed for the following reason: the question is answered, right answer was accepted by srinath_222
close date 2018-12-06 13:49:40.685290

Comments

What exception does it throw when you set Data Format to JSON on Kafka Consumer origin?

iamontheinet ( 2018-11-30 16:48:24 -0600 )

Below is the exception I get if I set Data Format to JSON on the Kafka Consumer: KAFKA_37 - Cannot parse record from message 'postgres_wal::0::1770': com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')

srinath_222 ( 2018-12-02 13:40:41 -0600 )

Is this your example message { "timestamp": "2018-11-29 16:41:21.217215-05", "change": [ { "kind": "insert", "table": "wal_test3", } ] }? If so, there's an extra comma at the end, but if I remove it, it works for me in SDC 3.5. What exactly does your message look like, and what's your SDC version?

iamontheinet ( 2018-12-03 09:21:14 -0600 )

1 Answer


answered 2018-12-03 12:23:09 -0600

srinath_222

I corrected the question; there is no extra comma. The message looks exactly as above coming from the Kafka consumer, and I am using SDC 3.6.0. I can read from the Kafka Consumer if the data format is Text (the message is spread across 15 lines in the Kafka topic), but I get this error when the format is JSON: Cannot parse record from message 'postgres_wal::0::1962': com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')


Comments

The updated Kafka message works fine for me. Take a look -- https://pasteboard.co/HQ512TD.png and https://pasteboard.co/HQ51yK5.png. Which Kafka stage library are you using?

iamontheinet ( 2018-12-03 16:34:44 -0600 )

Thanks for all the help. The issue was resolved after I changed the data format of the incoming data into Kafka.

srinath_222 ( 2018-12-06 13:49:13 -0600 )
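For anyone hitting the same JsonParseException: it typically means the JSON document was split across multiple lines or messages, so the parser sees a fragment that starts with a bare key rather than a complete value. A hedged sketch of the difference, using Python's json module as a stand-in for Jackson (both reject the fragment for the same reason):

```python
import json

# One complete JSON document on a single line: parses fine.
whole = '{"xid": 1948, "change": []}'
parsed = json.loads(whole)

# One line lifted out of a pretty-printed document: after the leading
# string "nextlsn" the parser hits ':' where a document must end, which
# is the same condition Jackson reports as
# "Unexpected character (':' (code 58)): expected a valid value".
fragment = '"nextlsn": "0/286FB420",'
try:
    json.loads(fragment)
    fragment_is_valid = True
except json.JSONDecodeError:
    fragment_is_valid = False
```

The fix matches the accepted resolution above: have the producer write each record into Kafka as one compact, single-line JSON document (e.g. the output of json.dumps) so every Kafka message is independently parseable.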

Question Tools

1 follower

Stats

Asked: 2018-11-30 15:54:45 -0600

Seen: 29 times

Last updated: Dec 03