08-28-2018 07:05 PM
I'd like to ask a follow-up question to the one posed on Stack Overflow here.
@William_Lyon notes in his answer that "We can batch multiple queries into one transaction for better performance... Typically we can batch ~20k database operations in a single transaction."
For convenience, I have pasted the example code below:
tx = graph.begin()
for index, row in df.iterrows():
    tx.evaluate('''
       MATCH (a:Label1 {property: $label1})
       MERGE (a)-[r:R_TYPE]->(b:Label2 {property: $label2})
    ''', parameters={'label1': row['label1'], 'label2': row['label2']})
tx.commit()
Well, what if the Pandas dataframe had many more than 20,000 rows? Suppose 10 million. I know that if we were using LOAD CSV directly from cypher-shell, we would include USING PERIODIC COMMIT 20000 to make it commit every 20,000 lines of the CSV.
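(For reference, the cypher-shell version might look roughly like this; the CSV path and properties are placeholders:)

USING PERIODIC COMMIT 20000
LOAD CSV WITH HEADERS FROM 'file:///data.csv' AS row
MATCH (a:Label1 {property: row.label1})
MERGE (a)-[r:R_TYPE]->(b:Label2 {property: row.label2});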
What would be the equivalent of USING PERIODIC COMMIT 20000 when importing from a large dataframe with py2neo?
The py2neo docs mention an optional autocommit argument that makes each individual transaction commit automatically (almost the opposite of what I want), but I don't see anything about specifying PERIODIC COMMIT.
The closest thing to a workaround I can think of is to do a modulo operation on the row variable inside the iterrows loop and keep the final commit() outside the loop. The modified code would look something like this:
tx = graph.begin()
for index, row in df.iterrows():
    tx.evaluate('''
       MATCH (a:Label1 {property: $label1})
       MERGE (a)-[r:R_TYPE]->(b:Label2 {property: $label2})
    ''', parameters={'label1': row['label1'], 'label2': row['label2']})
    if row % 20000 == 0:
        tx.commit()
tx.commit()
Would this be a viable workaround? Is there any other way?
08-29-2018 02:03 AM
Usually you would feed batches of data to your statement and then use UNWIND inside it to turn each batch into rows.
The advantage over running each statement individually is that you only run one statement per batch instead of 20,000.
So your statement could look like:
statement = """
UNWIND $rows as row
MATCH (a:Label1 {property:row.label1))
MERGE (a)-[r:R_TYPE]->(b:Label2 {property:row.label2))
"""
tx = graph.begin(autocommit=True)
params = []
for index, row in df.iterrows():
params += {'label1': row['label1'], 'label2': row['label2']}
if row % 20000 == 0:
tx.evaluate(statement, parameters = params)
params = []
tx.evaluate(statement, parameters = params)
Please note that in your statement you merge the b nodes relative to a, i.e. you will get duplicate nodes with the same property value. If you actually want unique b nodes, you need two separate MERGE operations: one for the node and one for the relationship.
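A sketch of what that could look like, using the same placeholder labels and properties:

statement = """
UNWIND $rows as row
MATCH (a:Label1 {property: row.label1})
MERGE (b:Label2 {property: row.label2})
MERGE (a)-[r:R_TYPE]->(b)
"""

Here b is merged on its own property first, so the relationship MERGE can no longer create duplicate b nodes.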
Please also note that you cannot set labels via parameters in Cypher, so you'll have to do that bit in Python and then group by label pairs. Or you can use apoc.create.node, which allows dynamic labels.
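For example, something along these lines (row.label and row.value are hypothetical field names):

statement = """
UNWIND $rows as row
CALL apoc.create.node([row.label], {property: row.value}) YIELD node
RETURN count(node)
"""

apoc.create.node takes a list of label strings and a map of properties, so the label can come from the batched data instead of being hard-coded in the query.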
08-29-2018 11:23 AM
Thanks. To avoid the issue with duplicate nodes with the same property value, would this modified statement do the trick?
Also, I had a bug in my earlier proposed Python code: row % 20000 should be index % 20000. I also renamed some variables to make it less confusing and more readable.
One thing I am still a little confused about is where the $rows parameter in the Cypher statement gets set.
statement = """
UNWIND $rows as row
MATCH (a:Label1 {id: $label1id})
MATCH (b:Label2 {id: $label2id})
// assumes nodes a and b already exist, with uniqueness constraints
// just create a relationship of type R_TYPE between pre-existing node a and node b
MERGE (a)-[r:R_TYPE]->(b)
"""
tx = graph.begin(autocommit=True)
params = []
# column names of dataframe: label1id, label2id
# dataframe is indexed with numerical indexes
for index, row in df.iterrows():
params += {'label1id': row['label1id'], 'label2id': row['label2id']}
if index % 20000 == 0:
tx.evaluate(statement, parameters = params)
params = []
tx.evaluate(statement, parameters = params)
08-29-2018 12:03 PM
Sorry, my bad, that should have been $parameters; please adapt.
Fixed below. Also, you need to use row.label1id / row.label2id, because you pass in a list of dicts.
statement = """
UNWIND $parameters as row
MATCH (a:Label1 {id: row.label1id})
MATCH (b:Label2 {id: row.label2id})
// assumes nodes a and b already exist, with uniqueness constraints
// just create a relationship of type R_TYPE between pre-existing node a and node b
MERGE (a)-[r:R_TYPE]->(b)
"""
08-29-2018 09:28 PM
Something is still off with the passing of the parameters; I'm getting ValueErrors no matter what I try.
First I tried this:
statement = """
UNWIND $parameters as row
MERGE (iui:ItemUsageInstance {instance_id: trim(row.USAGE_INSTANCE)})
ON CREATE SET
iui.usage_type = trim(row.USAGE_TYPE)
MERGE (nhiui:ItemUsageInstance {instance_id: trim(row.NEXT_HIGHER_USAGE_INST)})
ON CREATE SET
nhiui.usage_type = trim(row.NEXT_HIGHER_USAGE_TYPE)
"""
tx = graph.begin(autocommit=True)
params = []
# dataframe is indexed with numerical indexes
for index, row in df.iterrows():
params += {
'USAGE_INSTANCE': row['USAGE_INSTANCE'],
'USAGE_TYPE': row['USAGE_TYPE'],
'NEXT_HIGHER_USAGE_INST': row['NEXT_HIGHER_USAGE_INST'],
'NEXT_HIGHER_USAGE_TYPE': row['NEXT_HIGHER_USAGE_TYPE'],
}
if index % 20000 == 0:
tx.evaluate(statement, parameters = params)
params = []
tx.evaluate(statement, parameters = params)
Results in:
ValueError Traceback (most recent call last)
<ipython-input-37-28c0264c4f87> in <module>()
48 #params.append(params_dict)
49 if index % 20000 == 0:
---> 50 tx.evaluate(statement, parameters = params)
51 params = []
52 tx.evaluate(statement, parameters = params)
/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/py2neo/database.py in evaluate(self, cypher, parameters, **kwparameters)
889 :returns: single return value or :const:`None`
890 """
--> 891 return self.run(cypher, parameters, **kwparameters).evaluate(0)
892
893 def create(self, subgraph):
/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/py2neo/database.py in run(self, cypher, parameters, **kwparameters)
840 result = self.transaction.run(cypher, parameters, **kwparameters)
841 else:
--> 842 result = self.session.run(cypher, parameters, **kwparameters)
843 except CypherError as error:
844 raise GraphError.hydrate({"code": error.code, "message": error.message})
/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/neo4j/v1/api.py in run(self, statement, parameters, **kwparameters)
325 self._connect()
326
--> 327 result = self.__run__(statement, dict(parameters or {}, **kwparameters))
328
329 if not self.has_transaction():
ValueError: dictionary update sequence element #0 has length 22; 2 is required
If I look at the params object:
['NEXT_HIGHER_USAGE_INST',
'NEXT_HIGHER_USAGE_TYPE',
'USAGE_INSTANCE',
'USAGE_TYPE']
Then I tried to ensure we are building a list of dicts:
statement = """
UNWIND $parameters as row
MERGE (iui:ItemUsageInstance {instance_id: trim(row.USAGE_INSTANCE)})
ON CREATE SET
iui.usage_type = trim(row.USAGE_TYPE)
MERGE (nhiui:ItemUsageInstance {instance_id: trim(row.NEXT_HIGHER_USAGE_INST)})
ON CREATE SET
nhiui.usage_type = trim(row.NEXT_HIGHER_USAGE_TYPE)
"""
tx = graph.begin(autocommit=True)
params = []
# dataframe is indexed with numerical indexes
for index, row in df.iterrows():
params_dict = {
'USAGE_INSTANCE': row['USAGE_INSTANCE'],
'USAGE_TYPE': row['USAGE_TYPE'],
'NEXT_HIGHER_USAGE_INST': row['NEXT_HIGHER_USAGE_INST'],
'NEXT_HIGHER_USAGE_TYPE': row['NEXT_HIGHER_USAGE_TYPE'],
}
params.append(params_dict)
if index % 20000 == 0:
tx.evaluate(statement, parameters = params)
params = []
tx.evaluate(statement, parameters = params)
Error:
ValueError Traceback (most recent call last)
<ipython-input-25-86de9174551c> in <module>()
48 params.append(params_dict)
49 if index % 20000 == 0:
---> 50 tx.evaluate(statement, parameters = params)
51 params = []
52 tx.evaluate(statement, parameters = params)
/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/py2neo/database.py in evaluate(self, cypher, parameters, **kwparameters)
889 :returns: single return value or :const:`None`
890 """
--> 891 return self.run(cypher, parameters, **kwparameters).evaluate(0)
892
893 def create(self, subgraph):
/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/py2neo/database.py in run(self, cypher, parameters, **kwparameters)
840 result = self.transaction.run(cypher, parameters, **kwparameters)
841 else:
--> 842 result = self.session.run(cypher, parameters, **kwparameters)
843 except CypherError as error:
844 raise GraphError.hydrate({"code": error.code, "message": error.message})
/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/neo4j/v1/api.py in run(self, statement, parameters, **kwparameters)
325 self._connect()
326
--> 327 result = self.__run__(statement, dict(parameters or {}, **kwparameters))
328
329 if not self.has_transaction():
ValueError: dictionary update sequence element #0 has length 4; 2 is required
If I then look at the params object:
# (dict values replaced with toy data values...)
[{'NEXT_HIGHER_USAGE_INST': 'inst9002',
'NEXT_HIGHER_USAGE_TYPE': 'prototype',
'USAGE_INSTANCE': 'inst7312',
'USAGE_TYPE': 'production'}]
From what I have found so far, this exception is raised "because you are trying to update a dict object by using a wrong sequence (list or tuple) structure." However, I'm not seeing where a dict is being updated here. It is just set, then appended to a list.
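Edit: I notice the last frame in both tracebacks calls dict(parameters or {}), which explains both errors. A minimal reproduction:

# dict() over a list treats each element as a (key, value) pair.
# With a list of column names, element #0 is the 22-character string
# 'NEXT_HIGHER_USAGE_INST'; with a list of dicts, element #0 is a
# dict with 4 keys. Neither has length 2, hence the two ValueErrors.
dict(['NEXT_HIGHER_USAGE_INST'])  # ValueError: ...element #0 has length 22; 2 is required
dict([{'USAGE_INSTANCE': 'inst7312',
       'USAGE_TYPE': 'production',
       'NEXT_HIGHER_USAGE_INST': 'inst9002',
       'NEXT_HIGHER_USAGE_TYPE': 'prototype'}])  # ValueError: ...length 4; 2 is required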
08-30-2018 05:36 AM
I would just create the dict from the data like you did initially.
Oh, I might have missed something: you need to pass in a dict for parameters, i.e.
parameters = {"parameters": params}
08-31-2018 03:14 PM
Thanks. I ended up having to also begin() a new transaction after the call to evaluate() inside the loop. If not, the call to evaluate() outside the loop raises a TransactionFinished exception.
Full code:
statement = """
UNWIND $parameters as row
MERGE (iui:ItemUsageInstance {instance_id: trim(row.USAGE_INSTANCE)})
ON CREATE SET
iui.usage_type = trim(row.USAGE_TYPE)
MERGE (nhiui:ItemUsageInstance {instance_id: trim(row.NEXT_HIGHER_USAGE_INST)})
ON CREATE SET
nhiui.usage_type = trim(row.NEXT_HIGHER_USAGE_TYPE)
"""
tx = graph.begin(autocommit=True)
params = []
# dataframe is indexed with numerical indexes
for index, row in df.iterrows():
params_dict = {
'USAGE_INSTANCE': row['USAGE_INSTANCE'],
'USAGE_TYPE': row['USAGE_TYPE'],
'NEXT_HIGHER_USAGE_INST': row['NEXT_HIGHER_USAGE_INST'],
'NEXT_HIGHER_USAGE_TYPE': row['NEXT_HIGHER_USAGE_TYPE']
}
params.append(params_dict)
if index % 20000 == 0 and index > 0:
tx.evaluate(statement, parameters = {"parameters" : params})
tx = graph.begin(autocommit=True)
params = []
tx.evaluate(statement, parameters = {"parameters" : params})
04-05-2021 11:52 PM
Iterating through large pandas DataFrame objects is generally slow. Pandas iteration defeats the whole purpose of using a DataFrame; it is an anti-pattern and something you should do only when you have exhausted every other option. It is better to look for a list comprehension, a vectorized solution, or the DataFrame.apply() method.
Example of a Pandas DataFrame loop using a list comprehension:
result = [(x, y, z) for x, y, z in zip(df['column_1'], df['column_2'], df['column_3'])]
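In the same spirit, the parameter lists built earlier in this thread with iterrows could be produced in one call; a sketch, assuming the same column names and statement as in the accepted solution:

# Build the list of parameter dicts in one vectorized call:
cols = ['USAGE_INSTANCE', 'USAGE_TYPE',
        'NEXT_HIGHER_USAGE_INST', 'NEXT_HIGHER_USAGE_TYPE']
params = df[cols].to_dict('records')

# Submit slices of 20,000 rows, one autocommitted statement per batch:
for start in range(0, len(params), 20000):
    batch = params[start:start + 20000]
    graph.run(statement, parameters={"parameters": batch})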
07-02-2021 03:41 PM
You can chunk it like this, which worked well for me when I put it together with Clair:
import time

def add_authors(rows, batch_size=10000):
    # Adds author nodes to the Neo4j graph as a batch job.
    query = '''
    UNWIND $rows AS row
    MERGE (:Author {name: row.author})
    RETURN count(*) as total
    '''
    return insert_data(query, rows, batch_size)

def insert_data(query, rows, batch_size=10000):
    # Function to handle updating the Neo4j database in batch mode.
    # Assumes an existing connection object `conn` with a query() method.
    total = 0
    batch = 0
    start = time.time()
    result = None
    while batch * batch_size < len(rows):
        # Slice the DataFrame into one batch and send it as a list of dicts.
        res = conn.query(query,
                         parameters={'rows': rows[batch*batch_size:(batch+1)*batch_size].to_dict('records')})
        total += res[0]['total']
        batch += 1
        result = {"total": total,
                  "batches": batch,
                  "time": time.time() - start}
        print(result)
    return result
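A hypothetical call, assuming a DataFrame with an author column and the conn object set up beforehand:

import pandas as pd

df = pd.DataFrame({'author': ['Alice', 'Bob', 'Alice']})
add_authors(df)  # prints per-batch progress and returns the final summary dict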