
Concurrently Loading Hundreds of CSVs to Neo4j

I'm a brand-new Neo4j user. I've done a lot of searching but haven't found much useful information.

Our graph model is very simple: two node labels (each with a single property) and one relationship type. But the volume of our data is huge – we have already imported billions of records into Neo4j using neo4j-admin import.

Every month we will receive billions of new records (with the same node/relationship types), and we want to write them into Neo4j as well. To do this, we plan to run hundreds of worker machines, each of which will first generate a set of node and relationship CSVs and then call a custom API on the Neo4j server to load those CSVs concurrently.

The problem is that the same nodes are likely to appear multiple times within and across these CSVs. So during the concurrent loads, especially when creating relationships, there is a good chance of lock conflicts and deadlocks.

To verify this, I ran some experiments on artificial data. Plain LOAD CSV failed as expected.
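For reference, the LOAD CSV version each worker ran was along these lines (a minimal sketch of the equivalent function; the PERIODIC COMMIT batch size is an assumption):

from neo4j import GraphDatabase

def load_plain_multi(path):
    # Same shape as load_apoc_multi below, but using plain LOAD CSV.
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "1234"))
    with driver.session() as session:
        q = f"""
            USING PERIODIC COMMIT 500
            LOAD CSV WITH HEADERS FROM 'file:///{path}' AS row
            WITH row.col1 AS p1, row.col2 AS p2
            MERGE (u:Label1 {{Property1: p1}})
            MERGE (d:Label2 {{Property2: p2}})
            MERGE (u)-[rel:LINKS]->(d)
            """
        session.run(q).consume()
    driver.close()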

TransientError: LockClient[4189] can't wait on resource RWLock[INDEX_ENTRY(2662559704856447762), hash=271851773] since => LockClient[4189] <-[:HELD_BY]- RWLock[NODE(96250), hash=26566105] <-[:WAITING_FOR]- LockClient[4190] <-[:HELD_BY]- RWLock[INDEX_ENTRY(2662559704856447762), hash=271851773]

However, when I did the same thing with apoc.load.csv and apoc.periodic.iterate, everything worked fine – no error was raised and all the nodes and relationships were successfully imported.

Below are the steps of the experiment.

Code for apoc.load.csv. Each call has its own Neo4j driver and session.

from time import time

from neo4j import GraphDatabase

def load_apoc_multi(path):
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "1234"))
    session = driver.session()
    stime = time()
    # apoc.periodic.iterate streams the CSV rows and applies the MERGEs
    # in batches of 500, committing each batch in its own transaction.
    q = f"""
        CALL apoc.periodic.iterate(
            "CALL apoc.load.csv('file:///{path}') YIELD map AS row RETURN row",
            "WITH row.col1 AS p1, row.col2 AS p2
             MERGE (u:Label1 {{Property1: p1}})
             MERGE (d:Label2 {{Property2: p2}})
             MERGE (u)-[rel:LINKS]->(d)
             RETURN count(*)",
            {{batchSize: 500}})
        """
    session.run(q).consume()  # consume() blocks until the whole load finishes
    etime = time()
    print(etime - stime)
    session.close()
    driver.close()
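One thing worth checking in this setup is whether any batches failed silently, since apoc.periodic.iterate reports per-batch failures in its summary rather than always raising. Inside load_apoc_multi, the query and run call could be extended like this (a sketch; I'm taking the YIELD field names from the APOC documentation, so treat them as an assumption):

    # Keep the procedure's summary instead of discarding it; path and
    # session come from the enclosing function above.
    q = f"""
        CALL apoc.periodic.iterate(
            "CALL apoc.load.csv('file:///{path}') YIELD map AS row RETURN row",
            "WITH row.col1 AS p1, row.col2 AS p2
             MERGE (u:Label1 {{Property1: p1}})
             MERGE (d:Label2 {{Property2: p2}})
             MERGE (u)-[rel:LINKS]->(d)
             RETURN count(*)",
            {{batchSize: 500}})
        YIELD batches, total, failedBatches, errorMessages
        RETURN batches, total, failedBatches, errorMessages
        """
    summary = session.run(q).single()
    print(summary["batches"], summary["total"],
          summary["failedBatches"], summary["errorMessages"])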

Data Generation

from random import shuffle

import pandas as pd

# 500,000 distinct values, each repeated 7 times, so the same nodes are
# guaranteed to recur both within and across the split files.
length = 500000
col1 = list(range(length)) * 7
col2 = list(range(length)) * 7
shuffle(col1)
shuffle(col2)

df = pd.DataFrame({'col1': col1, 'col2': col2})

Then I split the dataframe into 7 CSV files and called load_apoc_multi on them in parallel, as shown below.
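The split step was along these lines (a minimal sketch; np.array_split is an assumption about how the chunks were written out):

import numpy as np

# Write 7 roughly equal chunks; the file names match the pool.map call below.
for i, chunk in enumerate(np.array_split(df, 7), start=1):
    chunk.to_csv(f'largetest{i}.csv', index=False)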

import multiprocessing

if __name__ == '__main__':  # guard needed so spawned workers can import this module
    with multiprocessing.Pool(processes=7) as pool:
        pool.map(load_apoc_multi, ['largetest1.csv', 'largetest2.csv', 'largetest3.csv',
                                   'largetest4.csv', 'largetest5.csv', 'largetest6.csv',
                                   'largetest7.csv'])

Unlike plain LOAD CSV, this time all the nodes and relationships were successfully created.

Questions:

  1. Why didn't I get any errors in the second experiment with APOC? Does it have a mechanism for handling such conflicts?
  2. Does that mean it is safe to use apoc.load.csv to write a bunch of CSVs to Neo4j concurrently even though the node sets in the CSVs are not disjoint?
  3. If the answer is no, please advise on the best way / common practice to approach this problem – importing billions of new records from hundreds of worker nodes at a regular cadence. I feel this is something a lot of companies must have encountered before. Do we have to serialize the CSV loads (i.e., import the files one at a time) when adding new data?

Thanks so much for reading!
