How do I pass an entire batch as a list-map parameter to Neo4j?

asked 2019-12-27 15:33:47 -0600

Hi

I saw a very nice talk on StreamSets & Neo4j JDBC by @metadaddy

https://streamsets.com/blog/visualizi...

I'd like to be able to pass the entire contents of a batch (e.g. 1000 records) into a single Neo4j transaction for fast data loading. An efficient way to do this is to pass in a list of maps, call UNWIND, and let the Neo4j server create the records, rather than running a separate transaction for each record. Essentially a bulk load.
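(For reference, this is the standard bulk-load pattern with the Neo4j Python driver, where the whole batch goes in as a single query parameter; the connection details and record values below are just placeholders.)

from neo4j import GraphDatabase

# placeholder connection details
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

# one batch of records as a list of maps
batch = [{"title": "The Matrix"}, {"title": "Cloud Atlas"}]

with driver.session() as session:
    # a single transaction creates every node in the batch
    session.run("UNWIND $batch AS map CREATE (n:Movie) SET n += map", batch=batch)

driver.close()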

I can wildcard the record maps (after serializing to JSON) in my pipeline and load record by record, setting the node properties from the record map all at once:

WITH apoc.convert.fromJsonMap('${record:value('/')}') as map CREATE (n:Movie) SET n+= map

How do I reference and pass the entire batch list-of-maps as a parameter? I.e., something like:

WITH apoc.convert.fromJsonList('${batch:record:value('/')}') as listmap UNWIND listmap AS map CREATE (n:Movie) SET n+= map

I don't see any record aggregator processors, and it seems StreamSets already has metadata for the batch list-map... I'm sure I'm missing something really basic.

Thanks! Michael


1 Answer


answered 2019-12-28 18:30:34 -0600

The secret to this is a Jython processor, inserted before the JSON serialization step.

Here's the Jython script for the batch aggregation:

# collect the value of every record in this batch into one list
batch_list = []
attributes = {}
sourceId = ''

try:
  for record in sdc.records:
    batch_list.append(record.value)
    # keep the header info from the last record so the offset and source stay visible
    sourceId = record.sourceId
    attributes = record.attributes

except Exception as e:
  sdc.error.write(record, str(e))

# emit a single record whose value is the whole batch as a list of maps
newRecord = sdc.createRecord(sourceId)
newRecord.attributes.update(attributes)
newRecord.value = batch_list

sdc.output.write(newRecord)

This aggregates all the records in the batch into a single new record whose value is a list of maps, and copies the header values from the last aggregated record into the new record's header (so you can at least see the offset and source).

Serialize this list with a JSON processor, setting both "Field to Serialize" and "Target Field" to /.
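After that step, the root field of the single output record holds a JSON string of the whole batch, roughly like this (field names match the rename step further down; the values are just placeholders):

[
  {"movie_id": 1, "title": "The Matrix", "release_year": 1999},
  {"movie_id": 2, "title": "Cloud Atlas", "release_year": 2012}
]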

For the Neo4j batch load, use the JDBC Query processor with this query:

WITH apoc.convert.fromJsonList('${record:value('/')}') AS maps
UNWIND maps AS map
MERGE (n:Movie {primary_key: map.primary_key})
SET n+=map

and set Enable Parallel Queries to true (checked), Auto Commit to false (unchecked), and Batch Commit to true (checked). This is a very efficient way to process a large number of records in a single Bolt transaction.

I set the CSV origin to 5000 records per batch, the buffer limit to 5120, and the number of threads to match the number of processors on my machine (8). I also set the pipeline's max runners to 8. With this I was able to load 64K records in 30 seconds. You can increase the batch size (production.maxBatchSize=10000) and the parser limit (parser.limit=5335040) beyond the defaults of 1000 records and 1024 KB in the sdc.properties file (you'll need to restart).
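For reference, those two settings go in sdc.properties (values as quoted above; restart Data Collector after editing):

# sdc.properties
production.maxBatchSize=10000
parser.limit=5335040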

Some notes:

As a Neo4j convention I like using snake_case for property keys, and it was easy to accomplish this in StreamSets using the rename processor:

[
    {
        "fromFieldExpression": "/movieId",
        "toFieldExpression": "/movie_id"
    },
    {
        "fromFieldExpression": "/personId",
        "toFieldExpression": "/person_id"
    },
    {
        "fromFieldExpression": "/birthYear",
        "toFieldExpression": "/birth_year"
    },
    {
        "fromFieldExpression": "/deathYear",
        "toFieldExpression": "/death_year"
    },
    {
        "fromFieldExpression": "/releaseYear",
        "toFieldExpression": "/release_year"
    }
]

The movie file I used was from a Neo4j exercise; it had some duplicate rows in it and was denormalized across movie, actor, and role. I used an expression processor to create a primary key from movie_id, person_id, and the base64-encoded characters field:

[
    {
        "fieldToSet": "/primary_key",
        "expression": "${str:concat(str:concat(str:concat(str:concat(record:value('/movie_id'),'-'),record:value('/person_id')),'-'),base64:encodeString(record:value('/characters'),false,'UTF-8'))}"
    }
]
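As a plain-Python illustration of what that nested str:concat expression builds (the field values here are placeholders):

import base64

record = {"movie_id": "100", "person_id": "200", "characters": "Neo"}

# movie_id + '-' + person_id + '-' + base64(characters), mirroring the expression above
encoded = base64.b64encode(record["characters"].encode("utf-8")).decode("ascii")
primary_key = "%s-%s-%s" % (record["movie_id"], record["person_id"], encoded)
print(primary_key)  # 100-200-TmVv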

In addition, a number of the fields contained single quotes, which needed to be replaced in order for the apoc.convert.fromJsonList function in the Neo4j query to work properly (the query wraps the JSON list in single quotes). I added an expression processor to replace them with pipes, which is easy to clean up in Neo4j afterwards (I wasn't successful in replacing the single quotes with escapes; any hints would be appreciated). See the sketch after the config below for why the quotes break the query.

[
    {
        "fieldToSet": "/tagline",
        "expression": "${str:replaceAll(record:value('/tagline'), str:unescapeJava(\"'\"),'|')}"
    },
    {
        "fieldToSet": "/title",
        "expression": "${str:replaceAll(record:value('/title'), str:unescapeJava(\"'\"),'|')}"
    },
    {
        "fieldToSet": "/name",
        "expression": "${str:replaceAll(record:value('/name'), str:unescapeJava(\"'\"),'|')}"
    },
    {
        "fieldToSet": "/characters",
        "expression": "${str:replaceAll(record ...
(more)
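To make the failure mode concrete, here is a small Python sketch (the record value is made up) of what the generated JDBC query looks like once the expression language is expanded; an unescaped single quote inside the JSON closes the Cypher string literal early, which is why the quotes get swapped for pipes upstream:

import json

records = [{"title": "It's a Wonderful Life"}]  # embedded single quote

# the query template wraps the expanded record value in single quotes,
# so the quote inside "It's" terminates the Cypher string literal prematurely
broken_query = "WITH apoc.convert.fromJsonList('%s') AS maps ..." % json.dumps(records)
print(broken_query)

# the workaround above: swap single quotes for pipes, field by field, before serialization
for r in records:
    r["title"] = r["title"].replace("'", "|")
safe_query = "WITH apoc.convert.fromJsonList('%s') AS maps ..." % json.dumps(records)
print(safe_query)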
