
JSON schema validation in StreamSets

asked 2019-05-07 23:54:27 -0600 by rickymart

updated 2019-05-08 23:20:12 -0600 by metadaddy

Hi Team,

I am looking for a way to validate JSON against a schema in a StreamSets pipeline. I have a pipeline that takes in multiple JSON records of different structures, and I would like to validate each JSON record against its respective schema.

For example:

Record 1:

{"employee":"Rick", "email":"", "department":"audit"}

Record 1 schema:

{ "$schema": "", "type": "object", "properties": { "employee": { "type": "string" }, "email": { "type": "string" }, "department": { "type": "string" } }, "required": [ "employee", "email", "department" ] }

Record 2:

{"employee":"Rick", "address":"Times square", "salary":10000, "department":"sales"}

Record 2 schema:

{ "$schema": "", "type": "object", "properties": { "employee": { "type": "string" }, "address": { "type": "string" }, "salary": { "type": "integer" }, "department": { "type": "string" } }, "required": [ "employee", "address", "salary", "department" ] }

Above are two example records. I may have many departments, and each department will have its own JSON structure. Is it possible to store the schemas in the pipeline's cache and look up the respective schema by department to do the validation? If not, what are the other ways to validate JSON in this scenario?
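For illustration, the lookup-then-validate idea can be sketched in plain Python with the jsonschema package; the schemas dict below is only a stand-in for whatever cache or lookup the pipeline would actually use:

```python
from jsonschema import validate, ValidationError

# Hypothetical in-memory lookup table: department -> schema
schemas = {
    "audit": {
        "type": "object",
        "properties": {
            "employee": {"type": "string"},
            "email": {"type": "string"},
            "department": {"type": "string"},
        },
        "required": ["employee", "email", "department"],
    },
    "sales": {
        "type": "object",
        "properties": {
            "employee": {"type": "string"},
            "address": {"type": "string"},
            "salary": {"type": "integer"},
            "department": {"type": "string"},
        },
        "required": ["employee", "address", "salary", "department"],
    },
}

def is_valid(record):
    """Look up the schema by department and validate the record against it."""
    try:
        validate(instance=record, schema=schemas[record["department"]])
        return True
    except (ValidationError, KeyError):
        return False
```

With this sketch, the first example record passes, while a sales record missing its required fields (or a record for an unknown department) is rejected.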


1 Answer


answered 2019-05-08 23:38:07 -0600 by metadaddy

You can do this with the Jython evaluator and the jsonschema Python module.

Here is my pipeline; you can download it (for StreamSets Data Collector 3.8.0 or above) here.

[screenshot: pipeline overview]

The Dev Raw Data origin is configured to read its input as Text (not JSON!).

[screenshots: Dev Raw Data origin configuration]

The first JSON Parser processor parses the text into the /json field.

[screenshot: first JSON Parser configuration]
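In plain-Python terms, this step is just a json.loads from the raw text field into a structured field (the /text and /json field names mirror the pipeline):

```python
import json

# A record as the Text origin produces it: one raw string field
record = {"text": '{"employee": "Rick", "email": "", "department": "audit"}'}

# What the JSON Parser does: parse the string into a structured /json field
record["json"] = json.loads(record["text"])
```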

The Static Lookup processor is configured with the schemas for the various departments.

[screenshots: Static Lookup configuration]

The second JSON Parser processor parses the schema.

[screenshot: second JSON Parser configuration]
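Assuming the Static Lookup supplies the schema as a JSON string, this second parse step (and the validation it feeds) can be sketched as:

```python
import json
from jsonschema import validate

# Schema as a JSON string, as the Static Lookup might supply it (illustrative)
schema_str = '{"type": "object", "properties": {"employee": {"type": "string"}}, "required": ["employee"]}'

schema = json.loads(schema_str)  # what the second JSON Parser does
validate(instance={"employee": "Rick"}, schema=schema)  # no exception: valid
```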

The Jython evaluator runs this script, which passes valid records on unchanged, but sends invalid ones to the error stream:

# Append the jsonschema module's location to the system path
import sys
# Customize for your deployment
sys.path.append('/path/to/site-packages')

from jsonschema import validate

for record in records:
    try:
        # If no exception is raised by validate(), the instance is valid.
        validate(instance=record.value['json'], schema=record.value['schema'])

        # Write the record to the processor output
        output.write(record)
    except Exception as e:
        # Send the record to error
        error.write(record, str(e))

Finally, the destination simply writes the valid records to disk.

[screenshot: destination configuration]

Here is a preview with valid and invalid data:

[screenshots: preview of valid and invalid records]



What if the source is a near-real-time source like Kinesis or Kafka instead of the Dev Raw Data origin? Can we achieve the same thing? I tried with one of those sources and it threw a JSON parser exception.

rickymart (2019-05-12 23:34:27 -0600)

The type of origin doesn’t make any difference, only the structure of the data. It sounds like your data is not valid JSON. Can you edit your question and add a sample of the data?

metadaddy (2019-05-12 23:49:26 -0600)

For now we are reading the records from Kinesis. It seems to be adding shard information around the actual records; I guess that is where the JSON parsing is failing.
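If so, the inner payload would need to be extracted before parsing; a minimal sketch with a hypothetical envelope (the field names are assumptions, not the real Kinesis record layout):

```python
import json

# Hypothetical envelope wrapping the actual JSON payload as a string
envelope = {
    "shardId": "shardId-000000000000",
    "data": '{"employee": "Rick", "email": "", "department": "audit"}',
}

payload = json.loads(envelope["data"])  # parse only the inner payload
```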

rickymart (2019-05-13 02:14:04 -0600)



Seen: 778 times

Last updated: May 12 '19