03-04-2021 11:11 AM
I'm a brand-new user of Neo4j. I've done a lot of searching but haven't found much useful information.
Our graph model is very simple: two types of nodes (each with a single property) and one type of relationship. But the volume of our data is huge – we have already imported billions of records into Neo4j using neo4j-admin import.
Every month we will receive billions of new records (with the same node/relationship types), and we want to write them to Neo4j as well. To achieve this, we plan to run hundreds of worker machines; each will first generate a set of CSVs of nodes and relationships, and then call a custom API on the Neo4j server to load these CSVs concurrently.
The problem is that the same nodes are likely to appear multiple times within and across these CSVs. So during the concurrent loads, especially when loading relationships, there seems to be a great chance of lock conflicts and deadlocks.
To verify this, I ran some experiments with artificial data. LOAD CSV failed as expected:
TransientError: LockClient[4189] can't wait on resource RWLock[INDEX_ENTRY(2662559704856447762), hash=271851773] since => LockClient[4189] <-[:HELD_BY]- RWLock[NODE(96250), hash=26566105] <-[:WAITING_FOR]- LockClient[4190] <-[:HELD_BY]- RWLock[INDEX_ENTRY(2662559704856447762), hash=271851773]
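(For reference, a deadlock like this surfaces as a TransientError, which the official Python driver retries automatically when the write is wrapped in a transaction function. Below is a minimal sketch with the same toy schema, assuming the 4.x driver API; the per-row transaction granularity here is just illustrative, not what the experiments below use.)

from neo4j import GraphDatabase

def merge_row(tx, p1, p2):
    # One MERGE chain per transaction; managed transactions are retried
    # by the driver on TransientError (including detected deadlocks).
    tx.run("MERGE (u:Label1 {Property1: $p1}) "
           "MERGE (d:Label2 {Property2: $p2}) "
           "MERGE (u)-[:LINKS]->(d)",
           p1=p1, p2=p2)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "1234"))
with driver.session() as session:
    session.write_transaction(merge_row, 1, 2)  # retried on deadlock
driver.close()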
However, when I did the same thing with apoc.load.csv and apoc.periodic.iterate, everything worked fine – no error was raised and all the nodes and relationships were successfully imported.
Below are the steps of the experiment.
Code for apoc.load.csv. Each call creates its own Neo4j driver and session.
from time import time
from neo4j import GraphDatabase

def load_apoc_multi(path):
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "1234"))
    session = driver.session()
    stime = time()
    q = f"""
    CALL apoc.periodic.iterate(
        "CALL apoc.load.csv('file:///{path}') YIELD map AS row RETURN row",
        "WITH row.col1 AS p1, row.col2 AS p2
         MERGE (u:Label1 {{Property1: p1}})
         MERGE (d:Label2 {{Property2: p2}})
         MERGE (u)-[rel:LINKS]->(d)
         RETURN count(*)",
        {{batchSize: 500}})
    """
    session.run(q).consume()  # consume so the procedure finishes before we stop the timer
    etime = time()
    print(etime - stime)
    session.close()  # close the session before closing the driver
    driver.close()
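One caveat about this setup: apoc.periodic.iterate reports per-batch failures through its returned columns (e.g. failedBatches, errorMessages) rather than raising an exception to the client, so "no error was raised" deserves a double check. A small variation of the call above that surfaces those stats (a sketch, reusing path and session from the function above):

    # Same call as in load_apoc_multi, but YIELDing the batch statistics
    # so that silently failed batches would be visible to the client.
    q = f"""
    CALL apoc.periodic.iterate(
        "CALL apoc.load.csv('file:///{path}') YIELD map AS row RETURN row",
        "WITH row.col1 AS p1, row.col2 AS p2
         MERGE (u:Label1 {{Property1: p1}})
         MERGE (d:Label2 {{Property2: p2}})
         MERGE (u)-[rel:LINKS]->(d)
         RETURN count(*)",
        {{batchSize: 500}})
    YIELD batches, total, failedBatches, errorMessages
    RETURN batches, total, failedBatches, errorMessages
    """
    stats = session.run(q).single()
    print(stats["failedBatches"], stats["errorMessages"])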
Data Generation
from random import shuffle
import pandas as pd

# 500k distinct values, each repeated 7 times per column, so the same
# nodes are guaranteed to appear across the concurrently loaded CSVs
length = 500000
col1 = list(range(length)) * 7
col2 = list(range(length)) * 7
shuffle(col1)
shuffle(col2)
df = pd.DataFrame({'col1': col1, 'col2': col2})
Then I split the dataframe into 7 CSV files and called load_apoc_multi as below.
import multiprocessing

with multiprocessing.Pool(processes=7) as pool:
    pool.map(load_apoc_multi, ['largetest1.csv', 'largetest2.csv', 'largetest3.csv', 'largetest4.csv',
                               'largetest5.csv', 'largetest6.csv', 'largetest7.csv'])
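(The split step itself isn't shown above; one minimal way to produce those 7 files would be the following, where the import directory path is a placeholder – apoc.load.csv needs the files to be visible under Neo4j's configured import directory.)

import numpy as np

# Split df into 7 roughly equal chunks and write them where
# apoc.load.csv can read them (path is an assumption; adjust it).
for i, chunk in enumerate(np.array_split(df, 7), start=1):
    chunk.to_csv(f'/var/lib/neo4j/import/largetest{i}.csv', index=False)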
Unlike with LOAD CSV, this time all the nodes and relationships were successfully created.
Questions:
1. Why did the concurrent writes work with apoc? Does it have any mechanism to handle such scenarios?
2. Is it safe to use apoc.load.csv to concurrently write a bunch of CSVs to Neo4j even though the nodes in the CSVs are not disjoint?
Thanks so much for reading!