cancel
Showing results for 
Search instead for 
Did you mean: 

Head's Up! These forums are read-only. All users and content have migrated. Please join us at community.neo4j.com.

Neo4j kafka sink not initializing/working when trying to stream data

Trying to configure a neo4j kafka sink to consume json data from a microservice. Currently after restarting local neo4j client, I do not see any debug messages that indicate the kafka neo4j streams loading. I have provided my neo4j conf and debug trace files. I am pretty new to neo4j so please let me know if I have made any mistakes on either the configuration or neo4j side of things.
`
If it helps I am following this guide to set everything up, unfortunately my company uses 4.2.1 so couldn't use the docker compose to show use case.
Streaming Graphs: Combining Kafka and Neo4j

`
This is my docker of confluent platforms kafka docker compose. This is the docker-compose ps command. I can access everything from localhost:9092 for kafka(tested using kafka-console-consumer --topic friends --bootstrap-server localhost:9092 --from-beginning and it works after producing documents to the topic)

     Name                    Command               State                       Ports                     
---------------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up      0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp, 9092/tcp              
control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp                        
ksql-datagen      bash -c echo Waiting for K ...   Up                                                    
ksqldb-cli        /bin/sh                          Up                                                    
ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp                        
rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp                        
schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp                        
zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp    

`
I am using neo4j 4.2.1 with Streams 4.0.7 versions

kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

streams.sink.topic.cypher.friends=MERGE (p1:Person { name: event.initiated }) MERGE (p2:Person { name: event.accepted }) CREATE (p1)-[:FRIENDS { when: event.date }]->(p2)
streams.sink.enabled=true

`
This is my configuration in debug trace

