

Calling REST API within apoc.periodic.commit()

bieri

Hi Everyone

First post here. Hope I am doing this right.
I am working on a student project. Its aim is to create a database that loads information from other systems periodically.

Things have been working fine, but it seems I can't properly wrap my head around apoc.periodic.commit()
I am trying to load json data from a REST API that uses the parameters offset and limit to paginate (within the url). As a test, I simply would like to create customer Nodes. This is what I got:

CALL apoc.periodic.commit("
CALL apoc.load.jsonParams('https://SERVER/v1/customers?limit=100&offset='+offset,{Authorization:$Auth},null) 
YIELD value
UNWIND value.items AS customer
create (cust:actiCustomer{name:customer.name})
WITH cust limit {limit}
RETURN CASE WHEN count(*) <100 THEN 0 ELSE count(*) END AS count
",{limit:1000, offset : 0})

My problem is: How do I increase the offset after each run-through? It currently ends in an HTTP Response: 429 Too Many Requests - obviously - as it always requests the first 100 entries.

I also tried to work with :params, but it doesn't let me update the parameters within the periodic.commit() call. Any suggestions are appreciated.

Best
Simon

1 ACCEPTED SOLUTION

I've done something similar, as described at

when fetching data from the Zendesk API. Same deal: I make a request and might need to fetch the remaining data afterwards. It uses a helper node to keep track of where I left off. Maybe you can do something similar.


7 REPLIES

bieri

After more than a day of fiddling around, I've solved the issue myself. However, I am using a node as a helper to store the offset and to count upwards. Not very pretty, but it works. Here's the code:

CALL apoc.periodic.commit("
MATCH (importHelper:ImportHelper) 
CALL apoc.load.jsonParams('https://URL/api/v1/customers?limit=100&offset='+ importHelper.offset,{Authorization:$Auth},null) 
YIELD value
UNWIND value.items AS customer
create (actiCust:actiCustomer{name:customer.name})
WITH actiCust limit {limit}
MATCH (importHelper:ImportHelper)
SET importHelper.offset = importHelper.offset + 1
RETURN CASE WHEN count(*) <100 THEN 0 ELSE count(*) END AS count
",{limit:1000})

Weird thing: I was initially trying to increase importHelper.offset by 100 per run-through like this:

SET importHelper.offset = importHelper.offset + 100

Because I retrieve 100 records per call. But the offset always ended up being 10,000 after only one batch. Therefore I figured this line got called 100 times, and I set it to +1 instead of +100.

If somebody could explain what happened, it'd be highly appreciated. Also, if anybody knows a more convenient way to achieve this than using a helper node, I'd love to know as well!

Best
Simon

So the purpose of apoc.periodic.commit() is to execute the same query over and over until it returns 0. The prototypical use case is something like this, say for deleting all nodes of a certain label:

CALL apoc.periodic.commit("
 MATCH (n:ToDelete)
 WITH n LIMIT {limit}
 DETACH DELETE n
 RETURN count(n)", {limit:1000})

Note that the source set (the :ToDelete nodes) diminishes with each run until there are none left, so this is really made for cases where the source shrinks as you go. If we used it in some other case instead, such as when we need to filter by a property:

CALL apoc.periodic.commit("
 MATCH (n:Node)
 WHERE n.toDelete = true
 WITH n LIMIT {limit}
 DETACH DELETE n
 RETURN count(n)", {limit:1000})

This is far less efficient. We'll match to and filter out nodes until we find our first batch of 1000, delete them, and then start over from the very beginning and match to (and filter out) the same nodes we filtered out previously, and then matching out further to more nodes until we get our next batch of 1000...and then we start over matching from the beginning again. It starts over from a fresh MATCH with each batch. (For this case using apoc.periodic.iterate() would be preferred instead of commit())
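As a minimal sketch of the apoc.periodic.iterate() alternative mentioned above, reusing the :Node label and toDelete property from the example (the batchSize of 1000 is only an illustrative choice): the first statement streams the matching nodes once, and the second statement is applied to them batch by batch, so nothing is re-matched from the start.

```cypher
CALL apoc.periodic.iterate(
  // Driving statement: runs once and streams every node to delete
  "MATCH (n:Node) WHERE n.toDelete = true RETURN n",
  // Action statement: runs for each streamed node, committed per batch
  "DETACH DELETE n",
  {batchSize: 1000}
)
```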

That's why your query doesn't work without a node holding an offset value (which you use and then update). apoc.periodic.commit() isn't meant to work with these kinds of offset/paging queries unless you have some way to store and update an offset, as you're doing.

If you had a good way to stream all of the json in a single call (streaming it, not needing to use a limit and offset) then you could use apoc.periodic.iterate(), which streams in results, pausing with each batch to execute the update query for the batch, then continuing to stream (where it left off) until it has another batch to update.

As to what's going on with updating your offset, this is because of the cardinality of your query: operations in Cypher produce rows, and they execute per row.

You've already made a limited call to get JSON yielding 100 rows of values. Then you UNWIND value.items (I don't know how many items are supposed to be per row, but this multiplies out your rows accordingly) and create your :actiCustomer nodes, then limit those to 100 (I don't know why). Then for each of those 100 rows you MATCH to your single :ImportHelper node, and your SET is run for each of those 100 rows, updating for a sum of 100 (it used to be a sum of 10000 when you had + 100 before, since that was being executed 100 times).
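The per-row behaviour can be reproduced in isolation. This is only a sketch: the :DemoHelper label and the range(1, 100) list are made up for the demo and stand in for the helper node and the 100 customers.

```cypher
// Throwaway helper node for the demo (hypothetical label)
CREATE (h:DemoHelper {offset: 0})
WITH h
// UNWIND turns the single row into 100 rows, one per list element
UNWIND range(1, 100) AS item
// SET is evaluated once per row, so the increment is applied 100 times
SET h.offset = h.offset + 100
// Aggregating collapses the 100 rows back into a single row
WITH h, count(*) AS rows
RETURN rows, h.offset AS offset
```

Here rows comes back as 100, which is the number of times the SET ran; that is how an increment of 100 ends up as the 10,000 reported earlier in the thread.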

To improve this, you should probably be using FOREACH instead of UNWIND. Remember that UNWIND multiplies out your rows (it produces a row for every element of the unwound list); it isn't a looping construct (it only appears that way since Cypher operations execute per row). If you use FOREACH instead, the Cypher inside the FOREACH will execute per element of the list, but it won't increase cardinality (you'll still have your original 100 rows).

Then you can perform your count aggregation, which gives you a single row with the count, and you can update your import helper, which will happen only that one time (since you have only a single row). Then you return the (rounded) count. (Also, you already matched on your importHelper, it's in scope, so no need to match to it again).

CALL apoc.periodic.commit("
MATCH (importHelper:ImportHelper) 
CALL apoc.load.jsonParams('https://URL/api/v1/customers?limit=100&offset='+ importHelper.offset,{Authorization:$Auth},null) 
YIELD value
FOREACH(customer IN value.items |
   create (actiCust:actiCustomer{name:customer.name}) )
WITH importHelper, count(value) as count
SET importHelper.offset = importHelper.offset + count
RETURN CASE WHEN count <100 THEN 0 ELSE count END AS count
",{limit:1000})
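If each yielded value is a whole page whose customers sit in value.items (an assumption about the API's response shape, though it would fit the offset jumping to 10,000 earlier in the thread), then count(value) counts pages rather than customers. A variant sketch that counts list elements instead follows; the Auth entry in the parameter map assumes $Auth is set in the calling session.

```cypher
CALL apoc.periodic.commit("
MATCH (importHelper:ImportHelper)
CALL apoc.load.jsonParams('https://URL/api/v1/customers?limit=100&offset=' + importHelper.offset, {Authorization: $Auth}, null)
YIELD value
FOREACH (customer IN value.items |
  CREATE (:actiCustomer {name: customer.name}))
// Count the customers in the page, not the number of rows
WITH importHelper, sum(size(value.items)) AS count
SET importHelper.offset = importHelper.offset + count
// A short page means the last page was reached, so return 0 to stop
RETURN CASE WHEN count < 100 THEN 0 ELSE count END AS count
", {Auth: $Auth})
```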

bieri

Hi Andrew

Thanks a ton! That helped me already. One last thing:
Creating those actiCust nodes was only a test. (Works fine.)

But I'd also need to match and edit other nodes within that FOREACH. Now, neo4j tells me that I am not allowed to. Therefore I tried to call apoc.do.when within the FOREACH which, as it seems, I am also not allowed to.

Any further suggestions?

Also, the '429 Too Many Requests' error might simply be a result of how the API you are getting customers from is configured: it may only allow N requests per time period, so you can't overwhelm it by sending, say, 100 requests per second. If you do send too many, the API may simply fail with 'Too Many Requests'. If this is what is happening, then to avoid the failure you can add a

call apoc.util.sleep(1000) within the Cypher statement fed to apoc.periodic.commit, which will cause the statement to sleep for 1000 milliseconds. Obviously you can adjust the 1000 as needed.
This isn't so much a fault with APOC itself; rather, the API you are requesting from might be limiting the number of requests you can make.
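A minimal sketch of where such a pause could go, shown as a single page fetch rather than the full import. It assumes the :ImportHelper node and the URL and $Auth parameter used elsewhere in this thread; the final RETURN is only there to make the fragment runnable on its own.

```cypher
MATCH (importHelper:ImportHelper)
// Pause for 1000 ms before the request to stay under the API's rate limit
CALL apoc.util.sleep(1000)
WITH importHelper
CALL apoc.load.jsonParams('https://URL/api/v1/customers?limit=100&offset=' + importHelper.offset, {Authorization: $Auth}, null)
YIELD value
// Report how many customers this page contained
RETURN size(value.items) AS fetched
```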

bieri

Hi Dana

Thanks for your reply, I appreciate it.

Good idea with util.sleep. However, 429 - Too many requests is not the root problem. I need a way to work with an offset variable within apoc.periodic.commit because the API sets the limit per request to 1000 by default and I have to use an offset variable to get all data from the API. Said offset variable needs to increase by the limit for every request. (Thus creating a new starting point)

Andrew's idea of using FOREACH, combined with my idea of using a helper node to store the offset within the request, works for creating nodes. But I can't match any other nodes, and thus can neither create edges nor update existing nodes, because that's not allowed within the FOREACH.

I've done something similar, as described at

when fetching data from the Zendesk API. Same deal: I make a request and might need to fetch the remaining data afterwards. It uses a helper node to keep track of where I left off. Maybe you can do something similar.

Dana, you're a life-saver!

I was able to construct the query with the help of your article

I noticed that you always use MERGE instead of MATCH within apoc.periodic.commit - so I did the same. Combined with the foreach for counting the datasets I ended up with the following code:

(I commented-out the apoc.periodic.commit() call for better readability.)

//CALL apoc.periodic.commit("
MATCH (importHelper:ImportHelper) SET importHelper.counter = 0
WITH importHelper
CALL apoc.load.jsonParams($url + importHelper.offset,{Authorization:$auth},null)
YIELD value
  //Increase the counter & prepare for next run
  FOREACH (actiCust IN value.items |
    SET importHelper.counter = importHelper.counter + 1
  )
  SET importHelper.offset = importHelper.offset + importHelper.counter
  //Do operations on data
  WITH importHelper, value
    UNWIND value.items AS actiCust
    MERGE (c:Customer{domain:actiCust.description}) SET c.actiID = actiCust.id
    //Exit condition
    WITH importHelper, value LIMIT {limit}
      RETURN CASE WHEN toInteger(importHelper.counter) < 100 THEN 1 ELSE importHelper.counter END AS count
//",{limit:1000})

As mentioned in the beginning: If I change the following line from

 MERGE (c:Customer{domain:actiCust.description}) SET c.actiID = actiCust.id

to

 MATCH (c:Customer{domain:actiCust.description}) SET c.actiID = actiCust.id

apoc.periodic.commit only runs through one time.

Now, my guess as to why that is happening: with that MATCH statement I am searching for specific nodes. If none is found during a run-through, MATCH returns nothing, so the exit condition for apoc.periodic.commit is met immediately and all later code is "ignored".

Can anyone confirm? I think that might also be very helpful for others trying to build such an API-Import.
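That guess fits how Cypher handles rows: a MATCH that finds nothing drops the row, so if no customer in a page matches, no row reaches RETURN. A minimal sketch of one way around this, assuming the same $url, $auth and :ImportHelper node as above, is to use OPTIONAL MATCH (which keeps the row and leaves c as null when nothing matches) and to derive the count from the page itself rather than from the matched nodes:

```cypher
MATCH (importHelper:ImportHelper)
CALL apoc.load.jsonParams($url + importHelper.offset, {Authorization: $auth}, null)
YIELD value
UNWIND value.items AS actiCust
// Keeps the row even when no :Customer has this domain (c is then null)
OPTIONAL MATCH (c:Customer {domain: actiCust.description})
// FOREACH over an empty or one-element list acts as a conditional SET
FOREACH (ignored IN CASE WHEN c IS NULL THEN [] ELSE [1] END |
  SET c.actiID = actiCust.id)
// Aggregating collapses the per-customer rows back into a single row
WITH importHelper, count(actiCust) AS count
SET importHelper.offset = importHelper.offset + count
RETURN CASE WHEN count < 100 THEN 0 ELSE count END AS count
```

Because the count no longer depends on which customers already exist, the offset keeps advancing even for pages where nothing matches.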