Head's Up! These forums are read-only. All users and content have migrated. Please join us at community.neo4j.com.

How to PERIODIC COMMIT when importing data from large Pandas Dataframe?

apanza
Node Clone

I'd like to ask a follow-up question to the one posed on Stack Overflow here.

@William_Lyon notes in his answer that "We can batch multiple queries into one transaction for better performance... Typically we can batch ~20k database operations in a single transaction. "

For convenience, I have pasted the example code below:

tx = graph.begin()
for index, row in df.iterrows():
    tx.evaluate('''
      MATCH (a:Label1 {property: $label1})
      MERGE (a)-[r:R_TYPE]->(b:Label2 {property: $label2})
    ''', parameters = {'label1': row['label1'], 'label2': row['label2']})
tx.commit()

Well, what if the Pandas dataframe had many more than 20,000 rows? Suppose 10 million. I know that if we were using LOAD CSV directly from the cypher-shell, we would include USING PERIODIC COMMIT 20000 to make it commit every 20,000 lines of the CSV.
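
For reference, this is roughly what that LOAD CSV form looks like in cypher-shell (the file name and labels here are made up for illustration; note that newer Neo4j versions replace USING PERIODIC COMMIT with CALL { ... } IN TRANSACTIONS):

```cypher
USING PERIODIC COMMIT 20000
LOAD CSV WITH HEADERS FROM 'file:///rows.csv' AS row
MATCH (a:Label1 {property: row.label1})
MERGE (a)-[r:R_TYPE]->(b:Label2 {property: row.label2})
```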

What would be the equivalent of USING PERIODIC COMMIT 20000 when importing from a large dataframe with py2neo?

The py2neo docs mention an optional autocommit argument to make each individual transaction automatically commit (almost the opposite of what I want). But I don't see anything about specifying PERIODIC COMMIT.

The closest workaround I can think of is, within the iterrows loop, to do a modulo operation on the row variable and keep the final commit() outside the loop. The modified code would look something like this:

tx = graph.begin()
for index, row in df.iterrows():
    tx.evaluate('''
      MATCH (a:Label1 {property: $label1})
      MERGE (a)-[r:R_TYPE]->(b:Label2 {property: $label2})
    ''', parameters = {'label1': row['label1'], 'label2': row['label2']})
    if row % 20000 == 0:
        tx.commit()
tx.commit()

Would this be a viable workaround? Is there any other way?
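
To make the intent concrete, here is the commit-every-N pattern I have in mind, with the database calls simulated so the batching logic can be checked without py2neo:

```python
def run_in_batches(rows, batch_size, commit_batch):
    """Accumulate rows and hand off each full batch, plus the final
    partial batch, to commit_batch (a stand-in for begin/commit)."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            commit_batch(batch)
            batch = []
    if batch:  # final partial batch
        commit_batch(batch)

# Simulated commits: just record each batch size.
committed = []
run_in_batches(range(45000), 20000, lambda b: committed.append(len(b)))
print(committed)  # [20000, 20000, 5000]
```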

1 ACCEPTED SOLUTION

apanza
Node Clone

Thanks. I ended up having to also begin() a new transaction after the call to evaluate() inside the loop. If not, the call to evaluate() outside the loop raises a TransactionFinished exception.

Full code:

statement = """
UNWIND $parameters as row
MERGE (iui:ItemUsageInstance {instance_id: trim(row.USAGE_INSTANCE)})
  ON CREATE SET
    iui.usage_type = trim(row.USAGE_TYPE)
MERGE (nhiui:ItemUsageInstance {instance_id: trim(row.NEXT_HIGHER_USAGE_INST)})
  ON CREATE SET
    nhiui.usage_type = trim(row.NEXT_HIGHER_USAGE_TYPE)
"""
tx = graph.begin(autocommit=True)
params = []
# dataframe is indexed with numerical indexes
for index, row in df.iterrows():
    params_dict = {
        'USAGE_INSTANCE': row['USAGE_INSTANCE'], 
        'USAGE_TYPE': row['USAGE_TYPE'],
        'NEXT_HIGHER_USAGE_INST': row['NEXT_HIGHER_USAGE_INST'],
        'NEXT_HIGHER_USAGE_TYPE': row['NEXT_HIGHER_USAGE_TYPE']
    }
    params.append(params_dict)
    if index % 20000 == 0 and index > 0:
        tx.evaluate(statement, parameters = {"parameters" : params})
        tx = graph.begin(autocommit=True)
        params = []
tx.evaluate(statement, parameters = {"parameters" : params})


8 REPLIES

Usually you would feed batches of data to your statement and then use UNWIND inside it to turn each batch into rows.
The advantage over running each statement individually is that you run one statement per batch instead of 20,000.

So your statement could look like:

statement = """
          UNWIND $rows as row
          MATCH (a:Label1 {property: row.label1})
          MERGE (a)-[r:R_TYPE]->(b:Label2 {property: row.label2})
"""
tx = graph.begin(autocommit=True)
params = []
for index, row in df.iterrows():
    params += {'label1': row['label1'], 'label2': row['label2']}
    if row % 20000 == 0:
        tx.evaluate(statement, parameters = params)
        params = []
tx.evaluate(statement, parameters = params)

see also: https://medium.com/neo4j/5-tips-tricks-for-fast-batched-updates-of-graph-structures-with-neo4j-and-c...

Please note that in your statement you merge the b nodes relative to a, i.e. you will end up with duplicate b nodes that have the same property value. If you actually want unique b nodes, you need two separate MERGE operations: one for the node, one for the relationship.

Please also note that you cannot set labels via parameters in Cypher, so you'll have to do that bit in Python and then group by label-pairs. Or you can use apoc.create.node which allows dynamic labels.
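
Grouping by label-pairs in Python could look something like this (the row data and column names here are hypothetical, and the tx.evaluate call is left commented out):

```python
from collections import defaultdict

# Hypothetical rows: each row carries its node labels plus the match properties.
rows = [
    {'label1': 'Person', 'label2': 'City',    'prop1': 'alice', 'prop2': 'berlin'},
    {'label1': 'Person', 'label2': 'City',    'prop1': 'bob',   'prop2': 'paris'},
    {'label1': 'Person', 'label2': 'Company', 'prop1': 'carol', 'prop2': 'acme'},
]

# Group rows by their (label1, label2) pair so the labels can be inlined
# into the statement text (labels cannot be query parameters).
groups = defaultdict(list)
for r in rows:
    groups[(r['label1'], r['label2'])].append(
        {'prop1': r['prop1'], 'prop2': r['prop2']})

statements = []
for (l1, l2), batch in groups.items():
    statement = (
        "UNWIND $parameters AS row "
        "MATCH (a:%s {property: row.prop1}) "
        "MERGE (a)-[:R_TYPE]->(b:%s {property: row.prop2})" % (l1, l2))
    statements.append((statement, batch))
    # tx.evaluate(statement, parameters={"parameters": batch})  # one call per label pair
```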

Thanks, to avoid the issue with duplicate nodes with the same property value, would this modified statement do the trick?

Also, I had a bug in my earlier proposed Python code. row % 20000 should be index % 20000. I also renamed some variables to make it less confusing and more readable.

One thing I am still a little confused on is where does the $rows parameter in the Cypher statement get set?

statement = """
          UNWIND $rows as row
          MATCH (a:Label1 {id: $label1id})
          MATCH (b:Label2 {id: $label2id})
          // assumes nodes a and b already exist, with uniqueness constraints
          // just create a relationship of type R_TYPE between pre-existing node a and node b
          MERGE (a)-[r:R_TYPE]->(b)
"""
tx = graph.begin(autocommit=True)
params = []
# column names of dataframe: label1id, label2id
# dataframe is indexed with numerical indexes
for index, row in df.iterrows():
    params += {'label1id': row['label1id'], 'label2id': row['label2id']}
    if index % 20000 == 0:
        tx.evaluate(statement, parameters = params)
        params = []
tx.evaluate(statement, parameters = params)

Sorry my bad, should have been $parameters, please adapt

Fixed below, also you need to use row.label b/c you pass in a list of dicts.

statement = """
          UNWIND $parameters as row
          MATCH (a:Label1 {id: row.label1id})
          MATCH (b:Label2 {id: row.label2id})
          // assumes nodes a and b already exist, with uniqueness constraints
          // just create a relationship of type R_TYPE between pre-existing node a and node b
          MERGE (a)-[r:R_TYPE]->(b)
"""

Something is still off with the passing of the parameters. I'm getting ValueErrors no matter what I try.

First tried this:

statement = """
UNWIND $parameters as row
MERGE (iui:ItemUsageInstance {instance_id: trim(row.USAGE_INSTANCE)})
  ON CREATE SET
    iui.usage_type = trim(row.USAGE_TYPE)
MERGE (nhiui:ItemUsageInstance {instance_id: trim(row.NEXT_HIGHER_USAGE_INST)})
  ON CREATE SET
    nhiui.usage_type = trim(row.NEXT_HIGHER_USAGE_TYPE)
"""
tx = graph.begin(autocommit=True)
params = []
# dataframe is indexed with numerical indexes
for index, row in df.iterrows():
    params += {
        'USAGE_INSTANCE': row['USAGE_INSTANCE'], 
        'USAGE_TYPE': row['USAGE_TYPE'],
        'NEXT_HIGHER_USAGE_INST': row['NEXT_HIGHER_USAGE_INST'],
        'NEXT_HIGHER_USAGE_TYPE': row['NEXT_HIGHER_USAGE_TYPE'],
    }
    if index % 20000 == 0:
        tx.evaluate(statement, parameters = params)
        params = []
tx.evaluate(statement, parameters = params)

Results in:

ValueError                                Traceback (most recent call last)
<ipython-input-37-28c0264c4f87> in <module>()
     48     #params.append(params_dict)
     49     if index % 20000 == 0:
---> 50         tx.evaluate(statement, parameters = params)
     51         params = []
     52 tx.evaluate(statement, parameters = params)

/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/py2neo/database.py in evaluate(self, cypher, parameters, **kwparameters)
    889         :returns: single return value or :const:`None`
    890         """
--> 891         return self.run(cypher, parameters, **kwparameters).evaluate(0)
    892 
    893     def create(self, subgraph):

/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/py2neo/database.py in run(self, cypher, parameters, **kwparameters)
    840                 result = self.transaction.run(cypher, parameters, **kwparameters)
    841             else:
--> 842                 result = self.session.run(cypher, parameters, **kwparameters)
    843         except CypherError as error:
    844             raise GraphError.hydrate({"code": error.code, "message": error.message})

/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/neo4j/v1/api.py in run(self, statement, parameters, **kwparameters)
    325             self._connect()
    326 
--> 327         result = self.__run__(statement, dict(parameters or {}, **kwparameters))
    328 
    329         if not self.has_transaction():

ValueError: dictionary update sequence element #0 has length 22; 2 is required

If I look at the params object:

['NEXT_HIGHER_USAGE_INST',
 'NEXT_HIGHER_USAGE_TYPE',
 'USAGE_INSTANCE',
 'USAGE_TYPE']

Then I tried to ensure we are building a list of dicts:

statement = """
UNWIND $parameters as row
MERGE (iui:ItemUsageInstance {instance_id: trim(row.USAGE_INSTANCE)})
  ON CREATE SET
    iui.usage_type = trim(row.USAGE_TYPE)
MERGE (nhiui:ItemUsageInstance {instance_id: trim(row.NEXT_HIGHER_USAGE_INST)})
  ON CREATE SET
    nhiui.usage_type = trim(row.NEXT_HIGHER_USAGE_TYPE)
"""
tx = graph.begin(autocommit=True)
params = []
# dataframe is indexed with numerical indexes
for index, row in df.iterrows():
    params_dict = {
        'USAGE_INSTANCE': row['USAGE_INSTANCE'], 
        'USAGE_TYPE': row['USAGE_TYPE'],
        'NEXT_HIGHER_USAGE_INST': row['NEXT_HIGHER_USAGE_INST'],
        'NEXT_HIGHER_USAGE_TYPE': row['NEXT_HIGHER_USAGE_TYPE'],
    }
    params.append(params_dict)
    if index % 20000 == 0:
        tx.evaluate(statement, parameters = params)
        params = []
tx.evaluate(statement, parameters = params)

Error:

ValueError                                Traceback (most recent call last)
<ipython-input-25-86de9174551c> in <module>()
     48     params.append(params_dict)
     49     if index % 20000 == 0:
---> 50         tx.evaluate(statement, parameters = params)
     51         params = []
     52 tx.evaluate(statement, parameters = params)

/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/py2neo/database.py in evaluate(self, cypher, parameters, **kwparameters)
    889         :returns: single return value or :const:`None`
    890         """
--> 891         return self.run(cypher, parameters, **kwparameters).evaluate(0)
    892 
    893     def create(self, subgraph):

/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/py2neo/database.py in run(self, cypher, parameters, **kwparameters)
    840                 result = self.transaction.run(cypher, parameters, **kwparameters)
    841             else:
--> 842                 result = self.session.run(cypher, parameters, **kwparameters)
    843         except CypherError as error:
    844             raise GraphError.hydrate({"code": error.code, "message": error.message})

/app/local/anaconda3/envs/py35/lib/python3.5/site-packages/neo4j/v1/api.py in run(self, statement, parameters, **kwparameters)
    325             self._connect()
    326 
--> 327         result = self.__run__(statement, dict(parameters or {}, **kwparameters))
    328 
    329         if not self.has_transaction():

ValueError: dictionary update sequence element #0 has length 4; 2 is required

If I then look at the params object:

# (dict values replaced with toy data values...)
[{'NEXT_HIGHER_USAGE_INST': 'inst9002',
  'NEXT_HIGHER_USAGE_TYPE': 'prototype',
  'USAGE_INSTANCE': 'inst7312',
  'USAGE_TYPE': 'production'}]

From what I have found so far, this exception is raised "because you are trying to update a dict object by using a wrong sequence (list or tuple) structure." However, I'm not seeing where a dict is being updated here. It is just set, then appended to a list.
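
The update happens inside the driver, not in your code: the traceback shows the driver calling dict(parameters or {}, ...) on whatever you pass in. A minimal reproduction with toy values:

```python
# dict() accepts a mapping or an iterable of 2-item pairs. A list of
# 4-key dicts is neither: dict() iterates each element, and a dict
# element has 4 keys, hence "length 4; 2 is required".
params = [{'USAGE_INSTANCE': 'inst7312',
           'USAGE_TYPE': 'production',
           'NEXT_HIGHER_USAGE_INST': 'inst9002',
           'NEXT_HIGHER_USAGE_TYPE': 'prototype'}]
try:
    dict(params)
    err = None
except ValueError as e:
    err = str(e)
print(err)  # dictionary update sequence element #0 has length 4; 2 is required

# Wrapping the list in a single-key dict is a valid mapping:
wrapped = {"parameters": params}
assert dict(wrapped) == wrapped
```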

I would just create the dict from the data like you did initially.

Oh I might have missed something:

You need to pass in a dict for parameters, i.e.

parameters = {"parameters" : params}


Iterating through large pandas DataFrame objects is generally slow. Row-wise iteration defeats the whole purpose of using a DataFrame; it is an anti-pattern and something you should only do when you have exhausted every other option. It is better to look for a list comprehension, a vectorized solution, or the DataFrame.apply() method.

Example of a pandas DataFrame loop using a list comprehension:

result = [(x, y, z) for x, y, z in zip(df['column_1'], df['column_2'], df['column_3'])]
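
The same zip approach can build the UNWIND parameter list directly, skipping iterrows entirely (plain lists stand in for the dataframe columns here):

```python
# Stand-ins for two dataframe columns (df['label1id'], df['label2id']).
label1_ids = ['a1', 'a2', 'a3']
label2_ids = ['b1', 'b2', 'b3']

# One dict per row, ready to pass as {"parameters": params} to an UNWIND query.
params = [{'label1id': x, 'label2id': y}
          for x, y in zip(label1_ids, label2_ids)]
print(params[0])  # {'label1id': 'a1', 'label2id': 'b1'}
```

With a real dataframe, df[['label1id', 'label2id']].to_dict('records') produces the same list of dicts in one call.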

You can chunk it like this, which worked well for me when I put it together with Clair:

def add_authors(rows, batch_size=10000):
    # Adds author nodes to the Neo4j graph as a batch job.
    query = '''
            UNWIND $rows AS row
            MERGE (:Author {name: row.author})
            RETURN count(*) as total
            '''
    return insert_data(query, rows, batch_size)


def insert_data(query, rows, batch_size = 10000):
    # Function to handle the updating the Neo4j database in batch mode.
    
    total = 0
    batch = 0
    start = time.time()
    result = None
    
    while batch * batch_size < len(rows):

        res = conn.query(query, 
                         parameters= {
                         'rows': rows[batch*batch_size:(batch+1)*batch_size].to_dict('records')})
        total += res[0]['total']
        batch += 1
        result = {"total":total, 
                  "batches":batch, 
                  "time":time.time()-start}
        print(result)
        
    return result
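
The slice arithmetic in insert_data can be checked on its own with a plain list standing in for the rows (no conn or Neo4j needed):

```python
def batch_slices(rows, batch_size):
    """Yield the same consecutive slices insert_data sends per query."""
    batch = 0
    while batch * batch_size < len(rows):
        yield rows[batch * batch_size:(batch + 1) * batch_size]
        batch += 1

chunks = list(batch_slices(list(range(25)), 10))
print([len(c) for c in chunks])  # [10, 10, 5]
```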