The problem here is that their JSON representation carries the field names as a list in the first record, with the field values as lists in subsequent records - this is like a mashup of JSON and CSV. The only way I can think of to handle this is to write an evaluator script (Groovy, JavaScript, or Jython) to read in that first record without emitting an output record, then apply the field names to the remaining records.
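For concreteness, the incoming records presumably look something like this — field names reconstructed from the output shown below; the exact payload is an assumption on my part:

```json
["GEONAME", "POP", "DENSITY", "state", "place"]
["Aberdeen village, Ohio", "1608", "1190.8916986", "39", "00142"]
["Ada village, Ohio", "5557", "2664.7151088", "39", "00198"]
```

Each JSON array is parsed as one record, so the first record holds the column names and every later record holds a row of values in the same order.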

Here's a Jython script to do it. The trick is to get the keys from the initial record before the loop starts. Since there may be more than one batch of data, we ensure that we only set the keys once. Python's zip and dict make the rest easy:

# First record has keys. Make sure we do this exactly once per pipeline run, 
# and NOT once per batch!
if len(records) > 0 and 'keys' not in state:
  state['keys'] = records[0].value
  # Skip that first record
  records = iter(records)
  next(records)

# Now process the remaining records
for record in records:
  try:
    # Now just make a dict using the keys and values
    record.value = dict(zip(state['keys'], record.value))

    output.write(record)
  except Exception as e:
    # Send record to error
    error.write(record, str(e))
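You can't run that script as-is outside Data Collector, since records, state, output, and error are supplied by the Jython Evaluator. As a quick sanity check, here is a standalone Python sketch of the same logic with those objects stubbed out as plain Python structures — the process_batch helper and the sample data are mine, not part of the pipeline:

```python
# Standalone simulation of the evaluator logic. In a real pipeline,
# `records`, `state`, `output`, and `error` are bound by the Jython
# Evaluator; here batches are plain lists and output/errors are lists.

def process_batch(batch, state, output, errors):
    """Turn list-valued records into dicts using keys from the first record."""
    records = iter(batch)
    # The very first record of the run carries the keys; set them once only
    if batch and 'keys' not in state:
        state['keys'] = batch[0]
        next(records)  # skip the header record
    for value in records:
        try:
            # Pair each key with its positional value
            output.append(dict(zip(state['keys'], value)))
        except Exception as e:
            errors.append((value, str(e)))

# Two batches: the header arrives only in the first one
state, output, errors = {}, [], []
process_batch([["GEONAME", "POP"],
               ["Aberdeen village, Ohio", "1608"]], state, output, errors)
process_batch([["Ada village, Ohio", "5557"]], state, output, errors)
```

After both batches, output holds one dict per data row and state['keys'] still carries the header, so a second batch is zipped against the same keys without re-reading them.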

Here are a few lines of output in JSON, just as a test:

{"GEONAME":"Aberdeen village, Ohio","POP":"1608","DENSITY":"1190.8916986","state":"39","place":"00142"}
{"GEONAME":"Ada village, Ohio","POP":"5557","DENSITY":"2664.7151088","state":"39","place":"00198"}
{"GEONAME":"Adamsville village, Ohio","POP":"121","DENSITY":"2276.0608908","state":"39","place":"00380"}
{"GEONAME":"Addyston village, Ohio","POP":"943","DENSITY":"1106.9358171","state":"39","place":"00436"}
{"GEONAME":"Adelphi village, Ohio","POP":"374","DENSITY":"1365.5921955","state":"39","place":"00450"}
