Ask Your Question
1

Execute MongoDB query in StreamSets

asked 2019-02-12 10:21:13 -0500

srinath_222 gravatar image

Hi,

I have a requirement to execute a query in MongoDB (retrieving data from join of multiple collections) and fetch the result as part of processors in my pipeline.

We have MongoDB Lookup processor but it does not have an option to execute query on MongoDB database like JDBC Lookup. I have an Origin as Relational and I want to fetch the data from Mongo as a lookup.

Please suggest an approach how to fetch the data from MongoDB.

edit retag flag offensive close merge delete

Comments

@iamontheinet any suggestions on this ?

srinath_222 gravatar imagesrinath_222 ( 2019-02-19 09:50:49 -0500 )edit

1 Answer

Sort by ยป oldest newest most voted
1

answered 2019-02-19 14:33:40 -0500

iamontheinet gravatar image

Hi!

You can use one of the scripting processors to connect to MongoDB and perform (lookup) queries. Here's how in Jython Evaluator:

1) Install pymongo via pip install pymongo

2) In Jython Evaluator Init Script section add:

import sys
sys.path.append('/anaconda3/lib/python3.6/site-packages')
import pymongo
from pymongo import MongoClient
state['client'] = MongoClient('localhost', 27017)
state['db'] = state['client'].dashDB
state['collection'] = state['db'].inventory

Note: Replace sys.path with the path where you installed pymongo in step 1 as well as client connection, db and collection details as per your environment. FYI, if db and collection are not static, move them down to Script section.

3) In Jython Evaluator Script section add:

client = state['client']
db = state['db']
collection = state['collection']
for record in records:
  try:
    lookup_item = collection.find_one({"item": record.value['item']})
    log.info("======== lookup_item: {}",lookup_item)
    # do something with looked up item
    # Write record to processor output
    output.write(record)
  except Exception as e:
    # Send record to error
    error.write(record, str(e))

Note: Update code as it pertains to your logic.

3) In Jython Evaluator Destroy Script section add:

client = state['client']
client.close()

Hope this helps!

Cheers, Dash

edit flag offensive delete link more
Login/Signup to Answer

Question Tools

2 followers

Stats

Asked: 2019-02-12 10:21:13 -0500

Seen: 541 times

Last updated: Feb 19