cancel
Showing results for 
Search instead for 
Did you mean: 

Head's Up! These forums are read-only. All users and content have migrated. Please join us at community.neo4j.com.

How i will import a large dataset using multithreading in python?

damisg7
Node Clone

Hi all, i want to import a large dataset which inlcude json files, (using the apoc library) to neo4j graph db. How i will improve this operation? I used concurrent futures ThreadPoolExecutor as a start for multithreading. Although it seems to be working as starting the threads (each thread is executing one query), only one query at a time is being executed. Below is my python and cypher script. Is there anything i could improve on this script or to change in the db settings so i could use more power of my resources ?

Note that i create a single session in my main() using the supported driver.

    def cve_insertion(self):
        print("\n Inserting Files to Database...")
        files = files_to_insert()
        with concurrent.futures.ThreadPoolExecutor() as ex:
            results = [ex.submit(self.neo4jquery, file) for file in files]
        for f in concurrent.futures.as_completed(results):
            print(f.result())
    def neo4jquery(self, file):
        query = """call apoc.load.json($filepath) yield value                   
                                unwind value.CVE_Items as item
                                unwind item.cve.description.description_data as descData
                                unwind item.cve.problemtype.problemtype_data as problemtype_data
                                unwind problemtype_data.description as CWE
                                unwind item.cve.references.reference_data as reference_data
    
                                merge (i:GeneralInfo {Data_Type:value.CVE_data_type, Data_Format:value.CVE_data_format,
                                                        Data_Version:value.CVE_data_version, No_CVEs:value.CVE_data_numberOfCVEs,
                                                        Timestamp:value.CVE_data_timestamp})
    
                                merge (a:CVE {id:item.cve.CVE_data_meta.ID, Assigner:item.cve.CVE_data_meta.ASSIGNER,
                                                Description:descData.value, Description_Lang:descData.lang, Published_Date:item.publishedDate,
                                                Last_Modified_Date:item.lastModifiedDate})                                  
    
                                create (p:CVSS_3 {Version:item.impact.baseMetricV3.cvssV3.version, Vector_String:item.impact.baseMetricV3.cvssV3.vectorString,
                                                    Attack_Vector:item.impact.baseMetricV3.cvssV3.attackVector, Attack_Complexity:item.impact.baseMetricV3.cvssV3.attackComplexity,
                                                    Privileges_Required:item.impact.baseMetricV3.cvssV3.privilegesRequired, User_Interaction:item.impact.baseMetricV3.cvssV3.userInteraction,
                                                    Scope:item.impact.baseMetricV3.cvssV3.scope, Confidentiality_Impact:item.impact.baseMetricV3.cvssV3.confidentialityImpact,
                                                    Integrity_Impact:item.impact.baseMetricV3.cvssV3.integrityImpact, Availability_Impact:item.impact.baseMetricV3.cvssV3.availabilityImpact,
                                                    Base_Score:item.impact.baseMetricV3.cvssV3.baseScore, Base_Severity:item.impact.baseMetricV3.cvssV3.baseSeverity})
    
                                create (l:CVSS_2 {Version:item.impact.baseMetricV2.cvssV2.version, Vector_String:item.impact.baseMetricV2.cvssV2.vectorString,
                                                    Access_Vector:item.impact.baseMetricV2.cvssV2.accessVector, Access_Complexity:item.impact.baseMetricV2.cvssV2.accessComplexity,
                                                    Authentication:item.impact.baseMetricV2.cvssV2.authentication,
                                                    Confidentiality_Impact:item.impact.baseMetricV2.cvssV2.confidentialityImpact,
                                                    Integrity_Impact:item.impact.baseMetricV2.cvssV2.integrityImpact,
                                                    Availability_Impact:item.impact.baseMetricV2.cvssV2.availabilityImpact,
                                                    Base_Score:item.impact.baseMetricV2.cvssV2.baseScore})
    
                                merge (c:CWE {id:CWE.value, Language:CWE.lang})
    
                                merge (r:Reference_Data {url:reference_data.url, name:reference_data.name, refSource: reference_data.refsource})
    
                                create (a)-[:CVSS3_Impact{Exploitability_Score:item.cve.impact.baseMetricV3.exploitabilityScore,
                                                            Impact_Score:item.cve.impact.baseMetricV3.impactScore}]->(p)
                                create (a)-[:CVSS2_Impact{Exploitability_Score:item.cve.impact.baseMetricV2.exploitabilityScore,
                                                            Severity:item.cve.impact.baseMetricV2.severity, Impact_Score:item.cve.impact.baseMetricV2.impactScore,
                                                            acInsufInfo:item.cve.impact.baseMetricV2.acInsufInfo,
                                                            Obtain_All_Privileges:item.cve.impact.baseMetricV2.obtainAllPrivileges,
                                                            Obtain_User_Privileges:item.cve.impact.baseMetricV2.obtainUserPrivileges,
                                                            Obtain_Other_Privileges:item.cve.impact.baseMetricV2.obtainOtherPrivileges,
                                                            User_Interaction_Required:item.cve.impact.baseMetricV2.userInteractionRequired}]->(l)
                                create (a)-[:belongsTo]->(i)
                                create (a)-[:Problem_Type]->(c)
                                create (a)-[:referencedBy]->(r)"""
        filepath = "file:///" + file
        queryVariable = {"filepath": filepath}
        session = self.driver.session()
        print("\n Inserting file: " + file)
        session.run(query, queryVariable)
        return "\n File: " + file + " insertion completed. \n----------"
1 ACCEPTED SOLUTION

damisg7
Node Clone

I figured out two mistakes. 1) i used merge which means for every record i was searching and creating. So i created a constraint for cve and cwe to make the procedure faster. 2) i combined the seperate files to one like that: unwind $files as files where $files is my list with all files passed by python. There is no need for multi-threading. I only increased the memory resources on the database settings.

View solution in original post

5 REPLIES 5

I'm a bit confused by this statement

being followed by this code

Which suggests you do create one session per file.

Maybe i miss-noted it. I create in main() the session with url, username, pass etc. and as above i call the session for each file. I've also tried to create a session inside this function (for each file) but with the same results.

damisg7
Node Clone

I figured out two mistakes. 1) i used merge which means for every record i was searching and creating. So i created a constraint for cve and cwe to make the procedure faster. 2) i combined the seperate files to one like that: unwind $files as files where $files is my list with all files passed by python. There is no need for multi-threading. I only increased the memory resources on the database settings.

Have you found a multi-threaded solution by the way?

Yes. Kind of actually. Using periodic iteration and parallel tasks in cypher. Check this thread. Is there a way to run concurrently a query that ha... - Neo4j - 64142