03-31-2021 07:25 AM
Hi all, I want to import a large dataset consisting of JSON files into a Neo4j graph database (using the APOC library). How can I speed this operation up? As a first attempt at multithreading I used concurrent.futures.ThreadPoolExecutor, but although the threads do start (each thread executes one query), only one query is actually executed at a time. Below are my Python and Cypher scripts. Is there anything I could improve in the script, or change in the database settings, so that I can use more of my machine's resources?
Note that I create a single session in my main() using the supported driver.
def cve_insertion(self):
    print("\nInserting files to database...")
    files = files_to_insert()
    # Submit one task per file; each task runs one import query.
    with concurrent.futures.ThreadPoolExecutor() as ex:
        results = [ex.submit(self.neo4jquery, file) for file in files]
        for f in concurrent.futures.as_completed(results):
            print(f.result())

def neo4jquery(self, file):
    query = """call apoc.load.json($filepath) yield value
        unwind value.CVE_Items as item
        unwind item.cve.description.description_data as descData
        unwind item.cve.problemtype.problemtype_data as problemtype_data
        unwind problemtype_data.description as CWE
        unwind item.cve.references.reference_data as reference_data
        merge (i:GeneralInfo {Data_Type:value.CVE_data_type, Data_Format:value.CVE_data_format,
               Data_Version:value.CVE_data_version, No_CVEs:value.CVE_data_numberOfCVEs,
               Timestamp:value.CVE_data_timestamp})
        merge (a:CVE {id:item.cve.CVE_data_meta.ID, Assigner:item.cve.CVE_data_meta.ASSIGNER,
               Description:descData.value, Description_Lang:descData.lang, Published_Date:item.publishedDate,
               Last_Modified_Date:item.lastModifiedDate})
        create (p:CVSS_3 {Version:item.impact.baseMetricV3.cvssV3.version, Vector_String:item.impact.baseMetricV3.cvssV3.vectorString,
               Attack_Vector:item.impact.baseMetricV3.cvssV3.attackVector, Attack_Complexity:item.impact.baseMetricV3.cvssV3.attackComplexity,
               Privileges_Required:item.impact.baseMetricV3.cvssV3.privilegesRequired, User_Interaction:item.impact.baseMetricV3.cvssV3.userInteraction,
               Scope:item.impact.baseMetricV3.cvssV3.scope, Confidentiality_Impact:item.impact.baseMetricV3.cvssV3.confidentialityImpact,
               Integrity_Impact:item.impact.baseMetricV3.cvssV3.integrityImpact, Availability_Impact:item.impact.baseMetricV3.cvssV3.availabilityImpact,
               Base_Score:item.impact.baseMetricV3.cvssV3.baseScore, Base_Severity:item.impact.baseMetricV3.cvssV3.baseSeverity})
        create (l:CVSS_2 {Version:item.impact.baseMetricV2.cvssV2.version, Vector_String:item.impact.baseMetricV2.cvssV2.vectorString,
               Access_Vector:item.impact.baseMetricV2.cvssV2.accessVector, Access_Complexity:item.impact.baseMetricV2.cvssV2.accessComplexity,
               Authentication:item.impact.baseMetricV2.cvssV2.authentication,
               Confidentiality_Impact:item.impact.baseMetricV2.cvssV2.confidentialityImpact,
               Integrity_Impact:item.impact.baseMetricV2.cvssV2.integrityImpact,
               Availability_Impact:item.impact.baseMetricV2.cvssV2.availabilityImpact,
               Base_Score:item.impact.baseMetricV2.cvssV2.baseScore})
        merge (c:CWE {id:CWE.value, Language:CWE.lang})
        merge (r:Reference_Data {url:reference_data.url, name:reference_data.name, refSource:reference_data.refsource})
        create (a)-[:CVSS3_Impact {Exploitability_Score:item.cve.impact.baseMetricV3.exploitabilityScore,
               Impact_Score:item.cve.impact.baseMetricV3.impactScore}]->(p)
        create (a)-[:CVSS2_Impact {Exploitability_Score:item.cve.impact.baseMetricV2.exploitabilityScore,
               Severity:item.cve.impact.baseMetricV2.severity, Impact_Score:item.cve.impact.baseMetricV2.impactScore,
               acInsufInfo:item.cve.impact.baseMetricV2.acInsufInfo,
               Obtain_All_Privileges:item.cve.impact.baseMetricV2.obtainAllPrivileges,
               Obtain_User_Privileges:item.cve.impact.baseMetricV2.obtainUserPrivileges,
               Obtain_Other_Privileges:item.cve.impact.baseMetricV2.obtainOtherPrivileges,
               User_Interaction_Required:item.cve.impact.baseMetricV2.userInteractionRequired}]->(l)
        create (a)-[:belongsTo]->(i)
        create (a)-[:Problem_Type]->(c)
        create (a)-[:referencedBy]->(r)"""
    filepath = "file:///" + file
    queryVariable = {"filepath": filepath}
    print("\nInserting file: " + file)
    # Open a short-lived session for this file and close it when the query finishes.
    with self.driver.session() as session:
        session.run(query, queryVariable)
    return "\nFile: " + file + " insertion completed.\n----------"
04-09-2021 01:59 AM
I'm a bit confused: you say you create a single session in main(), yet neo4jquery() calls self.driver.session() for every file, which suggests you do create one session per file.
04-09-2021 08:00 AM
Maybe I phrased it badly. I create the session in main() with the URL, username, password, etc., and, as shown above, I use it for each file. I've also tried creating a session inside this function (one per file), but with the same results.
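For reference, a minimal sketch of the pattern being discussed, assuming the official neo4j Python driver: the driver object (created once in main() with the URI and credentials) is thread-safe and can be shared by all workers, while session objects are not thread-safe, so each submitted task opens and closes its own. The URI, credentials, query, and file list below are placeholders, not taken from the original post.

import concurrent.futures
from neo4j import GraphDatabase

def insert_file(driver, file):
    # Each task opens (and closes) its own session; the query here is a
    # stand-in for the full apoc.load.json import shown above.
    query = "call apoc.load.json($filepath) yield value return count(value)"
    with driver.session() as session:
        session.run(query, {"filepath": "file:///" + file})
    return file + " done"

def main():
    # One driver per application, shared across threads (placeholder URI/credentials).
    driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
    files = ["nvdcve-1.1-2020.json", "nvdcve-1.1-2021.json"]  # placeholder file list
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
            futures = [ex.submit(insert_file, driver, f) for f in files]
            for fut in concurrent.futures.as_completed(futures):
                print(fut.result())
    finally:
        driver.close()

if __name__ == "__main__":
    main()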
04-10-2021 04:07 PM
I figured out two mistakes. 1) I used MERGE, which means every record was first searched for and then created, so I added uniqueness constraints for CVE and CWE to make the procedure faster. 2) I combined the separate files into a single query, like this: unwind $files as files
where $files is my list of all files passed in from Python. There is no need for multi-threading; I only increased the memory settings of the database.
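A rough sketch of what that change looks like in Cypher. This is an illustration, not the poster's exact statements: the constraint syntax is the older 3.5/4.x form, the query body is abbreviated, and the MERGE-on-key-then-SET shape is my own suggestion for letting the constraint's index do the work.

// Uniqueness constraints so MERGE can use an index lookup instead of a label scan
// (newer versions prefer CREATE CONSTRAINT ... IF NOT EXISTS ... REQUIRE ... IS UNIQUE)
CREATE CONSTRAINT ON (a:CVE) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT ON (c:CWE) ASSERT c.id IS UNIQUE;

// One query over all files; $files is the list of "file:///..." paths passed from Python
UNWIND $files AS files
CALL apoc.load.json(files) YIELD value
UNWIND value.CVE_Items AS item
// ... same MERGE/CREATE statements as in the original query ...
// For the constraint to help most, MERGE on the key property alone and SET the rest, e.g.:
MERGE (a:CVE {id: item.cve.CVE_data_meta.ID})
SET a.Published_Date = item.publishedDate, a.Last_Modified_Date = item.lastModifiedDate

With the constraint in place, each MERGE becomes an index lookup rather than a scan over all existing nodes, which is usually the biggest win for this kind of import.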
05-19-2022 12:28 PM
Have you found a multi-threaded solution by the way?
01-24-2023 01:22 PM
Yes, kind of, actually: using periodic iteration and parallel tasks in Cypher. Check this thread: Is there a way to run concurrently a query that ha... - Neo4j - 64142
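A hedged sketch of the pattern this presumably refers to, using APOC's batched execution; the per-batch statement is abbreviated and $files is again assumed to be the list of file URLs passed from the driver call.

// apoc.periodic.iterate streams the rows produced by the first statement and
// runs the second statement over them in batches, optionally in parallel threads.
// Assumes the outer call is run with a `files` parameter from the driver.
CALL apoc.periodic.iterate(
  "UNWIND $files AS file
   CALL apoc.load.json(file) YIELD value
   UNWIND value.CVE_Items AS item
   RETURN item",
  "MERGE (a:CVE {id: item.cve.CVE_data_meta.ID})
   SET a.Published_Date = item.publishedDate,
       a.Last_Modified_Date = item.lastModifiedDate",
  {batchSize: 1000, parallel: true, params: {files: $files}}
) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages;

Note that parallel: true is only safe when batches do not write to the same nodes; MERGE against shared nodes (such as the single GeneralInfo node above) can deadlock, so those parts are better kept in a serial pass.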