12-07-2022 12:36 AM
I've been trying to run some read queries concurrently using asyncio tasks and gather(). I've tried running the queries concurrently within a transaction, within a session, and within a driver, but all three attempts fail. Does anyone have an idea how to run multiple read queries concurrently with the Neo4j Python driver (v5 or later)?
Neo4j version: 4.4
Neo4j Python driver version: 5.3.0
Python 3.10
Here is a reproducible problem:
import asyncio
import traceback

from neo4j import AsyncGraphDatabase


async def test_1_tx(tx):
    query = "RETURN 1"
    result = await tx.run(query)
    record = await result.value()
    print("test_1_tx", record)

    tasks = []
    for x in range(10):
        tasks.append(asyncio.create_task(test_2_tx(tx)))

    await asyncio.gather(*tasks)


async def test_2_tx(tx):
    query = "RETURN 2"
    result = await tx.run(query)
    record = await result.value()
    print("test_2_tx", record)


async def session_task(driver):
    async with driver.session() as session:
        await session.execute_read(test_1_tx)


async def main():
    uri = "neo4j://localhost:7687"
    driver = AsyncGraphDatabase.driver(uri, auth=("neo4j", "pass"),)

    try:
        async with driver.session() as session:
            await session.execute_read(test_1_tx)
    except Exception as e:
        print("asyncio gather within transaction failed")
        print(traceback.format_exc())
    try:
        async with driver.session() as session:
            tasks = []
            for x in range(10):
                tasks.append(asyncio.create_task(session.execute_read(test_2_tx)))

            await asyncio.gather(*tasks)
    except Exception as e:
        print("asyncio gather within session failed")
        print(traceback.format_exc())
    try:
        tasks = []
        for x in range(10):
            tasks.append(asyncio.create_task(session_task(driver)))

        await asyncio.gather(*tasks)
    except Exception as e:
        print("asyncio gather within driver failed")
        print(traceback.format_exc())


asyncio.run(main())
Stacktrace of error:
Traceback (most recent call last):
  File "../test.py", line 37, in main
    async with driver.session() as session:
  File "../lib/python3.10/site-packages/neo4j/_async/work/session.py", line 115, in __aexit__
    await self.close()
  File "../lib/python3.10/site-packages/neo4j/_async/work/session.py", line 197, in close
    await self._connection.fetch_all()
  File "../lib/python3.10/site-packages/neo4j/_async/io/_bolt.py", line 669, in fetch_all
    detail_delta, summary_delta = await self.fetch_message()
  File "../lib/python3.10/site-packages/neo4j/_async/io/_bolt.py", line 652, in fetch_message
    tag, fields = await self.inbox.pop(
  File "../lib/python3.10/site-packages/neo4j/_async/io/_common.py", line 74, in pop
    await self._buffer_one_chunk()
  File "../lib/python3.10/site-packages/neo4j/_async/io/_common.py", line 53, in _buffer_one_chunk
    await receive_into_buffer(self._socket, self._buffer, 2)
  File "../lib/python3.10/site-packages/neo4j/_async/io/_common.py", line 290, in receive_into_buffer
    n = await sock.recv_into(view[buffer.used:end], end - buffer.used)
  File "../lib/python3.10/site-packages/neo4j/_async_compat/network/_bolt_socket.py", line 157, in recv_into
    res = await self._wait_for_io(io_fut)
  File "../lib/python3.10/site-packages/neo4j/_async_compat/network/_bolt_socket.py", line 116, in _wait_for_io
    return await wait_for(io_fut, timeout)
  File "../lib/python3.10/site-packages/neo4j/_async_compat/shims/__init__.py", line 70, in _wait_for
    return await fut
  File "../lib/python3.10/asyncio/streams.py", line 668, in read
    await self._wait_for_data('read')
  File "../lib/python3.10/asyncio/streams.py", line 487, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data
Another regular error that occurs:
Failed to read from defunct connection IPv4Address(('localhost', 7687)) (IPv4Address(('127.0.0.1', 7687)))
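For context, the RuntimeError at the bottom of the traceback is raised by asyncio itself: a StreamReader allows only one coroutine at a time to wait for incoming data. The sketch below reproduces that message with the standard library alone, no Neo4j involved. The names demo and message are made up for illustration, and this is only a simplified model of what presumably happens when several Tasks drive the same connection.

```python
import asyncio


async def demo() -> str:
    # A bare StreamReader stands in for the socket stream of one connection.
    reader = asyncio.StreamReader()

    # The first reader starts waiting for data that has not arrived yet.
    first = asyncio.create_task(reader.read(1))
    await asyncio.sleep(0)  # yield once so the task reaches its wait

    # A second concurrent read on the same stream is rejected by asyncio.
    try:
        await reader.read(1)
    except RuntimeError as exc:
        message = str(exc)
    else:
        message = "no error"

    # Feed one byte so the first reader can finish cleanly.
    reader.feed_data(b"x")
    await first
    return message


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The takeaway is that anything sitting on top of a single stream, such as a session or a transaction, has to be awaited by one Task at a time.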
12-20-2022 03:33 AM - edited 12-20-2022 03:44 AM
Hi,
Lines 15 and 46 of your script are where the issues lie: the Tasks created inside test_1_tx all share one transaction, and the Tasks created in main all share one session. Just as sessions in the sync world aren't thread-safe, sessions in the async world aren't "task-safe". They are not meant to be used concurrently. Please spawn (at least) one session per Task and don't share sessions across Tasks. The same holds true for transactions: you can't use those concurrently either.
Please also have a read of the API docs, as some asyncio functions might catch you off guard by creating Tasks in the background for you.
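As an illustration of Tasks appearing behind your back: asyncio.gather() wraps every bare coroutine it receives in its own Task, whereas a plain await runs the coroutine inside the calling Task. A minimal standard-library sketch (the helper names task_name and demo are made up for this example):

```python
import asyncio


async def task_name() -> str:
    # Report which Task is currently executing this coroutine.
    return asyncio.current_task().get_name()


async def demo():
    caller = asyncio.current_task().get_name()

    # A plain await stays inside the caller's Task.
    direct = await task_name()

    # gather() schedules each coroutine as a separate Task.
    gathered = await asyncio.gather(task_name(), task_name())
    return caller, direct, gathered


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

So passing coroutines that use the same session or transaction to gather() is already concurrent use, even without an explicit create_task().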
I've put some comments into your code to explain the problematic areas
import asyncio
import traceback

from neo4j import AsyncGraphDatabase


async def test_1_tx(tx):
    query = "RETURN 1"
    result = await tx.run(query)
    record = await result.value()
    print("test_1_tx", record)

    tasks = []
    for x in range(10):
        # this does not work, you can't "fork" a transaction
        # tasks.append(asyncio.create_task(test_2_tx(tx)))
        await test_2_tx(tx)

    # await asyncio.gather(*tasks)


async def test_2_tx(tx):
    query = "RETURN 2"
    result = await tx.run(query)
    record = await result.value()
    print("test_2_tx", record)


async def session_task(driver):
    async with driver.session() as session:
        await session.execute_read(test_1_tx)


async def main():
    uri = "neo4j://localhost:7687"
    driver = AsyncGraphDatabase.driver(uri, auth=("neo4j", "pass"),)

    try:
        async with driver.session() as session:
            await session.execute_read(test_1_tx)
    except Exception as e:
        print("asyncio gather within transaction failed")
        print(traceback.format_exc())

    # try:
    #     async with driver.session() as session:
    #         tasks = []
    #         for x in range(10):
    #             # this does not work, sessions are not safe to be used concurrently
    #             tasks.append(asyncio.create_task(session.execute_read(test_2_tx)))
    #
    #         await asyncio.gather(*tasks)
    # except Exception as e:
    #     print("asyncio gather within session failed")
    #     print(traceback.format_exc())

    try:
        tasks = []
        for x in range(10):
            # this is much better, create the session in the new Task!
            tasks.append(asyncio.create_task(session_task(driver)))

        await asyncio.gather(*tasks)
    except Exception as e:
        print("asyncio gather within driver failed")
        print(traceback.format_exc())


asyncio.run(main())
One closing thought (pun not intended): you don't close the driver object at the end of your script. You might want to use a context manager (async with AsyncGraphDatabase.driver(...) as driver:) so that it gets closed for you.
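Putting both suggestions together, here is a minimal sketch with one session per Task and the driver used as a context manager. The helper names read_value, run_in_own_session and run_all are made up for illustration. The URI and credentials are the placeholders from the question, and the neo4j import sits inside main() only so that the helpers can be tried without a server.

```python
import asyncio


async def read_value(tx, query):
    # Inside one managed transaction, queries are awaited one after another.
    result = await tx.run(query)
    return await result.value()


async def run_in_own_session(driver, query):
    # Each Task opens its own session, so nothing is shared between Tasks.
    async with driver.session() as session:
        return await session.execute_read(read_value, query)


async def run_all(driver, queries):
    # One Task (and therefore one session) per query.
    tasks = [asyncio.create_task(run_in_own_session(driver, q)) for q in queries]
    return await asyncio.gather(*tasks)


async def main():
    # Assumption: neo4j driver 5.x is installed and a server is reachable.
    from neo4j import AsyncGraphDatabase

    uri = "neo4j://localhost:7687"
    # The context manager closes the driver when the block exits.
    async with AsyncGraphDatabase.driver(uri, auth=("neo4j", "pass")) as driver:
        print(await run_all(driver, ["RETURN 1", "RETURN 2"]))


# To try it against a running database: asyncio.run(main())
```

Each concurrent session borrows its own connection from the driver's pool, so very large fan-outs may need to be throttled, for example with an asyncio.Semaphore.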
12-08-2022 05:30 AM
Hi @Bertel, I remembered your post today when a new version of the Python driver docs was published. There's a specific section on running concurrent transactions with asyncio. Does this help? https://neo4j.com/docs/python-manual/current/concurrency/
12-08-2022 11:56 PM
Hi, that example shows running a single query. I want to run multiple read queries in a for-loop concurrently, preferably within the same transaction. Do you have an example of this?
12-20-2022 12:32 AM
Hi, I ran into the same issue.
I have multiple nodes and relationships that I would like to create concurrently.
Any suggestions on how to do that?
Regards,
P