2021-05-14 07:43:51.278+0000 INFO  Starting...
2021-05-14 07:43:54.047+0000 INFO  ======== Neo4j 4.2.1 ========
2021-05-14 07:43:56.353+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.353+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:57.337+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:57.344+0000 DEBUG [neo4j/2c0e4668] Waiting for the Neo4j instance to be ready...
2021-05-14 07:43:57.806+0000 INFO  [neo4j/2c0e4668] Starting the connector lifecycle listener
2021-05-14 07:43:57.807+0000 INFO  [neo4j/2c0e4668] StreamsConfig started
2021-05-14 07:43:57.808+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:58.588+0000 INFO  Called db.clearQueryCaches(): Query caches successfully cleared of 1 queries.
2021-05-14 07:43:56.353+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.353+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:57.337+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:57.344+0000 DEBUG [neo4j/2c0e4668] Waiting for the Neo4j instance to be ready...
2021-05-14 07:43:57.806+0000 INFO  [neo4j/2c0e4668] Starting the connector lifecycle listener
2021-05-14 07:43:57.807+0000 INFO  [neo4j/2c0e4668] StreamsConfig started
2021-05-14 07:43:57.808+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:58.588+0000 INFO  Called db.clearQueryCaches(): Query caches successfully cleared of 1 queries.
2021-05-14 07:43:56.353+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.353+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:57.337+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:57.344+0000 DEBUG [neo4j/2c0e4668] Waiting for the Neo4j instance to be ready...
2021-05-14 07:43:57.806+0000 INFO  [neo4j/2c0e4668] Starting the connector lifecycle listener
2021-05-14 07:43:57.807+0000 INFO  [neo4j/2c0e4668] StreamsConfig started
2021-05-14 07:43:57.808+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:58.588+0000 INFO  Called db.clearQueryCaches(): Query caches successfully cleared of 1 queries.
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] Detected new event change in configuration CONFIGURATION_INITIALIZED
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The event tree is [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The listenerMap contains the following listeners: [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Sink] Configuration is empty
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Source] Configuration is empty
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] Detected new event change in configuration CONFIGURATION_INITIALIZED
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The event tree is [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The listenerMap contains the following listeners: [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Sink] Configuration is empty
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Source] Configuration is empty
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] Detected new event change in configuration CONFIGURATION_INITIALIZED
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The event tree is [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The listenerMap contains the following listeners: [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Sink] Configuration is empty
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Source] Configuration is empty
2021-05-14 07:44:09.797+0000 DEBUG Reading users from /Users/***/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/data/dbms/auth.ini
2021-05-14 07:44:09.814+0000 INFO  Sending metrics to CSV file at /Users/***/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/metrics
2021-05-14 07:44:09.828+0000 INFO  Bolt enabled on localhost:7687.
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDatabaseService] at [/db]
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDbmsService] at [/dbms]
2021-05-14 07:44:09.851+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.LegacyClusteringRedirectService] at [/db/manage]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.rest.discovery.DiscoveryService] at [/]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider] at [/]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.CypherResource, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter, class org.neo4j.server.http.cypher.format.output.eventsource.EventSourceMessageBodyWriter] at [/db]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.LegacyTransactionService, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter] at [/db/data]
2021-05-14 07:44:09.931+0000 DEBUG Mounting servlet at [/dbms]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/manage]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/data]
2021-05-14 07:44:10.020+0000 DEBUG Mounting servlet at [/db]
2021-05-14 07:44:10.043+0000 DEBUG Mounting servlet at [/]
2021-05-14 07:44:10.790+0000 INFO  Remote interface available at http://localhost:7474/
2021-05-14 07:44:10.790+0000 INFO  Started.
2021-05-14 07:44:09.797+0000 DEBUG Reading users from /Users/***/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/data/dbms/auth.ini
2021-05-14 07:44:09.814+0000 INFO  Sending metrics to CSV file at /Users/****/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/metrics
2021-05-14 07:44:09.828+0000 INFO  Bolt enabled on localhost:7687.
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDatabaseService] at [/db]
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDbmsService] at [/dbms]
2021-05-14 07:44:09.851+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.LegacyClusteringRedirectService] at [/db/manage]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.rest.discovery.DiscoveryService] at [/]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider] at [/]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.CypherResource, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter, class org.neo4j.server.http.cypher.format.output.eventsource.EventSourceMessageBodyWriter] at [/db]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.LegacyTransactionService, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter] at [/db/data]
2021-05-14 07:44:09.931+0000 DEBUG Mounting servlet at [/dbms]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/manage]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/data]
2021-05-14 07:44:10.020+0000 DEBUG Mounting servlet at [/db]
2021-05-14 07:44:10.043+0000 DEBUG Mounting servlet at [/]
2021-05-14 07:44:10.790+0000 INFO  Remote interface available at http://localhost:7474/
2021-05-14 07:44:10.790+0000 INFO  Started.
2021-05-14 07:44:09.797+0000 DEBUG Reading users from /Users/****/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/data/dbms/auth.ini
2021-05-14 07:44:09.814+0000 INFO  Sending metrics to CSV file at /Users/****/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/metrics
2021-05-14 07:44:09.828+0000 INFO  Bolt enabled on localhost:7687.
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDatabaseService] at [/db]
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDbmsService] at [/dbms]
2021-05-14 07:44:09.851+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.LegacyClusteringRedirectService] at [/db/manage]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.rest.discovery.DiscoveryService] at [/]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider] at [/]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.CypherResource, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter, class org.neo4j.server.http.cypher.format.output.eventsource.EventSourceMessageBodyWriter] at [/db]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.LegacyTransactionService, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter] at [/db/data]
2021-05-14 07:44:09.931+0000 DEBUG Mounting servlet at [/dbms]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/manage]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/data]
2021-05-14 07:44:10.020+0000 DEBUG Mounting servlet at [/db]
2021-05-14 07:44:10.043+0000 DEBUG Mounting servlet at [/]
2021-05-14 07:44:10.790+0000 INFO  Remote interface available at http://localhost:7474/
2021-05-14 07:44:10.790+0000 INFO  Started.
1 ACCEPTED SOLUTION

Found the solution myself. Basically, you will need to create a new streams.conf file and add the connection configurations in that conf file, or else it will not start the kafka neo4j streams connection.

View solution in original post

1 REPLY 1

Found the solution myself. Basically, you will need to create a new streams.conf file and add the connection configurations in that conf file, or else it will not start the kafka neo4j streams connection.