01-04-2020 09:59 PM
We have a lot of (n1:EffortUser)-[r1:EFFORT]->(n2:EffortObject) relationships that need to be counted by day and week, i.e. how many EffortObject:Email nodes did an EffortUser SEND. With a lot of users and emails this can take quite some time, so we would like to parallelize this query.
Right now we are using ...
match (n1:EffortUser)-[r1:EFFORT]-(n2:EffortObject)
where r1.Effort = 'yes' and r1.TimeEvent >= '2017-01-01' and r1.TimeEvent <= '2017-12-31'
return n1.Name as user, date(datetime(r1.TimeEvent)) as date, count(distinct r1.IdUnique) as count
order by user, date
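(For the weekly rollup I assume the same query can simply truncate the timestamp to the week - untested sketch:)
match (n1:EffortUser)-[r1:EFFORT]-(n2:EffortObject)
where r1.Effort = 'yes' and r1.TimeEvent >= '2017-01-01' and r1.TimeEvent <= '2017-12-31'
// date.truncate('week', ...) maps each event to the first day of its ISO week
return n1.Name as user, date.truncate('week', datetime(r1.TimeEvent)) as week, count(distinct r1.IdUnique) as count
order by user, week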
There seem to be a few options to parallelize / optimize this, but all are rather poorly documented, so I need some help please!
I did a bit of research and found the following APOC functions, but try as I might I cannot get them to work (and I could not find much here or on Stack Overflow either). Could someone PLEASE provide some guidance on which of the options below is best, including an example using the sample code above? This is driving me nuts. We have 4 cores and 32 GB of memory, so this should run pretty fast, but I just cannot get it to work.
https://neo4j.com/docs/labs/apoc/current/cypher-execution/
CALL apoc.cypher.runMany('cypher;\nstatements;',{params},{config})
runs each semicolon separated statement and returns summary - currently no schema operations
CALL apoc.cypher.mapParallel(fragment, params, list-to-parallelize) yield value
executes fragment in parallel batches with the list segments being assigned to _
https://neo4j.com/docs/labs/apoc/current/cypher-execution/running-cypher/
apoc.cypher.mapParallel(fragment :: STRING?, params :: MAP?, list :: LIST? OF ANY?) :: (value :: MAP?)
apoc.cypher.mapParallel(fragment, params, list-to-parallelize) yield value - executes fragment in parallel batches with the list segments being assigned to _
apoc.cypher.mapParallel2(fragment :: STRING?, params :: MAP?, list :: LIST? OF ANY?, partitions :: INTEGER?, timeout = 10 :: INTEGER?) :: (value :: MAP?)
apoc.cypher.mapParallel2(fragment, params, list-to-parallelize) yield value - executes fragment in parallel batches with the list segments being assigned to _
apoc.cypher.parallel(fragment :: STRING?, params :: MAP?, parallelizeOn :: STRING?) :: (value :: MAP?)
apoc.cypher.parallel2(fragment :: STRING?, params :: MAP?, parallelizeOn :: STRING?) :: (value :: MAP?)
apoc.cypher.runMany(cypher :: STRING?, params :: MAP?, config = {} :: MAP?) :: (row :: INTEGER?, result :: MAP?)
apoc.cypher.runMany('cypher;\nstatements;',{params},[{statistics:true,timeout:10}]) - runs each semicolon separated statement and returns summary - currently no schema operations
I don't think adding a query profile etc. as per the community guidelines is useful here, but please correct me if I'm wrong. Any help will be greatly appreciated, so I'm happy to add whatever info is required.
01-05-2020 12:30 PM
Before thinking about parallelization I'd first revisit the data model:
Consider embedding the relationship property filters r1.Effort and r1.TimeEvent into the relationship type. I'm not sure about the semantics of your Effort property.
Embedding time is a good idea if you always query on fixed time ranges. If you always query for the same year, you could have :EFFORT_2017 instead of :EFFORT.
With that in place your MATCH will be way more selective and you don't have to filter out that much.
Once done with this, I'd probably use apoc.cypher.mapParallel2. Note I haven't actually tested the snippet below; it's meant more to outline the idea:
MATCH (u:EffortUser) WITH collect(u) AS users
CALL apoc.cypher.mapParallel2(
"MATCH (_)-[r:EFFORT_2017]-(n:EffortObject)
RETURN _.Name as user, date(datetime(r.TimeEvent)) as date, count(distinct r.IdUnique) as count",
{}, users, 4) YIELD value
RETURN value.user as user, value.date as date, value.count as count
The _ refers to the current iteration.
01-05-2020 07:25 PM
Thanks Stefan, and Happy New Year!
Thanks to your example above, as well as @simon's post here (Using parallel queries to sum value of bitcoin outputs connected to bitcoin address nodes), which I found a little later, I got the following apoc.cypher.mapParallel2() Cypher to work ...
MATCH (u:EffortUser)
WITH collect(u) AS users
CALL apoc.cypher.mapParallel2("
MATCH (_)-[r]-(:EffortObject)
WHERE r.Effort = 'yes' and r.TimeEvent >= '2016-01-01' and r.TimeEvent <= '2018-12-31'
RETURN _.Name as user, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as count",
{}, users, 4) YIELD value
RETURN value.user as user, value.date as date, sum(value.count) as count
ORDER BY user, date
Regarding r.Effort = 'yes', I checked and unfortunately we do need this additional filter, as not all relationships between EffortUser and EffortObject are true 'effort'. Either way, the query takes 16,622 ms with r.Effort = 'yes' and 16,429 ms without, so not a big enough difference to worry about for now.
Regarding your suggestion of ...
"Embedding time is a good idea if you always query on fixed time ranges. If you always query for the same year, you could have :EFFORT_2017 instead of :EFFORT."
... I decided against implementing this, as the min/max TimeEvent inputs are very fluid, so I'm not sure it would do us much good. Besides, the above query only took 16,622 ms to complete, which is awesome! I'm making a note of this though to revisit later; there might be something there worth exploring when we need those extra few ms.
Anyway, the basic query is working now, which is great. However, when I start customizing it the results quickly become erratic and I'm not sure why - most likely operator error, but I can't figure it out. Here are the two main issues I'm grappling with:
Issue 1: Query fails when making minor Cypher changes
I'm only removing the and r.TimeEvent >= '2016-01-01' and r.TimeEvent <= '2018-12-31' part; everything else stays the same (see below), but now the query fails and for some reason returns no results.
MATCH (u:EffortUser)
WITH collect(u) AS users
CALL apoc.cypher.mapParallel2("
MATCH (_)-[r]-(:EffortObject)
WHERE r.Effort = 'yes'
RETURN _.Name as user, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as count",
{}, users, 4) YIELD value
RETURN value.user as user, value.date as date, sum(value.count) as count
ORDER BY user, date
Here are the PROFILE results ...
Right around the same time my CPU started going to 100% even though the previous queries that returned results never really exceeded 25% (see below).
To me this looks like I'm somehow overloading the process by removing the date range filter? Can that be? Are there any additional settings I should set to avoid this? I noticed your sample query has {}, but I saw @simon using some additional parameters here, such as {parallel:true, batchSize:1000, concurrency:4}.
Issue 2: Additional filter settings are not working but should
This might be related to issue 1 above, but I wasn't sure, so I thought I'd outline it anyway - as this is the real challenge in productionizing this query for our GraphQL API setup.
Our graph really looks like this ...
(:Employee)-[]-(:EffortUser)-[]-(:EffortObject)
... so one employee can have multiple users across multiple platforms. It looks a bit like this from left to right, i.e. an Employee LINKED_TO one EffortUser (in reality there are multiple here), which in turn is linked to multiple EffortObject nodes (emails in this case).
When trying to incorporate the above structure into the Cypher query, I have the same issue: suddenly the query returns no results even though it should, as the link between Employee 'cbartens' and an EffortUser clearly exists (see the graph example above) ...
MATCH (e:Employee)-[]-(u:EffortUser)
WHERE e.Name = 'cbartens'
WITH collect(u) AS users
CALL apoc.cypher.mapParallel2("
MATCH (_)-[r]-(:EffortObject)
WHERE r.Effort = 'yes' and r.TimeEvent >= '2016-01-01' and r.TimeEvent <= '2018-12-31'
RETURN _.Name as user, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as count",
{}, users, 4) YIELD value
RETURN value.user as user, value.date as date, sum(value.count) as count
ORDER BY user, date
Now maybe that is related to issue 1 above, in the sense that the additional filter makes the query more complex and overloads it, but I just don't know. Any ideas?
I tried different Cypher variations (example below), but the outcome is always the same: the query returns no results even though it should, i.e. the nodes and relationships definitely exist.
MATCH (e:Employee)
WHERE e.Name = 'cbartens'
WITH collect(e) AS employees
CALL apoc.cypher.mapParallel2("
MATCH (_)-[]-(:EffortUser)-[r]-(:EffortObject)
WHERE r.Effort = 'yes' and r.TimeEvent >= '2016-01-01' and r.TimeEvent <= '2018-12-31'
RETURN _.Name as employee, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as count",
{}, employees, 4) YIELD value
RETURN value.employee as employee, value.date as date, sum(value.count) as count
ORDER BY employee, date
01-06-2020 02:16 AM
My suspicion is that there might be relationships that don't have an IdUnique property (the MATCH you're doing delivers all relationships, independent of their type). In that case the inner statement might break on count(distinct r.IdUnique). Maybe replace it with count(distinct coalesce(r.IdUnique, "n/a")), or filter those relationships out beforehand using WHERE r.IdUnique is not null.
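Applied to your inner fragment, that would look roughly like this (again just an untested sketch):
MATCH (u:EffortUser)
WITH collect(u) AS users
CALL apoc.cypher.mapParallel2("
  MATCH (_)-[r]-(:EffortObject)
  // skip relationships that have no IdUnique at all
  // (alternatively keep them via count(distinct coalesce(r.IdUnique, 'n/a')))
  WHERE r.Effort = 'yes' AND r.IdUnique IS NOT NULL
  RETURN _.Name AS user, date(datetime(r.TimeEvent)).year AS date, count(DISTINCT r.IdUnique) AS count",
  {}, users, 4) YIELD value
RETURN value.user AS user, value.date AS date, sum(value.count) AS count
ORDER BY user, date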
01-06-2020 02:29 AM
Good thinking ... let me check ... no, sorry, that's not it I think, see below.
01-06-2020 02:55 AM
Same with r.TimeEvent?
01-06-2020 03:01 AM
That must be it ... see below ... the last 4 are Effort = 'yes' relationships.
Do I just add where not r.TimeEvent is null to the filter?
01-06-2020 04:39 AM
@stefan.armbruster this query here works now, but it only returns results by EffortUser; I cannot figure out how to pass the Employee.Name into the results as well.
MATCH (e:Employee)-[]-(u:EffortUser)
WHERE e.Name = 'cbartens'
WITH collect(u) AS users
CALL apoc.cypher.mapParallel2("
MATCH (_)-[r]-(:EffortObject)
WHERE r.Effort = 'yes' and not r.TimeEvent is null and r.TimeEvent >= '2016-01-01' and r.TimeEvent <= '2018-12-31'
RETURN _.Name as user, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as events",
{}, users, 4) YIELD value
RETURN value.user as user, value.date as date, sum(value.events) as events
ORDER BY date, user
Also, every time I try something that does not work, the server kind of enters a DoS state, i.e. the CPU goes to 100% and the instance becomes unresponsive and has to be restarted. Is there any way to avoid this? It makes the trial and error needed to get the query right very difficult.
Would it be possible to wrap the above statement into the following?
apoc.cypher.runTimeboxed(cypher :: STRING?, params :: MAP?, timeout :: INTEGER?) :: (value :: MAP?)
apoc.cypher.runTimeboxed('cypherStatement',{params}, timeout) - abort kernelTransaction after timeout ms if not finished
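Something like this is what I have in mind (just an untested sketch on my part - the nested quoting and the 30000 ms timeout are guesses):
// Untested sketch: the parallel query from above, wrapped in runTimeboxed
// so the transaction is aborted after 30 seconds if it has not finished.
CALL apoc.cypher.runTimeboxed("
  MATCH (e:Employee)-[]-(u:EffortUser)
  WHERE e.Name = $name
  WITH collect(u) AS users
  CALL apoc.cypher.mapParallel2('
    MATCH (_)-[r]-(:EffortObject)
    WHERE r.Effort = \"yes\" AND NOT r.TimeEvent IS NULL
      AND r.TimeEvent >= \"2016-01-01\" AND r.TimeEvent <= \"2018-12-31\"
    RETURN _.Name AS user, date(datetime(r.TimeEvent)).year AS date,
           count(DISTINCT r.IdUnique) AS events',
    {}, users, 4) YIELD value
  RETURN value.user AS user, value.date AS date, sum(value.events) AS events",
  {name: 'cbartens'}, 30000) YIELD value
RETURN value.user AS user, value.date AS date, value.events AS events
ORDER BY date, user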
The only other thing I found is https://github.com/neo4j/neo4j-javascript-driver/issues/318, but I don't think this helps given I use Neo4j Desktop.
01-06-2020 10:36 AM
You can try to move the first traversal into the inner statement:
MATCH (e:Employee)
WITH collect(e) as employees
call apoc.cypher.mapParallel2("match (_)-[r1]-(u:EffortUser)-[r2]-(o:EffortObject) ....
I'd also add a relationship direction, > or <.
If your database seems to become unresponsive, it may be excessive heap usage due to the collect. Do you see any suspicious messages in debug.log?
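Fully spelled out (still untested - the relationship types and property names are taken from your earlier queries), the idea would look something like:
MATCH (e:Employee)
WITH collect(e) AS employees
CALL apoc.cypher.mapParallel2("
  // _ is one Employee per iteration; traverse to users and objects inside the fragment
  MATCH (_)-[r1:LINKED_TO|:MATCHED_TO]->(u:EffortUser)-[r2]->(o:EffortObject)
  WHERE r2.Effort = 'yes' AND NOT r2.TimeEvent IS NULL
  RETURN _.Name AS employee, date(datetime(r2.TimeEvent)).year AS date,
         count(DISTINCT r2.IdUnique) AS count",
  {}, employees, 4) YIELD value
RETURN value.employee AS employee, value.date AS date, sum(value.count) AS count
ORDER BY employee, date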
01-07-2020 08:10 PM
@stefan.armbruster some more news from the parallel query front ...
Regarding issue 1 - CPU goes to 100% and instance becomes unresponsive
This still happens, no idea why. I will check the logs when I have some time. It would be good to find out - or even better if one could wrap the parallel function into some other function that prevents this from happening. The reality is that it's unlikely a normal user would get these queries right on the first try, so it involves a lot of trial and error, and having to shut down the instance every so often because it became unresponsive is not making that any easier, not to mention having to apologize to colleagues.
Regarding issue 2 - Success, I now have a query that delivers what I need
After some playing around I settled on the following query which works fine.
MATCH (e:Employee)-[:LINKED_TO|:MATCHED_TO]->(u:EffortUser)
WHERE e.Name = 'cbartens'
WITH collect(u) AS items
CALL apoc.cypher.mapParallel2("
OPTIONAL MATCH (_)-[r2]-(o:EffortObject)
WHERE r2.Effort = 'yes' and not r2.TimeEvent is null and r2.TimeEvent >= '2017-10-01' and r2.TimeEvent <= '2017-12-31'
RETURN date(datetime(r2.TimeEvent)) as date, count(distinct r2.IdUnique) as events",
{parallel:True, batchSize:2000, concurrency:20}, items, 4) YIELD value
RETURN value.date as date, sum(value.events) as events
ORDER BY date
Some key observations that might help others:
(_)-[r2]-(o:EffortObject) works, but (_)-[r1]-(u:EffortUser)-[r2]-(o:EffortObject) does not.
The full pattern is (:Employee{Name:'cbartens'})-[:LINKED_TO|:MATCHED_TO]->(:EffortUser)-[r]-(:EffortObject), so in my case the normal part of the query takes care of (:Employee{Name:'cbartens'})-[:LINKED_TO|:MATCHED_TO]->(:EffortUser), the results of which are handed off to the parallel query as (_), and then the parallel query does the rest, i.e. (:EffortUser)-[r]-(:EffortObject), or rather (_)-[r]-(:EffortObject).
{parallel:True, batchSize:2000, concurrency:20} does impact the query and shaves another 1s off it, reducing it from 5s to 1s. batchSize:2000 and concurrency:20 seem to be the optimal settings; anything higher or lower than that and the query time increases again.
If you spot anything in the above that makes no sense or can be optimized further, please let me know! I'd be especially interested to hear any tips about making sure that the parallel query does not return different results ... that is a bit worrying if you ask me (see below).
01-08-2020 12:04 PM
Thanks for the great feedback. Would you mind adding this as a GitHub issue as well? Otherwise I see the danger of stuff getting lost. Thanks in advance.
01-08-2020 01:38 PM
No worries, happy to @stefan.armbruster. See https://github.com/neo4j-contrib/neo4j-apoc-procedures/issues/1373. And thanks again for your help.
01-10-2020 10:56 AM
I've been trying to use apoc.cypher.mapParallel2() with the latest Neo4j and APOC versions and clearly observed the issue with it randomly returning partial or no results. I put more details into the GitHub issue: https://github.com/neo4j-contrib/neo4j-apoc-procedures/issues/1373#issuecomment-573161323
01-18-2020 07:11 PM
Update - After running and comparing many more standard vs parallel queries I'm pretty sure the partial results issue is limited to querying / counting relationships. Whenever I count nodes the counts returned by the parallel query always match the standard query 100%.
01-19-2020 03:26 PM
I've observed this problem on counting nodes as well: https://github.com/neo4j-contrib/neo4j-apoc-procedures/issues/1373#issuecomment-576059374
02-24-2020 03:26 AM
Yes, you are right! Much less pronounced when counting nodes though ... which is why I missed it initially.