
Another "speed up the load" question from a relatively inexperienced Neo4j user

I am trying to load a fairly large set of data. My input file is straightforward but big. The data is all Cypher commands.

MERGE (:typeImportOnDemandDeclaration {nodeSeq:4,name:'importjava.io.*;',compileunit:'webgoat.combined.source',type:'typeImportOnDemandDeclaration'});

Later in the file are the node connections:

MATCH (a:ProgNode),(b:ProgNode) WITH a,b WHERE a.nodeSeq = 4 AND b.nodeSeq = 5 MERGE (a)-[r:Program{compileunit:'webgoat.combined.source', source:'webgoat.combined.source'}]->(b);

All of these are located in a single file coming in from multiple sources. When I wrote the original upload, I was fine with a few thousand nodes. But we just got a file with 100M and it's a bit slow. I realize I was not doing it efficiently, so I needed to batch things up. That sounded easy. It has NOT been, and the answers given all over the internet are only creating more confusion.

To start, I cannot go back and rewrite for CSV for a variety of reasons. So unless someone can come up with a compelling CSV reason, that's out. It has to be some variant of the code below, where the line variable is actually a complete Cypher statement, as above. The "for line in FI:" loops over the 100M Cypher lines. The label is not the same on each line; it varies.

This version used a single embedded string (I know, clumsy) but none of my other variants had any better luck. The "payload" statement is the big one.

    batch_statement = """
    UNWIND {batch} as row
    MERGE (n:Label {row.id})
    (ON CREATE) SET n += row.properties
    """
    payload = '{batch: ['
    maxcount = 4
    with graphDB_Driver.session() as graphDB_Session:
        start_time = time.time()    
        print("Starting Node load @ %s\n" % time.asctime())
        # Create nodes
        tx = graphDB_Session.begin_transaction()
        for line in FI:
            counter +=1
            if counter >= startrow:
                if (counter % maxcount) == 0:
                   print(counter)
                   payload = payload + payloadstring + "]" + batch_statement
# payload is the string I need to run. 

                   tx.run(payload)
                   tx.commit()
                   print("     line %s was reached" % counter)

                   payload = '{batch: ['
                   time.sleep(3)

                   
                firstword = line.split()[0]    
                if firstword == "MATCH" and matchflag == False:
                    print("  Created %s nodes\n" % counter)
                    print("  Beginning links @ %s\n" % str(time.asctime()))
                    matchflag = True
                elif firstword == "CREATE" and createflag == False:
                    print("  Beginning Node Creation\n")
                    createflag = True
                elif firstword == "//" and postflag == False:
                    print("  %s  @ %s\n" % (line[:-2], str(time.asctime())))
                    postflag = True
                else:
                    print("  %s  @ %s - unknown \n" % (line[:-2], str(time.asctime())))
                 
                if firstword != "//":
                   # break down the cypher into a key and a data 
                    splitstart = line.find("{")
                    splitstop = line.find("}")
                    indexstring = "{id:'"+line[7:splitstart-1].strip()+"',"
                    payloadstring = indexstring + " properties:"+line[splitstart:splitstop]+"}"
                    
                    payload = payload + payloadstring + ","

        FO.close()    

This seems basically easy to do but it's beating me.

Thanks

This bit of the query can be rewritten to your advantage. Don't do this:

MATCH (a:ProgNode),(b:ProgNode) WITH a,b WHERE a.nodeSeq = 4 AND b.nodeSeq = 5

Do this:

MATCH (a:ProgNode { nodeSeq: 4 })
WITH a
MATCH (b:ProgNode { nodeSeq: 5 })
(Merge other stuff here)

For big imports, make sure those fields are indexed, and that you have plenty of page cache configured in your database, and these changes alone should speed it up quite a bit.
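For example (not from the original posts, and the exact syntax depends on your Neo4j version), the index being described might be created like this:

CREATE INDEX ON :ProgNode(nodeSeq);

// or, on Neo4j 4.x:
CREATE INDEX prognode_nodeseq FOR (n:ProgNode) ON (n.nodeSeq);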

Thank you - that is easy enough to fix, so I appreciate the pointer. Any thoughts on the node creation itself? We just ran a test and we are about to bump 200M nodes. Clearly one at a time isn't going to cut it!

Thanks again, I'll have that fixed asap.

For the payload, use parameters submitted to the query. I'd make my Cypher query something like:

UNWIND $batch as event
/* Do merge based on a single event */

And then I'd submit an array of objects as the batch parameter to the query. Don't try to put all of your data into the Cypher string.
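A minimal sketch of that idea with the official Python driver; the connection details and the nodeSeq/properties field names here are placeholders, not the real schema:

from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

query = """
UNWIND $batch AS row
MERGE (n:ProgNode {nodeSeq: row.nodeSeq})
ON CREATE SET n += row.properties
"""

# Each element of the list becomes one "row" on the server; the whole list
# travels as a single query parameter instead of being spliced into the string.
batch = [
    {"nodeSeq": 4, "properties": {"name": "importjava.io.*;", "compileunit": "webgoat.combined.source"}},
    {"nodeSeq": 5, "properties": {"name": "importjava.net.*;", "compileunit": "webgoat.combined.source"}},
]

with driver.session() as session:
    session.run(query, batch=batch)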

David, I'm sorry but you are going to have to be a lot more specific than that one line! lol.

The line "parameters submitted to the query" doesn't make any sense in this case. There are 120 million lines with 86 attributes! Each line is a unique combination. There are no duplicate nodes unless it gets rerun.

I have tried every version of the UNWIND and it's not working. The programs that generate the Cypher have all been distributed, so either I post-process (which is what this program is doing) or I call up all 200 companies and make them rewrite.

Can you look at it again and see if there is a better example? Again, starting from Cypher should be easy, not harder!

Hello @bill.dickenson,

Can we see the CSV format? (some rows as example)
Do you have one CSV for nodes and one for relationships?

Regards,
Cobra

There is none. I would have to go back and recreate it or extract it from the cypher.

What is the input data format?

The data is generated by a bunch of different client programs, some I know of, some I do not. There are 200 clients, each has already done the preprocessing to get it into a cypher format.

You have the output from their programs in the sample. So if I have to write something to untranslate, I will.

They all follow this syntax? MERGE (:typeImportOnDemandDeclaration {nodeSeq:4,name:'importjava.io.*;',compileunit:'webgoat.combined.source',type:'typeImportOnDemandDeclaration'});

I assume you can't directly send requests to your Neo4j database.
The cleanest way would be to translate all these requests into a CSV.
If you are able to make it, I can propose several queries to load nodes and relationships from CSV.

Yes, although there are probably 86 more variables in that list. I cut it after type. So whatever technique we use, we can expand it to the others.

"Send the request to Neo4j"? From where? From each of their sites? Have all 200 log on? No.

Ok. If I must generate CSV, I will. It will have to be something other than comma-delimited, since commas are used all over the place.
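For what it's worth, a hypothetical sketch of turning one generated MERGE line into a double-pipe-delimited row (it assumes flat, single-quoted property values; list-valued properties such as location would need extra handling):

import re

# Example input line, copied from the samples above.
line = ("MERGE (:typeImportOnDemandDeclaration {nodeSeq:4,name:'importjava.io.*;',"
        "compileunit:'webgoat.combined.source',type:'typeImportOnDemandDeclaration'});")

label = line[len("MERGE (:"):line.index(" {")]
props = line[line.index("{") + 1:line.rindex("}")]
pairs = re.findall(r"(\w+):('[^']*'|[^,]+)", props)   # key:value, quoted or bare
row = [label] + [value.strip("'") for _key, value in pairs]
print("||".join(row))
# typeImportOnDemandDeclaration||4||importjava.io.*;||webgoat.combined.source||typeImportOnDemandDeclaration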

I advise you to make one CSV for the nodes and one for the relationships. When you have them, you can adapt these queries:

  • To load nodes:
USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///nodes.csv" AS row
MERGE (p:ProgNode{nodeSeq: row.nodeSeq})
SET p += row
  • To load relationships:
USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS FROM "file:///relationships.csv" AS row
MATCH (a:ProgNode{nodeSeq: row.start})
MATCH (b:ProgNode{nodeSeq: row.end})
CALL apoc.create.relationship(a, row.RELATIONSHIP, {}, b) YIELD rel
RETURN rel

Also, don't forget to add a UNIQUE CONSTRAINT on nodeSeq (if the values really are unique, of course); it will speed up your queries a lot.
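For example (treat this as a sketch: later in the thread it turns out the same sequence number can appear in different compile units, so nodeSeq alone may not be unique):

CREATE CONSTRAINT ON (p:ProgNode) ASSERT p.nodeSeq IS UNIQUE;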

I hope it will help you

Ok - not a good answer, but if it's CSV or nothing, I guess it's CSV. I do consider the inability to bulk update using your own language to be a heck of a miss, and an unexpected one.

I'll repost once they finish. I had the canned code from up above but thank you for posting it here as well.

Be back.

Yeah, sorry, but I would like to know who had the "GOOD IDEA" to send you Cypher requests from 200 clients; it should have been a classic interchange format like JSON or CSV.

Translating to CSV is the best option given the quantity of data you have; it should load everything in a few seconds or minutes, depending on your database.

Don't hesitate to ask if you need anything else.

Regards,
Cobra

Me. Also the person who WILL make the recommendation on Neo4j or your COMPETITION. lol. So if your implication is that cypher isn't strong enough, we agree. But I applaud your candor.

I agree that in hindsight, JSON would have been better. CSV is old school and very hard to control for real-world applications. For what we needed, it would have been (and may likely still be) impossible. We did go with simple. That was my mistake. I won't make that mistake again with Neo4j.

Ok - so now we have this format.

Nodes - PSV (pipe separated, in this case a double pipe) with headers...

ProgNode||nodeSeq||name||compileunit||type||itd||szAEP||szAFPF||quViolations||quVioDensity||quChange||location||level||szlocs||eieo
compilationUnit||0||'DisplayIncomplete:importjava.net.*;importjava.io.*;importjava.nio.channels.*;importjava.util.Properties;publ'||'webgoat.combined.source'||'compilationUnit'||'data:Writes'||25||''||2||2.0||False||0||'0'||1||False
typeImportOnDemandDeclaration||1||'importjava.net.*;'||'webgoat.combined.source'||'typeImportOnDemandDeclaration'||'data:Reads'||1||''||0||0.0||False||[16, 0, 16, 17]||'code'||2||True

and relations

'a'||'b'||'aunit'||'bunit'
0||1||'webgoat.combined.source'||'webgoat.combined.source'
1||2||'webgoat.combined.source'||'webgoat.combined.source'

This will be called from a Python program, so when you respond, can you fill out enough so it could be used that way?

Thanks

To be honest I'm not working for Neo4j, but I have always found a way to do what I want to do; in some cases you will still have to do some processing in Python. Just don't forget that Cypher is not a programming language like Python; Cypher is like SQL.

The easy way will be to load the CSV file directly.

Have a look here to configure your database correctly so that it can access your CSV files.
https://neo4j.com/docs/cypher-manual/current/clauses/load-csv/#query-load-csv-introduction

You can directly adapt the code I gave you above. In your case, is ProgNode the common label, or will, for example, compilationUnit also be a label?

Good! I was on the original DB2 product team. (I am old.)

Freelance by any chance?

        if output == 'cypher':
            neostmt = "MERGE (:ProgNode:%s {nodeSeq:%s,name:'%s',compileunit:'%s',type:'%s',kdm:'%s',szAEP:%s,szAFPF:'%s',quViolations:%s,quVioDensity:%s,quChange:%s,location:%s,level:'%s',szlocs:%s,eieo:%s});\n"
            neoout = neostmt % (nodetype,inode,istring,COMPILEUNIT,nodetype,kdm,szaep,szAFPF,quviolations,quVioDensity,changed, location,level, szlocs, eieo)
        else:
            neostmt = "%s||%s||'%s'||'%s'||'%s'||'%s'||%s||'%s'||%s||%s||%s||%s||'%s'||%s||%s\n"
            neoout = neostmt % (nodetype,inode,istring,COMPILEUNIT,nodetype,kdm,szaep,szAFPF,quviolations,quVioDensity,changed, location,level, szlocs, eieo)

No, the node label should be nodetype and ProgNode, the inode is the unique identifier.

Thanks

I'm so young, and no, I'm working for a startup, but we are open to consulting.

So you must create batches of data now:

BATCH = {'batch': []}


def reset_batch():
    """
    Function to reset the batch.
    """
    BATCH["batch"] = []


def merge_relation(args):
    """
    Function to create relations from a batch.
    """
    if len(BATCH['batch']) > 1000:
        with graphDB_Driver.session() as ses:
            ses.run("UNWIND $batch AS row MATCH (a:ProgNode{inode:row.a}) MATCH (b:ProgNode{inode:row.b}) CALL apoc.merge.relationship(a, 'PROGRAM', {}, apoc.map.removeKeys(properties(row), ['a', 'b']), b) YIELD rel RETURN 1", batch=BATCH["batch"])
        reset_batch()
    BATCH['batch'].append(args.to_dict())


def merge_node(args):
    """
    Function to create nodes from a batch.
    """
    if len(BATCH['batch']) > 1000:
        with graphDB_Driver.session() as ses:
            ses.run("UNWIND $batch AS row CALL apoc.merge.node(['ProgNode', row.nodetype], {inode:row.inode}, apoc.map.removeKeys(properties(row), ['nodetype', 'inode'])) YIELD node RETURN 1", batch=BATCH["batch"])
        reset_batch()
    BATCH['batch'].append(args.to_dict())


nodes = pd.read_csv(filepath_or_buffer='nodes.csv', header=[0], sep='||', encoding='utf-8')
relations = pd.read_csv(filepath_or_buffer='relations.csv', header=[0], sep='||', encoding='utf-8')

nodes.apply(lambda h: merge_node(h), axis=1)
reset_batch()
relations.apply(lambda h: merge_relation(h), axis=1)
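One caveat worth flagging with this sketch: rows accumulated after the last full 1,000-row flush are never sent, so each apply() needs one final flush at the end. A hypothetical helper, reusing the same UNWIND query strings as merge_node and merge_relation:

def flush(query):
    """Send whatever is still sitting in the batch after the last full flush."""
    if BATCH['batch']:
        with graphDB_Driver.session() as ses:
            ses.run(query, batch=BATCH["batch"])
        reset_batch()

Calling flush(...) with the node query after nodes.apply(...) and with the relationship query after relations.apply(...) would cover the tail of each file.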

Don't forget to add the UNIQUE CONSTRAINTS:

CREATE CONSTRAINT constraint_inode ON (p:ProgNode) ASSERT p.inode IS UNIQUE

You also need to install APOC plugin on your database.

I'm not sure if the code works correctly, but the idea is there. I hope it will help you.

Regards,
Cobra

drop me a note at Bill.dickenson@veriprism.llc and let's talk non-disclosure and rates.

Yes, I get the point on this one. I'll go ahead and make a run at it and see what happens.

And I would never have gotten there from the directions. Thank you

Thank you

Need a plan B. CSV is probably not going to work. I have tried :;,|~` and all of the doubles, and all of the combinations (e.g. || or |;) and it's not making it very far. JSON would be my next choice, as pandas doesn't work well under Python with the double delimiter and no single character will do.

I'll rewrite in JSON, but some help would be good.

Hello @bill.dickenson

You can change my code above a bit and it will work with JSON; you just don't need to convert each element of your JSON to a dict in the merge functions, since it's already a list of dicts.
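A minimal sketch of that, reusing BATCH, reset_batch and graphDB_Driver from the earlier code; the file name and the choice to use the outer JSON key as the node id are assumptions based on the samples that follow:

import json

# The node file is a JSON object keyed by sequence number, so iterate its values
# and batch the inner dicts directly -- no DataFrame or .to_dict() needed.
with open("nodes.json", encoding="utf-8") as fh:
    nodes = json.load(fh)

for key, props in nodes.items():
    props["inode"] = key              # keep the outer key as the identifier
    BATCH["batch"].append(props)
    if len(BATCH["batch"]) >= 1000:
        with graphDB_Driver.session() as ses:
            ses.run("UNWIND $batch AS row "
                    "CALL apoc.merge.node(['ProgNode', row.type], {inode: row.inode}, "
                    "apoc.map.removeKeys(properties(row), ['type', 'inode'])) "
                    "YIELD node RETURN 1", batch=BATCH["batch"])
        reset_batch()
# (Remember one final run for any rows left in the batch after the loop.)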

All changed over and it does look better, but I'm having two odd problems.

Here are the two json files

{
	"0": {
		"EIEO": true,
		"FILECOUNT": 1,
		"KDM": "data:Reads",
		"changed": false,
		"ctx": "1482759651",
		"level": "code",
		"location": [
			22540,
			53,
			22540,
			53
		],
		"node": "54914",
		"quvioDensity": 1.0,
		"quviolations": 2,
		"szAFP": "",
		"szaep": 8,
		"szlocs": 2,
		"text": "UserService",
		"type": "typeName"
	},
	"1": {
		"EIEO": true,
		"FILECOUNT": 1,
		"KDM": "data:Reads",
		"changed": false,
		"ctx": "1482759651",
		"level": "code",
		"location": [
			22540,
			53,
			22540,
			53
		],
		"node": "54914",
		"quvioDensity": 1.0,
		"quviolations": 2,
		"szAFP": "",
		"szaep": 8,
		"szlocs": 2,
		"text": "UserService",
		"type": "typeName"
	},

and the relationships

{
	"0": {
		"compile": "webgoat.combined.source",
		"from": "0",
		"to": "54690"
	},
	"1": {
		"compile": "webgoat.combined.source",
		"from": "1",
		"to": "2"
	},
	"100": {
		"compile": "webgoat.combined.source",
		"from": "100",
		"to": "101"
	},

Code - I dropped some housekeeping

def merge_relation(args):
    """
    Function to create relations from a batch.
    """
    if len(BATCH['batch']) > 1000:
        with graphDB_Driver.session() as ses:
            ses.run("UNWIND $batch AS row MATCH (a:ProgNode{inode:row.a}) MATCH (b:ProgNode{inode:row.b}) CALL apoc.merge.relationship(a, 'PROGRAM', {}, apoc.map.removeKeys(properties(row), ['a', 'b']), b) YIELD rel RETURN 1", batch=BATCH["batch"])
        reset_batch()
    BATCH['batch'].append(args.to_dict())


def merge_node(args):
    """
    Function to create nodes from a batch.
    """
    if len(BATCH['batch']) > 1000:
        with graphDB_Driver.session() as ses:
            ses.run("UNWIND $batch AS row CALL apoc.merge.node(['ProgNode', row.nodetype], {inode:row.inode}, apoc.map.removeKeys(properties(row), ['nodetype', 'inode'])) YIELD node RETURN 1", batch=BATCH["batch"])
        reset_batch()
    BATCH['batch'].append(args.to_dict())



def main(fname):
    print("Starting load of %s - nodes \n" % filenode)
    nodes = pd.read_json(filenode, encoding='utf-8')
    print("Starting load of %s - connections \n" % filematch)
    relations = pd.read_json(filematch, encoding='utf-8')
    print("Files loaded %s - connections \n" % filematch)
    nodes.apply(lambda h: merge_node(h), axis=1)
    reset_batch()
    relations.apply(lambda h: merge_relation(h), axis=1)    

Two issues.

Nodes load correctly, relationships do not and the error is a bit obscure.

 File "C:\Users\Bill Dickenson\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\io\json\_json.py", line 1089, in _parse_no_numpy
    loads(json, precise_float=self.precise_float), dtype=None
ValueError: Expected object or value

call is the same as before.

And I assume the names have to change but I hesitate.

Did you check in Neo4j browser if the nodes were loaded correctly?

Can you print the content of the batch of relations to check what is in it?

Nothing loaded into Neo4j at all. The whole JSON file is about 54K nodes in this example. I confirmed that in a few places. Now when it loads from JSON, it does look like the whole file loaded, but pivoted, fields first. (APL strikes back - lol)

    nodes = pd.read_json(filenode, encoding='utf-8')
    print(nodes)

So this is working....

Now we run the apoc after we pivot the nodes file with nodes.apply using the lambda. I added the rest of the section.

    start_time = time.time()    
    print("Starting load of %s - nodes \n" % filenode)
    nodes = pd.read_json(filenode, encoding='utf-8')
    print(nodes)
    print("Starting load of %s - connections \n" % filematch)
#    relations = pd.read_json(filematch, encoding='utf-8')
    print("Files loaded %s - connections \n" % filematch)
    nodes.apply(lambda h: merge_node(h), axis=1)
    reset_batch()
#    relations.apply(lambda h: merge_relation(h), axis=1)    

but it doesn't look like it pivoted. No error, just silent. I think I am actually seeing each field (15), not the row.

(Screenshot of the printed DataFrame omitted.)

I also commented out the relation load, as that wasn't loading into pandas. That's funny, as it is simple compared to the other.

I do feel guilty about asking, but if you do have a rate and are up to the consulting (or even codementor) I am willing to pay to solve this.

At any rate, thank you for the help so far.

Hello, my boss will contact you

Can you try this?

def merge_node(args):
    """
    Function to create nodes from a batch.
    """
    if len(BATCH['batch']) > 1000:
        with graphDB_Driver.session() as ses:
            ses.run("UNWIND $batch AS row CALL apoc.merge.node(['ProgNode', row.nodetype], {inode:row.inode}, apoc.map.removeKeys(properties(row), ['nodetype', 'inode'])) YIELD node RETURN 1", batch=BATCH["batch"])
        reset_batch()
    BATCH['batch'].append(args.to_dict())
nodes = pd.read_json(filenode, encoding='utf-8')
nodes = nodes.T
nodes['inode'] = nodes.index
nodes.apply(lambda h: merge_node(h), axis=1)
reset_batch()

I have some good news and some bad, but we are close.

It's all working as far as the code goes. I can't see what's being sent, but it is working. Minor changes (inode is now node) and I added some last-record logic. But as soon as it ends, it sends back a pair of error messages and nothing shows up in Neo4j. However, I think it's the same issue.

I did make some minor changes to the code:

def merge_node(args):
    """
    Function to create nodes from a batch.
    """
    global INNODE, NODECOUNT
    INNODE += 1
    if (INNODE % 10000) == 0:
        print("...Sent %s of %s for processing" % (INNODE, NODECOUNT))
    if (len(BATCH['batch']) > 1000) or (INNODE == NODECOUNT):
        if INNODE == NODECOUNT:
            print("...Final Record (%s) added and transmitted" % INNODE)
            BATCH['batch'].append(args.to_dict())            
        with graphDB_Driver.session() as ses:
            ses.run("UNWIND $batch AS row CALL apoc.merge.node(['ProgNode', row.nodetype], {node:row.inode}, apoc.map.removeKeys(properties(row), ['nodetype', 'node'])) YIELD node RETURN 1", batch=BATCH["batch"])
        reset_batch()
    BATCH['batch'].append(args.to_dict())

This is the content in batch

Load Neo4j file webgoat
Sections : ['Neo4J', 'SourceMachine']
GraphDatabase.driver(bolt://dev.Veriprism.net:7687
webgoat.combined.source
files: webgoat.combined.source.neo-n webgoat.combined.sourceneo-c
Starting load of webgoat.combined.source.neo-n - nodes

{'batch': [{'EIEO': True, 'FILECOUNT': 1, 'KDM': 'data:Reads', 'changed': False, 'ctx': '1033320531', 'level': 'code', 'location': [4835, 30, 4835, 30], 'node': 10001, 'quvioDensity': 0.5, 'quviolations': 1, 'szAFP': '', 'szaep': 17, 'szlocs': 2, 'text': 'user', 'type': 'typeName'},

This does look reasonable. 

Here is the code that made the connection

uri=configur.get("Neo4J","host")
userName        = configur.get("Neo4J","id")
password        = configur.get("Neo4J","pw")
print("GraphDatabase.driver("+uri)
graphDB_Driver  = GraphDatabase.driver(uri, auth=(userName, password))    

Can you try the code on a local database (build one in Neo4j Desktop) and not a remote one?
Which version of Neo4j are you using? (I advise you to use the latest one, 4.1.)

Loaded up local, added APOC

Ran the code. This is what I found

Starting load of webgoat.combined.source.neo-n - nodes 

Traceback (most recent call last):
  File "F:/ClientSide/current/testload1.py", line 125, in <module>
    main(fname)
  File "F:/ClientSide/current/testload1.py", line 98, in main
    nodes.apply(lambda h: merge_node(h), axis=1)
  File "C:\Users\Bill Dickenson\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\frame.py", line 6878, in apply
    return op.get_result()
  File "C:\Users\Bill Dickenson\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\apply.py", line 186, in get_result
    return self.apply_standard()
  File "C:\Users\Bill Dickenson\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\apply.py", line 296, in apply_standard
    values, self.f, axis=self.axis, dummy=dummy, labels=labels
  File "pandas\_libs\reduction.pyx", line 620, in pandas._libs.reduction.compute_reduction
  File "pandas\_libs\reduction.pyx", line 128, in pandas._libs.reduction.Reducer.get_result
  File "F:/ClientSide/current/testload1.py", line 98, in <lambda>
    nodes.apply(lambda h: merge_node(h), axis=1)
  File "F:/ClientSide/current/testload1.py", line 54, in merge_node
    ses.run("UNWIND $batch AS row CALL apoc.merge.node(['ProgNode', row.nodetype], {node:row.inode}, apoc.map.removeKeys(properties(row), ['nodetype', 'node'])) YIELD node RETURN 1", batch=BATCH["batch"])
  File "C:\Users\Bill Dickenson\AppData\Local\Programs\Python\Python37\lib\site-packages\neo4j\__init__.py", line 499, in run
    self._connection.fetch()
  File "C:\Users\Bill Dickenson\AppData\Local\Programs\Python\Python37\lib\site-packages\neobolt\direct.py", line 422, in fetch
    return self._fetch()
  File "C:\Users\Bill Dickenson\AppData\Local\Programs\Python\Python37\lib\site-packages\neobolt\direct.py", line 464, in _fetch
    response.on_failure(summary_metadata or {})
  File "C:\Users\Bill Dickenson\AppData\Local\Programs\Python\Python37\lib\site-packages\neobolt\direct.py", line 759, in on_failure
    raise CypherError.hydrate(**metadata)
neobolt.exceptions.ClientError: Failed to invoke procedure `apoc.merge.node`: Caused by: java.lang.NullPointerException
>>> 

Did you upgrade the Python Neo4j driver too?

pip install --upgrade neo4j

I did. So Neo4j 4.1, new APOC, New driver. Same issue. Rebooted. Restarted - same issue.

thanks

To be honest, I don't know where this error is coming from.

Can you print the content of the batch before it is sent to the database?

This was posted a few replies back, but here is the contents of the batch. Only the first line is shown, but the rest follow the same format.

Do you have an example in the batch where a record has one or several null values? I think the problem is coming from there. :)

When you have your DataFrame, try replacing all NaN and null values with an empty string, for example, or whatever else fits.
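A minimal example of that with pandas, continuing from the read_json and transpose code above:

import pandas as pd

nodes = pd.read_json(filenode, encoding='utf-8').T
# Replace NaN/None values with empty strings so apoc.merge.node never receives nulls.
nodes = nodes.fillna('')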

Embarrassed to say, I found it.

My naming conventions were done in a hurry and I had introduced some inconsistencies. Someone noted that two referenced variables were not there, and once fixed, it worked fine. So everything is working mechanically now. The relationships are not being created correctly, but since the nodes are, I think I can puzzle it out.

Thank you again, this was way more complicated than it should have been, and you solved it.

No problem, I'm happy to hear this

Regards,
Cobra

I do need just a tad more help.

So the content of batch is:

[
 {"child": "54690", "compile": "webgoat.combined.source", "parent": "0", "tree": "runs", "from": 0},
 {"child": "2", "compile": "webgoat.combined.source", "parent": "1", "tree": "calls", "from": 1},
 {"child": "101", "compile": "webgoat.combined.source", "parent": "100", "tree": "runs", "from": 100},
 {"child": "1001", "compile": "webgoat.combined.source", "parent": "1000", "tree": "runs", "from": 1000},
 {"child": "10001", "compile": "webgoat.combined.source", "parent": "10000", "tree": "runs", "from": 10000},
 {"child": "10004", "compile": "webgoat.combined.source", "parent": "10003", "tree": "runs", "from": 10003},
 {"child": "10009", "compile": "webgoat.combined.source", "parent": "10004", "tree": "runs", "from": 10004},
 {"child": "10007", "compile": "webgoat.combined.source", "parent": "10005", "tree": "runs", "from": 10005},
 {"child": "10008", "compile": "webgoat.combined.source", "parent": "10007", "tree": "runs", "from": 10007},
 {"child": "1005", "compile": "webgoat.combined.source", "parent": "1001", "tree": "runs", "from": 1001},
 {"child": "1003", "compile": "webgoat.combined.source", "parent": "1002", "tree": "runs", "from": 1002}
 ]

and of course the nodes (which are already created) are:

[{"EIEO": false, "FILECOUNT": 1, "KDM": "code:StorableUnit", "changed": false, "ctx": "1793546528", "inode": "5050", "level": "code", "location": [2607, 18, 2607, 18], "quvioDensity": 1.0, "quviolations": 2, "szAFP": "", "szaep": 10, "szlocs": 2, "text": "final", "type": "fieldModifier", "node": 5050},
 {"EIEO": false, "FILECOUNT": 1, "KDM": "Action:Addresses", "changed": false, "ctx": "259837957", "inode": "50500", "level": "code", "location": [20399, 39, 20399, 39], "quvioDensity": 0.0, "quviolations": 0, "szAFP": "", "szaep": 28, "szlocs": 2, "text": "e", "type": "variableDeclaratorId", "node": 50500},
 {"EIEO": true, "FILECOUNT": 1, "KDM": "data:Writes", "changed": false, "ctx": "1571545022", "inode": "50501", "level": "code", "location": [20399, 42, 20401, 8], "quvioDensity": 0.0, "quviolations": 0, "szAFP": "", "szaep": 27, "szlocs": 4, "text": "{log.error(\"Error occurred while writing the logfile\",e);}", "type": "block", "node": 50501}]

and I need to create a relationship between parent and child, but ONLY if they share the same compileunit. It's possible that two different compiles could have a node 0 (in fact, that's a certainty) and I don't want to create it out of school.

Now based on your example, this is my code

ses.run("UNWIND $batch AS row MATCH (a:ProgNode{inode:row.parent}) MATCH (b:ProgNode{inode:row.child}) CALL apoc.merge.relationship(a, row.tree, {compileunit:row.compile}, apoc.map.removeKeys(properties(row), ['parent', 'child']),b) YIELD rel RETURN 1", batch=BATCH["batch"])

I am not getting an error (good) but I am also not getting a relationship (bad)

in Cypher I would have written this as

MATCH (a:ProgNode { inode:parent,compileunit:compile }) WITH a MATCH (b:ProgNode { inode: child, compileunit:compile}) MERGE (a)-[r:tree{compileunit:'%s', source:'%s'}]->(b);\n"

The r:tree adds a wrinkle also.

This is close to the last thing I need to do. Can you help?

The first batch you gave is the one for the relationships?
Are you sure inode is a string and not an integer?

Duh - thank you - that was it. Made the nodes integers and it worked. Thank you again
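For reference, a hedged sketch of the batched relationship merge the thread converges on: cast parent and child to integers so they match the integer inode stored on the nodes, and match on compileunit as well so nodes from different compiles are never linked. This assumes compileunit is actually stored on the nodes and that the field names match the samples above:

REL_QUERY = """
UNWIND $batch AS row
MATCH (a:ProgNode {inode: row.parent, compileunit: row.compile})
MATCH (b:ProgNode {inode: row.child, compileunit: row.compile})
CALL apoc.merge.relationship(a, row.tree, {compileunit: row.compile},
     apoc.map.removeKeys(properties(row), ['parent', 'child', 'tree']), b)
YIELD rel
RETURN count(rel)
"""

with graphDB_Driver.session() as ses:
    # Cast the string ids from the JSON to integers before sending the batch.
    batch = [dict(r, parent=int(r["parent"]), child=int(r["child"])) for r in BATCH["batch"]]
    ses.run(REL_QUERY, batch=batch)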