Head's Up! These forums are read-only. All users and content have migrated. Please join us at community.neo4j.com.
03-15-2019 06:23 AM
We have imported the bitcoin blockchain into a neo4j graph database. The DB schema looks like this:
For each address, for a given entity (company) I would like to calculate the current bitcoin balance of an address.
To do this we must sum up the bitcoinValue property of all :Output
nodes that belong to each :Address
node via the [:LOCKED_BY]
that do not have an [:UNLOCKED_BY]
.
My current workflow to do this is like so:
// set the query parameter first.
:params "entity" : "Binance"
Then run the query:
// match addresses labelled with the entity of interest
match (a :Address)-->(e :Entity)
where e.name = $entity
// distinct must be included or the query will run for a very long time
with distinct a
// match outputs locked by address
match (a)<-[:LOCKED_BY]-(o: Output)
// exclude those outputs that have been subsequently spent
where not (o)-[:UNLOCKED_BY]->()
return
a.address as address,
round(sum(o.bitcoinValue)*100000000)/100000000 as balance
This works fine, but for a large number of addresses it can take some time. Is there a way to parallelise this using apoc.cypher.parallel()
or some other apoc query? There wasn't documentation for these functions that I could find.
Many thanks.
03-18-2019 10:06 AM
I found the answer was to use apoc.cypher.mapParallel2()
. The documentation was a little hard to understand because there were no examples, but this reduced the run-time of the above query from about 12 seconds for 300k addresses to around 2 seconds. We have longer queries where this technique is proving useful, but this is what I did:
# match addresses labelled with the entity of interest
match (a :Address)-->(e :Entity)
where e.name = $entity
# mapParallel will iterate over a list, so we `collect`
with collect(distinct a) as addresses
# The first argument is the cypher code we want to run as a string
# The second argument is a map of parameters, e.g. {parallel: true}
# The third argument is the list to iterate over
# (in this example `addresses` from above)
# The fourth argument is an integer to split the list into partitions,
# (though I'm not sure how this relates to the batchSize or concurrency
# parameters from the third argument)
CALL apoc.cypher.mapParallel2("
optional match (_)<-[:LOCKED_BY]-(o: Output)
where not (o)-[:UNLOCKED_BY]->()
return _.address as address,
round(sum(o.bitcoinValue)*100000000)/100000000 as balance"
, {parallel:True, batchSize:1000, concurrency:20}, addresses, 20) yield value
# Extract the columns we want to return from the list (returned by `yield value`)
return
value.address as address,
value.balance as balance
Hope this is helpful to someone and open to suggestions if this is not the correct way to use parallelism.
All the sessions of the conference are now available online