01-22-2020 10:00 PM
I'm trying out Kafka + Neo4j and I'm just at the quickstart step, trying to get things working. I have the plugin installed and Kafka running, producing and consuming topics. When Neo4j consumes from Kafka I get this in my logs:
2020-01-23 05:48:31.357+0000 INFO ======== Neo4j 3.5.14 ========
2020-01-23 05:48:31.367+0000 INFO Starting...
2020-01-23 05:48:33.128+0000 INFO Initiating metrics...
2020-01-23 05:48:34.180+0000 INFO Initialising the Streams Source module
2020-01-23 05:48:34.223+0000 INFO Initializing Kafka Connector
2020-01-23 05:48:35.678+0000 INFO Kafka Connector started
2020-01-23 05:48:35.682+0000 INFO Streams Source module initialised
2020-01-23 05:48:36.045+0000 INFO Initialising the Streams Sink module
2020-01-23 05:48:37.558+0000 INFO Starting the Kafka Sink
2020-01-23 05:48:37.844+0000 INFO Creating Sink daemon Job
2020-01-23 05:48:37.873+0000 INFO Subscribed topics: [first-topic]
2020-01-23 05:48:37.874+0000 INFO Kafka Sink started
2020-01-23 05:48:45.539+0000 INFO Sending metrics to CSV file at C:\Users\miker\.Neo4jDesktop\neo4jDatabases\database-8c2cb7bc-a2cf-4264-926b-eeb11df003a1\installation-3.5.14\metrics
2020-01-23 05:48:47.098+0000 INFO Bolt enabled on 0.0.0.0:7687.
2020-01-23 05:48:48.000+0000 INFO Started.
2020-01-23 05:48:48.132+0000 INFO Mounted REST API at: /db/manage
2020-01-23 05:48:48.223+0000 INFO Server thread metrics have been registered successfully
2020-01-23 05:48:49.142+0000 INFO Remote interface available at http://localhost:7474/
2020-01-23 05:49:11.052+0000 ERROR Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=streams.service.errors.ProcessingError: Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values
at [Source: (String)"10:49"; line: 1, column: 4], key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer), key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer) Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=streams.service.errors.ProcessingError: Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values
at [Source: (String)"10:49"; line: 1, column: 4], key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer), key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer)
streams.service.errors.ProcessingError: Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=streams.service.errors.ProcessingError: Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values
at [Source: (String)"10:49"; line: 1, column: 4], key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer), key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer)
at streams.service.errors.KafkaErrorService.report(KafkaErrorService.kt:37)
at streams.kafka.KafkaAutoCommitEventConsumer.executeAction(KafkaAutoCommitEventConsumer.kt:95)
at streams.kafka.KafkaAutoCommitEventConsumer.readSimple(KafkaAutoCommitEventConsumer.kt:85)
at streams.kafka.KafkaAutoCommitEventConsumer.read(KafkaAutoCommitEventConsumer.kt:132)
at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:95)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)
at kotlinx.coroutines.DispatchedTask$DefaultImpls.run(Dispatched.kt:235)
at kotlinx.coroutines.AbstractContinuation.run(AbstractContinuation.kt:19)
at kotlinx.coroutines.scheduling.Task.run(Tasks.kt:94)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:732)
It looks like some error when trying to parse the event object. I'm running Kafka v2.4.0 and producing messages from the command line by typing simple strings such as "hello world". These are my neo4j.conf settings:
kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
streams.sink.enabled=true
streams.sink.topic.cypher.first-topic=CREATE (n:Label) SET n.data = event
Any pointers on where to look for the cause of the error?
01-24-2020 04:01 AM
Hi @mike.r.black,
your Sink (consumer) configuration is correct.
The Neo4j Streams plugin supports two types of deserializers:
- org.apache.kafka.common.serialization.ByteArrayDeserializer, for plain JSON messages (the default)
- io.confluent.kafka.serializers.KafkaAvroDeserializer, for Avro messages
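Just as a rough sketch, if you ever wanted to switch from the default JSON handling to Avro, the extra neo4j.conf entries would look something like this (the schema registry URL below is only a placeholder for your environment):
kafka.key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
kafka.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
kafka.schema.registry.url=localhost:8081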
That said, all you have to do to make your Sink work is send the message in JSON format. For example:
{"message": "Hello world!"}
Please check out the Consumer section of the documentation for further details.
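As a quick sketch of the end-to-end flow (the broker address is taken from your config; the Neo4j password is a placeholder, and on Windows you would use the .bat scripts under bin\windows):
# start the console producer against the topic the sink is subscribed to, then type the JSON line and press Enter
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first-topic
{"message": "Hello world!"}
# verify from cypher-shell (or the Neo4j Browser) that the sink Cypher template created a node
echo "MATCH (n:Label) RETURN n LIMIT 5;" | bin/cypher-shell -a bolt://localhost:7687 -u neo4j -p <your-password>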
Hope this helps!
Regards,
Mauro
01-24-2020 07:22 PM
Success! Thanks for the help, I didn't realize how strict the message format had to be.