Trouble transforming US Census API data into Avro

asked 2019-09-03 14:56:21 -0500

avdsa

updated 2019-09-17 08:23:48 -0500


I have a pipeline that ingests JSON formatted data from the US Census API. Here is the API call I am making for reference:,POP,DENSITY&for=place:*&in=state:39&in=county:*&key=4dc4a831c4ab825b649451a82890fc68bc7fe976

I am using an HTTP Client origin configured to ingest a JSON array of objects.

My goal is to use the Schema Generator to create an Avro-formatted dataset that is then sent to a Kafka Producer.

I have ingested JSON data before, but the results were returned in a LIST-MAP format, so there was no issue with my pipeline configuration. Now my data is returned in LIST format and I am unsure what steps are needed to proceed.

I am not sure if it is helpful, but some language from the Census website states that their JSON format is non-traditional: The Census uses a nonstandard version of JSON that is streamlined:

  • Data are represented in a two-dimensional array
  • Square brackets [ ] hold arrays
  • Values are separated by a , (comma).

Any help is very much appreciated. Thank you.

1 Answer

answered 2019-09-03 17:29:02 -0500

metadaddy

updated 2019-09-16 15:15:18 -0500

The problem here is that their JSON representation carries the field names as a list in the first record, with the field values as lists in subsequent records - this is like a mashup of JSON and CSV. The only way I can think of to handle this is to write an evaluator script (Groovy, JavaScript, or Jython) to read in that first record without emitting an output record, then apply the field names to the remaining records.
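To make the idea concrete, here is a minimal sketch in plain Python, outside of any pipeline. The sample rows are taken from the test output shown further down in this answer; the helper name `rows_to_dicts` is hypothetical, and the sketch assumes the header row is always the first element of the array.

```python
import json

# A small response in the Census layout: the first inner list holds the
# field names, every later inner list holds one row of values.
raw = '''[["GEONAME","POP","DENSITY","state","place"],
["Aberdeen village, Ohio","1608","1190.8916986","39","00142"],
["Ada village, Ohio","5557","2664.7151088","39","00198"]]'''

def rows_to_dicts(table):
    """Pair the header row with each following row (hypothetical helper)."""
    if not table:
        return []
    header, rows = table[0], table[1:]
    return [dict(zip(header, row)) for row in rows]

parsed = rows_to_dicts(json.loads(raw))
for item in parsed:
    print(json.dumps(item, sort_keys=True))
```

Inside a pipeline the rows arrive as separate records, possibly spread over several batches, so the header has to be remembered between calls rather than sliced off a single list.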

Here's a Jython script to do it. The trick is to get the keys from the initial record before the loop starts. Since there may be more than one batch of data, we ensure that we only set the keys once. Python's zip and dict make the rest easy:

# First record has keys. Make sure we do this exactly once per pipeline run,
# and NOT once per batch!
if len(records) > 0 and 'keys' not in state:
  state['keys'] = records[0].value
  # Skip that first record: turn the batch into an iterator and advance it once
  records = iter(records)
  next(records)

# Now process the remaining records
for record in records:
  try:
    # Just make a dict using the keys and values...
    record.value = dict(zip(state['keys'], record.value))

    # and write the resulting record to output
    output.write(record)

  except Exception as e:
    # Send record to error
    error.write(record, str(e))
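The once-per-run behaviour can be checked outside Data Collector with stand-in objects. The sketch below is a simulation under stated assumptions, not StreamSets code: `FakeRecord` and `process_batch` are hypothetical names, and a plain dict plays the role of `state`, which the script above relies on to persist between batches.

```python
class FakeRecord(object):
    """Hypothetical stand-in for a pipeline record; only .value is modelled."""
    def __init__(self, value):
        self.value = value

def process_batch(records, state):
    """Apply the header-once logic to one batch and return the output records."""
    output = []
    batch = iter(records)
    if 'keys' not in state:
        first = next(batch, None)
        if first is None:
            return output            # empty batch, no header seen yet
        state['keys'] = first.value  # header row, never emitted
    for record in batch:
        record.value = dict(zip(state['keys'], record.value))
        output.append(record)
    return output

# Two batches sharing one state dict, as within a single pipeline run
state = {}
batch1 = [FakeRecord(["GEONAME", "POP"]),
          FakeRecord(["Aberdeen village, Ohio", "1608"])]
batch2 = [FakeRecord(["Ada village, Ohio", "5557"])]

out1 = process_batch(batch1, state)
out2 = process_batch(batch2, state)
print([r.value for r in out1 + out2])
```

Only the first record of the first batch is consumed as the header. If the check on `state` were dropped, the first data row of every later batch would be swallowed as a new header.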

Here are a few lines of output in JSON, just as a test:

{"GEONAME":"Aberdeen village, Ohio","POP":"1608","DENSITY":"1190.8916986","state":"39","place":"00142"}
{"GEONAME":"Ada village, Ohio","POP":"5557","DENSITY":"2664.7151088","state":"39","place":"00198"}
{"GEONAME":"Adamsville village, Ohio","POP":"121","DENSITY":"2276.0608908","state":"39","place":"00380"}
{"GEONAME":"Addyston village, Ohio","POP":"943","DENSITY":"1106.9358171","state":"39","place":"00436"}
{"GEONAME":"Adelphi village, Ohio","POP":"374","DENSITY":"1365.5921955","state":"39","place":"00450"}
Thanks for the direction metadaddy. Would you happen to have any templates related to your solution? I am new to Jython, and spent most of the day attempting to script out a solution, but can't seem to pull the values from the first record and map them as field names on the remaining records.

avdsa (2019-09-04 14:04:41 -0500)

Update from my end: so far I have an almost-working Jython script for this issue. It works fine when I explicitly specify the index of the row I am mapping the field names onto. However, when I try to go through every index with a nested for loop, I keep getting this error...

avdsa (2019-09-09 13:59:26 -0500)

SCRIPTING_04 - Script sent record to error: write(): 1st arg can't be coerced to com.streamsets.pipeline.stage.processor.scripting.ScriptRecord

avdsa (2019-09-09 14:00:16 -0500)

And the script I have is here:

for record in records:
  try:
    keys = record.value[0]
    index = 1
    for values in record.value:
      mapped = dict(zip(keys,record.value[index]))
      index = index + 1
      output.write(mapped)
  except Exception as e:
    error.write(record, str(e))

avdsa (2019-09-09 14:00:32 -0500)

Please edit your question rather than posting code as comments. It's quite difficult to read.

metadaddy (2019-09-09 14:06:55 -0500)