How to use multiprocessing to tackle the result of session.run()?

order_search_paper_byfulltext_English = "CALL db.index.fulltext.queryNodes(\"title_abstract_English\", \"" + query + "\") YIELD node, score " \
                                                        "RETURN node as p, score limit $limit"
result = tx.run(order_search_paper_byfulltext_English, limit=limit)
pool = Pool()
return pool.map(my_paper_attribute_tackle, result , chunksize=100)

When I run the code above, an exception occurs:

Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/home/xiaorui/anaconda3/envs/hangkong/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/xiaorui/anaconda3/envs/hangkong/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/xiaorui/anaconda3/envs/hangkong/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/home/xiaorui/anaconda3/envs/hangkong/lib/python3.9/multiprocessing/queues.py", line 368, in get
    return _ForkingPickler.loads(res)
  File "/home/xiaorui/anaconda3/envs/hangkong/lib/python3.9/site-packages/neo4j/data.py", line 56, in __new__
    for key, value in iter_items(iterable):
  File "/home/xiaorui/anaconda3/envs/hangkong/lib/python3.9/site-packages/neo4j/conf.py", line 50, in iter_items
    for key, value in iterable:
ValueError: too many values to unpack (expected 2)

This happens even if my my_paper_attribute_tackle function does nothing.

When I add d = paper_result.data() first, it works. However, I want to map paper_result directly to the worker processes. Is there any solution?

2 REPLIES

We can't see the body of this function, so it's hard to say.

From the Python docs, you've got to remember to process the "result" of the transaction into a series of records. I would do this before you put it into your map.

In other words, a result coming back from the database (that iterable) is still an open cursor on the database. You can't use it outside of the transaction function, but you can map something over the results.
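A minimal sketch of that advice, not a definitive implementation: read the cursor into plain dicts while the transaction is still open, then hand those dicts to the pool. The traceback above suggests the driver's record objects fail to unpickle in the worker, whereas plain dicts pickle without trouble. The helper names (materialize, decode_keywords, search_papers, process_rows) and the "keywords" property are hypothetical and only for illustration; the query is passed as a parameter instead of being concatenated into the Cypher string.

```python
import json
from multiprocessing import Pool

# Same full-text procedure as in the question, with the index name and
# search string passed as parameters.
SEARCH = (
    "CALL db.index.fulltext.queryNodes($index, $query) YIELD node, score "
    "RETURN node AS p, score LIMIT $limit"
)


def materialize(result):
    """Turn every record into a plain dict while the cursor is still open."""
    return [record.data() for record in result]


def decode_keywords(row):
    """Hypothetical worker: assumes "keywords" holds a JSON-encoded list."""
    paper = dict(row["p"])  # copy, so the input row is left untouched
    paper["keywords"] = json.loads(paper["keywords"])
    return paper


def search_papers(tx, query, limit):
    """Run inside the transaction function and return picklable rows."""
    result = tx.run(SEARCH, index="title_abstract_English", query=query, limit=limit)
    return materialize(result)


def process_rows(rows, processes=None):
    """Map the worker over the already materialized rows."""
    with Pool(processes) as pool:
        return pool.map(decode_keywords, rows, chunksize=100)
```

With this split, search_papers would be called from the transaction function and process_rows afterwards, once the transaction has finished. The worker has to be a module-level function so the pool can pickle a reference to it.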

Thank you for the reply!
Here is the code of my_paper_attribute_tackle:

def my_paper_attribute_tackle(record):
    # print(record)
    paper = record["p"]
    paper[E_KEYWORDS] = json.loads(paper[E_KEYWORDS])
    paper[C_KEYWORDS] = json.loads(paper[C_KEYWORDS])
    paper[AUTHOR] = json.loads(paper[AUTHOR])
    if paper.get(AUTHORID) is not None:
        paper[AUTHORID] = json.loads(paper[AUTHORID])
    paper[FROM] = json.loads(paper[FROM])
    return paper

From the Python docs, you've got to remember to process the "result" of the transaction into a series of records. I would do this before you put it into your map.

So I must convert the "result" (an open cursor) into records if I want to use multiprocessing.map? I would like to skip this step because it costs O(n), where n is the number of records in the result.
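On the O(n) concern, a hedged sketch of one possible approach: as far as I know, Pool.map itself turns an iterable without a length into a list before dispatching, so reading every record happens either way. Pool.imap, by contrast, pulls from the iterable lazily, so the conversion to dicts can overlap with the work in the pool. The helper names (iter_rows, title_length, map_streaming, search_and_process) and the "title" property are hypothetical, and the whole thing has to run inside the transaction function because it reads from the open cursor.

```python
from multiprocessing import Pool


def iter_rows(result):
    """Lazily yield one plain dict per record as the cursor streams."""
    for record in result:
        yield record.data()


def title_length(row):
    # Hypothetical worker: assumes each paper has a "title" property.
    return len(row["p"]["title"])


def map_streaming(pool, result, worker, chunksize=100):
    """Feed rows to the pool as they arrive and collect results in order."""
    return list(pool.imap(worker, iter_rows(result), chunksize=chunksize))


def search_and_process(tx, cypher, **params):
    # Call this from the transaction function so the cursor is still open
    # while the rows are being read.
    with Pool() as pool:
        return map_streaming(pool, tx.run(cypher, **params), title_length)
```

Each record still has to be read and pickled once, so the total work stays linear in the number of records; the sketch only avoids building the full list before the pool starts working.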