Issue with transactions from Py2neo script in Jupyter halting after ~90s

Hi,

I’ve been struggling with this work for some time, and my thesis due date is drawing ever nearer. I’m creating a graph database of genotype-phenotype relationships. I now have all nodes in place and am just trying to connect the variant nodes to the subject nodes. I’ve used Cypher transactions for this, which work OK up to a certain point, but approximately 90s after the transactions start they stop, with no error message, nothing in the debug log and nothing in the query log. My code is as follows:


import pandas as pd
import csv
import math
import allel
import zarr
from py2neo import Graph, Node, Relationship, NodeMatcher

zarr_path = '/media/user/Seagate Backup Plus Drive/uk_twin_cohort/exome/chroms.zarr'
callset = zarr.open_group(zarr_path, mode='r')

graph = Graph(user="neo4j", password="password")

chrom_list = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,'X']
matcher = NodeMatcher(graph)

for chrom in chrom_list:
    chrom_label = "Chromosome_" + str(chrom)
    samples = callset[chrom]['samples']
    variants = allel.VariantChunkedTable(callset[chrom]['variants'], names=['AC','AF_AFR', 'AF_AMR', 'AF_ASN', 'AF_EUR', 'AF_MAX', 'CGT', 'CLR', 'CSQ', 'DP', 'DP4', 'ESP_MAF', 'FILTER_LowQual', 'FILTER_MinHWE', 'FILTER_MinVQSLOD', 'FILTER_PASS', 'HWE', 'ICF', 'ID', 'IS', 'PC2', 'PCHI2', 'POS', 'PR', 'QCHI2', 'QUAL', 'REF', 'ALT', 'INDEL', 'SHAPEIT', 'SNP_ID', 'TYPE', 'UGT', 'VQSLOD', 'dbSNPmismatch', 'is_snp', 'numalt'], index='POS')
    pos = variants['POS'][:]
    pos = pos.tolist()
    ref = variants['REF'][:]
    alt = variants['ALT'][:]
    dpz = callset[chrom]['calldata/DP']
    dp = dpz[:, 0]
    psz = callset[chrom]['calldata/PS']
    plz = callset[chrom]['calldata/PL']
    gpz = callset[chrom]['calldata/GP']
    calldata = callset[chrom]['calldata']
    gt = allel.GenotypeDaskArray(calldata['GT'])
    hap = gt.to_haplotypes()
    hap1 = hap[:, ::2]
    hap2 = hap[:, 1::2]
    list_h1 = hap1[:, 0].compute()
    list_h1 = list_h1.tolist()
    list_h2 = hap2[:, 0].compute()
    for i in range(len(samples)):
        subject = samples[i]
        dp = dpz[:, i]
        ps = psz[:, i]
        pl = plz[:, i]
        gp = gpz[:, i]
        list_h1 = hap1[:, i].compute()
        list_h2 = hap2[:, i].compute()
        bp1 = []
        bp2 = []
        hpt = []
        n1 = []
        n2 = []
        g = Graph()
        print(subject)
        print("Subject " + str(i) + " of " + str(len(samples)))
        s = matcher.match("Subject", subject_id= subject).first()
        print(s)
        if s is None:
            continue
        j = 0
        nodes = []
        for j in range(len(pos)):
            h1 = int(list_h1[j])
            h2 = int(list_h2[j])
            k = int(pos[j])
            l = str(ref[j])
            m = str(alt[j][h1-1])
            o = str(alt[j][h2-1])
            if h1 == 0 and h2 == 0:
                a = matcher.match(chrom_label, pos=k, bp=l).first()
                nodes.append(a)
                nodes.append(a)

            elif h1 == 0 and h2 > 0:
                a = matcher.match(chrom_label, pos=k, bp=l).first()
                nodes.append(a)
                a = matcher.match(chrom_label, pos=k, bp=o).first()
                nodes.append(a)

            elif h1 > 0 and h2 == 0:
                a = matcher.match(chrom_label, pos=k, bp=m).first()
                nodes.append(a)
                a = matcher.match(chrom_label, pos=k, bp=l).first()
                nodes.append(a)

            elif h1 == h2 and h1 > 0:
                a = matcher.match(chrom_label, pos=k, bp=m).first()
                nodes.append(a)
                nodes.append(a)

            else:
                a = matcher.match(chrom_label, pos=k, bp=m).first()
                nodes.append(a)
                a = matcher.match(chrom_label, pos=k, bp=o).first()
                nodes.append(a)
            if j % 10000 == 0:
                print(str(j) + " rows complete.")
        print(subject + " matching complete.")
        print(len(nodes))
        j=0
        tx = g.begin()
        for j in range(len(pos)):
            read_depth = int(dp[j])
            ps1 = int(ps[j])
            PL0 = int(pl[j][0])
            PL1 = int(pl[j][1])
            PL2 = int(pl[j][2])
            # Read this row's haplotypes first so the genotype string matches row j
            h1 = int(list_h1[j])
            h2 = int(list_h2[j])
            genotype = str(h1) + '|' + str(h2)
            GP0 = float(gp[j][0])
            GP1 = float(gp[j][1])
            GP2 = float(gp[j][2])
            k = int(pos[j])
            l = str(ref[j])
            m = str(alt[j][h1-1])
            o = str(alt[j][h2-1])

            if h1 == 0 and h2 == 0:
                x = (2*j)
                a = nodes[x]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HOMOZYGOUS {HTA:{H1}, HTB:{H2}, GT:{GT}, dp:{DP}, phase_set:{PS1}, PL0:{PL0}, PL1:{PL1}, PL2:{PL2}, GP0:{GP0}, GP1:{GP1}, GP2:{GP2}}]->(a)", {"S":s.identity, "A":a.identity, "H1":h1, "H2":h2, "GT":genotype, "DP":read_depth, "PS1":ps1, "PL0":PL0, "PL1":PL1, "PL2":PL2, "GP0":GP0, "GP1":GP1, "GP2":GP2 })
            elif h1 == 0 and h2 > 0:
                x = (2*j)
                a = nodes[x]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HETEROZYGOUS {HTA:{H1}, HTB:{H2}, GT:{GT}, dp:{DP}, phase_set:{PS1}, PL0:{PL0}, PL1:{PL1}, PL2:{PL2}, GP0:{GP0}, GP1:{GP1}, GP2:{GP2}}]->(a)", {"S":s.identity, "A":a.identity, "H1":h1, "H2":h2, "GT":genotype, "DP":read_depth, "PS1":ps1, "PL0":PL0, "PL1":PL1, "PL2":PL2, "GP0":GP0, "GP1":GP1, "GP2":GP2 })
                y = (2*j)+1
                b = nodes[y]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {B} MERGE (s)-[r:HETEROZYGOUS {HTA:{H1}, HTB:{H2}, GT:{GT}, dp:{DP}, phase_set:{PS1}, PL0:{PL0}, PL1:{PL1}, PL2:{PL2}, GP0:{GP0}, GP1:{GP1}, GP2:{GP2}}]->(a)", {"S":s.identity, "B":b.identity, "H1":h1, "H2":h2, "GT":genotype, "DP":read_depth, "PS1":ps1, "PL0":PL0, "PL1":PL1, "PL2":PL2, "GP0":GP0, "GP1":GP1, "GP2":GP2 })
            elif h1 > 0 and h2 == 0:
                x = (2*j)
                a = nodes[x]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HETEROZYGOUS {HTA:{H1}, HTB:{H2}, GT:{GT}, dp:{DP}, phase_set:{PS1}, PL0:{PL0}, PL1:{PL1}, PL2:{PL2}, GP0:{GP0}, GP1:{GP1}, GP2:{GP2}}]->(a)", {"S":s.identity, "A":a.identity, "H1":h1, "H2":h2, "GT":genotype, "DP":read_depth, "PS1":ps1, "PL0":PL0, "PL1":PL1, "PL2":PL2, "GP0":GP0, "GP1":GP1, "GP2":GP2 })
                y = (2*j)+1
                b = nodes[y]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {B} MERGE (s)-[r:HETEROZYGOUS {HTA:{H1}, HTB:{H2}, GT:{GT}, dp:{DP}, phase_set:{PS1}, PL0:{PL0}, PL1:{PL1}, PL2:{PL2}, GP0:{GP0}, GP1:{GP1}, GP2:{GP2}}]->(a)", {"S":s.identity, "B":b.identity, "H1":h1, "H2":h2, "GT":genotype, "DP":read_depth, "PS1":ps1, "PL0":PL0, "PL1":PL1, "PL2":PL2, "GP0":GP0, "GP1":GP1, "GP2":GP2 })
            elif h1 == h2 and h1 > 0:
                x = (2*j)
                a = nodes[x]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HOMOZYGOUS {HTA:{H1}, HTB:{H2}, GT:{GT}, dp:{DP}, phase_set:{PS1}, PL0:{PL0}, PL1:{PL1}, PL2:{PL2}, GP0:{GP0}, GP1:{GP1}, GP2:{GP2}}]->(a)", {"S":s.identity, "A":a.identity, "H1":h1, "H2":h2, "GT":genotype, "DP":read_depth, "PS1":ps1, "PL0":PL0, "PL1":PL1, "PL2":PL2, "GP0":GP0, "GP1":GP1, "GP2":GP2 })
            else:
                x = (2*j)
                a = nodes[x]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HETEROZYGOUS {HTA:{H1}, HTB:{H2}, GT:{GT}, dp:{DP}, phase_set:{PS1}, PL0:{PL0}, PL1:{PL1}, PL2:{PL2}, GP0:{GP0}, GP1:{GP1}, GP2:{GP2}}]->(a)", {"S":s.identity, "A":a.identity, "H1":h1, "H2":h2, "GT":genotype, "DP":read_depth, "PS1":ps1, "PL0":PL0, "PL1":PL1, "PL2":PL2, "GP0":GP0, "GP1":GP1, "GP2":GP2 })
                y = (2*j)+1
                b = nodes[y]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {B} MERGE (s)-[r:HETEROZYGOUS {HTA:{H1}, HTB:{H2}, GT:{GT}, dp:{DP}, phase_set:{PS1}, PL0:{PL0}, PL1:{PL1}, PL2:{PL2}, GP0:{GP0}, GP1:{GP1}, GP2:{GP2}}]->(a)", {"S":s.identity, "B":b.identity, "H1":h1, "H2":h2, "GT":genotype, "DP":read_depth, "PS1":ps1, "PL0":PL0, "PL1":PL1, "PL2":PL2, "GP0":GP0, "GP1":GP1, "GP2":GP2 })
            if j % 1000 == 0:
                print(str(j) + " rows added to database.")
                tx.commit()
                tx = g.begin()

    print(chrom_label + " completed.")

Given the lack of an error message, I can only assume it’s some sort of timeout, but I can’t see where this would happen. I’m using Py2neo v4, Python 3.7 and Neo4j Enterprise 3.5.14. Any help or advice would be much appreciated.

1 REPLY

Sorry, I have no experience with the py2neo DSL, so I'm not sure what you're trying to achieve there.
If you can rewrite that as plain Cypher, it would be easier to help with.
If you MERGE on relationships with that many properties, it won't be fast. Perhaps it's a modeling issue, i.e. is your relationship actually a concept in the domain that should be its own node? Not sure.
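
For what it's worth, a plain-Cypher version could be a single parameterised statement that receives a whole batch of rows via UNWIND, merges on the relationship type only, and sets the properties afterwards. This is only a sketch under assumptions: `build_query`, `chunked` and `send_batches` are made-up helper names, the row layout (`s`, `a`, `props`) is invented for illustration, and leaving the properties out of the MERGE pattern assumes you want at most one relationship of each type per subject-variant pair.

```python
from itertools import islice

ALLOWED_TYPES = {"HOMOZYGOUS", "HETEROZYGOUS"}

# One statement per batch: each row carries two node ids and a property map.
QUERY_TEMPLATE = (
    "UNWIND $rows AS row "
    "MATCH (s) WHERE id(s) = row.s "
    "MATCH (a) WHERE id(a) = row.a "
    "MERGE (s)-[r:{rel_type}]->(a) "
    "SET r += row.props"
)


def build_query(rel_type):
    """Return the batched MERGE statement for one relationship type.

    Relationship types cannot be passed as Cypher parameters, so the type is
    checked against a whitelist and formatted into the query text.
    """
    if rel_type not in ALLOWED_TYPES:
        raise ValueError("unexpected relationship type: %r" % (rel_type,))
    return QUERY_TEMPLATE.format(rel_type=rel_type)


def chunked(rows, size):
    """Yield successive lists of at most `size` rows."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def send_batches(run, rel_type, rows, size=1000):
    """Call run(query, parameters) once per batch; return the number of calls.

    `run` is any callable with that shape (assumed here: py2neo's graph.run).
    """
    query = build_query(rel_type)
    calls = 0
    for batch in chunked(rows, size):
        run(query, {"rows": batch})
        calls += 1
    return calls
```

With py2neo this might be called as `send_batches(graph.run, "HOMOZYGOUS", rows)`, where each row looks like `{"s": s.identity, "a": a.identity, "props": {"HTA": h1, "HTB": h2, "dp": read_depth}}`. That is one round trip per 1000 rows instead of one per relationship.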

Anything in your server logs in terms of IO / CPU / memory? Or are the client connections used up?

Perhaps you have a leaking tx?

Did you check these methods on the transaction?

finished()
Indicates whether or not this transaction has been completed or is still open.

process()
Send all pending statements to the server for processing.

You also don't do anything with the Cursor that's returned from tx.run().

At least you should call close() on it, afaik.

https://py2neo.org/v4/database.html#py2neo.database.Cursor

https://py2neo.org/v4/database.html#transactions
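
To make the leaking-transaction point concrete, here is a rough sketch of a commit-every-N loop that also commits the final partial batch and rolls back if something fails part-way. It relies only on `begin()`, `run()`, `commit()`, `rollback()` and `finished()` on the transaction and `close()` on the cursor, as described in the py2neo v4 docs linked above. `write_in_batches` and its parameters are made-up names, and this has not been run against a real server.

```python
def write_in_batches(graph, statements, batch_size=1000):
    """Run (cypher, parameters) pairs, committing every `batch_size` statements.

    Returns the number of commits. Unlike a loop that only commits when the
    row counter hits a multiple of the batch size, the remainder is committed
    too, so no transaction is left open at the end.
    """
    commits = 0
    pending = 0
    tx = graph.begin()
    try:
        for cypher, parameters in statements:
            cursor = tx.run(cypher, parameters)
            cursor.close()  # the result is not needed, so release it
            pending += 1
            if pending == batch_size:
                tx.commit()
                commits += 1
                pending = 0
                tx = graph.begin()
        if pending:
            tx.commit()  # final partial batch
            commits += 1
    finally:
        # Still open here means either an error occurred or the last begin()
        # was followed by no statements; in both cases discard it.
        if not tx.finished():
            tx.rollback()
    return commits
```

In the script above, the rows after the last multiple of 1000 sit in a transaction that is begun but never committed, and a new one is opened for every subject, so open transactions may pile up on the server.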
