Head's Up! These forums are read-only. All users and content have migrated. Please join us at community.neo4j.com.
01-09-2020 05:26 AM
I'm trying without success to stream all nodes from a neo4j graphdb while calculating their path using nodejs and neo4j js driver. I'm using two queries, the first one fetch all nodes nodes and using the observer onNext then I try to execute the query to fetch the path for each node and push it to the buffer. The problem seems that the buffer closes on the onComplete of the first query and does not wait until the end of the code executed in the onNext.
Error [ERR_STREAM_PUSH_AFTER_EOF]: stream.push() after EOF
Here are my services that streams the data into the buffer:
async fetchSiteMap(): Promise<Readable> {
const transaction = this.session.getSession().beginTransaction();
const buffer = new stream.Readable();
buffer._read = () => {};
await transaction.run( `
MATCH (r:Category) RETURN r.code AS code
`)
.subscribe( {
onNext: async node => {
const result = await transaction.run(`MATCH path = (:Category {code: $code})<-[hasChild:HAS_CHILD*]-(rootNode:RootNode)
WITH rootNode, REVERSE(nodes(path)) as nodes, COLLECT(path) as paths, reduce(sum = 0, rel in hasChild | sum + rel.position) as weight
RETURN nodes
ORDER BY rootNode.position, SIZE(paths) asc, weight asc LIMIT 1`, {code: node.get( 'code' )});
const records: Record[] = get(result, 'records');
if (isEmpty(records)) {
return [];
}
const newRecords = records.map(record => record.get('nodes'));
const path = compact(newRecords[0].map((segment: { properties: GraphProperty }, index: number) => {
if (index === 0) {
return;
}
return CategoryNodePath.fromGraph(<NodeGraphProperty>segment.properties);
}));
buffer.push(JSON.stringify(path));
},
onCompleted: async () => {
buffer.push(null)
transaction.commit();
this.session.close();
},
onError: () => {
buffer.push(null);
},
} );
return buffer;
}
01-09-2020 06:22 AM
What is the stack trace for that error? Does it come from the neo4j driver? You may be closing the transaction when the inner stream has completed for the paths rather than the outer stream of categories.
One thing I've spotted is that you could get away with doing this all in a single Cypher query rather than one to get the code and another to find the paths. Because you're running the query for all categories, you could just stream them directly out of the first query.
transaction.run(`
MATCH path = (:Category)<-[hasChild:HAS_CHILD*]-(rootNode:RootNode)
WITH rootNode, REVERSE(nodes(path)) as nodes, COLLECT(path) as paths, reduce(sum = 0, rel in hasChild | sum + rel.position) as weight
RETURN nodes
ORDER BY rootNode.position, SIZE(paths) asc, weight asc LIMIT 1
`)
.subscribe({ /* Same code as you have now */ })
It doesn't solve the problem you've reported but it could be a workaround.
01-09-2020 07:00 AM
Thanks Adam, the error comes from the controller of the API when I call the resource that streams the http response. As you mentioned, I would have prefered indeed to run it on a single query, but I can't see how you would search paths for each node if you don't get the node code at first. Your query won't work because there is a limit to 1 to fetch one single path. Therefore not specifying a node code will just retrieve the first node path found.
All the sessions of the conference are now available online