cancel
Showing results for 
Search instead for 
Did you mean: 

Head's Up! These forums are read-only. All users and content have migrated. Please join us at community.neo4j.com.

Procedure with mode WRITE does not truly stream in practice ?!

The @Procedure annotated method with return value of type Stream will allow it to be used streaming. This is functioning as expected as long as the mode=READ. When deploying as an extension, a client can obtain some of the results before all of them are actually computed in the database. When the mode=WRITE however, no results are delivered to the client until all of them are gathered on the server.

Is this as designed? And if so: how can we obtain some of the results before all of them are available serverside?

3 REPLIES 3

It should also stream the same way.

Depends on how your procedure works internally if there is something blocking?

Can you share more details?

I have prepared a small project. Seems I cannot upload the zip I prepared. The essence is:

public class TestPlugin {

	public class Result {
        public Long count;
        public String s = String.join("", Collections.nCopies(100, UUID.randomUUID().toString()));
	}

	public class TestIterator implements Iterator<Result> {
		private long count;
		public TestIterator(long count) {
			this.count = count;
		}

		@Override
		
		public synchronized boolean hasNext() {
			return (count > 0);
		}

		@Override
		public synchronized Result next() {
			count--;
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
			}
			System.out.println(count + " count");
			Result result = new Result();
			result.count = count;
			return result;
		}
	}
 
    @Procedure( name = "test.testIteratorReadMode", mode = READ )
    public Stream<Result> testIteratorReadMode()
    {
    	Iterator<Result> it = new TestIterator(100);
    	Iterable<Result> iterable = () -> it;
    	return StreamSupport.stream(iterable.spliterator(), false);
    }
    @Procedure( name = "test.testIteratorWriteMode", mode = WRITE )
    public Stream<Result> testIteratorWriteMode()
    {
    	Iterator<Result> it = new TestIterator(100);
    	Iterable<Result> iterable = () -> it;
    	return StreamSupport.stream(iterable.spliterator(), false);
    }
}

With a unit tests using the neo4j harness:

public class TestPluginTest
{
	// This rule starts a Neo4j instance
    @Rule
    public Neo4jRule neo4j = new Neo4jRule()

            // This is the Procedure we want to test
            .withProcedure( TestPlugin.class );

    @Test
	public void testIteratorReadMode() {
		try (Driver driver = GraphDatabase.driver(neo4j.boltURI(), Config.build().withoutEncryption().toConfig())) {
			Session session = driver.session();
			StatementResult result = session.run("CALL test.testIteratorReadMode()");
			result.stream().forEach(e -> System.out.println(e));
		}
	}
    @Test
   	public void testIteratorWriteMode() {
   		try (Driver driver = GraphDatabase.driver(neo4j.boltURI(), Config.build().withoutEncryption().toConfig())) {
   			Session session = driver.session();
   			StatementResult result = session.run("CALL test.testIteratorWriteMode()");
   			result.stream().forEach(e -> System.out.println(e));
   		}
   	}
}

The first test shows the server ## count lines and the records received by the test code.
It begins with

99 count
98 count
97 count
Record<{count: 99, s: "23deb248-2e56-46ae-ad85-7c8f84751ef423de..
Record<{count: 98, s: "be34527e-886f-48e2-9696-e72861e5055abe34527e..
Record<{count: 97, s: "65f7c83f-51d0-4a8a-b514-bd59fc5559fd65f..
96 count
95 count
94 count
Record<{count: 96, s: "41c5d01e-1855-4b09-9f53-14643436a3a041c5..
Record<{count: 95, s: "b4d4071f-1170-4109-bf2d-4acf6e6081a4b4d40..

This shows that the results arrive at the test almost immediately. There is some delay, probably because of I/O buffering?

The second test shows that first all 100 Results are created, before they arrive:

99 count
98 count
..
0 count
Record<{count: 99, s: "ec735aa4-cdd6-4890-adc8-984a41c5b42eec7..
Record<{count: 98, s: "2f6b08f9-b05d-4a90-8a93-a88587cebb172f6b0..
..
Record<{count: 0, s: "ef4d19a2-e4d5-40ac-9961-30d7a26edb08ef4d19a2-..

Which version of Neo4j and the drivers were you using?
I think its might be more of a driver issue.