11-18-2020 01:12 AM
I am using Neo4j 3.5.23 with the plugin neo4j-streams-3.5.12.jar.
I am trying to publish messages to a Kafka cluster that enforces authentication and authorization, using a service account over SASL. The service account does not (and cannot) have admin privileges on the cluster.
I have added all the required Kafka-related parameters to neo4j.conf.
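For context, a SASL setup of this kind in neo4j.conf usually looks roughly like the sketch below. It assumes the plugin's convention of passing Kafka client properties through with a kafka. prefix. The broker address, security protocol, SASL mechanism, and credentials are placeholders for illustration, not the values used on this cluster.

```properties
# Sketch only: every value below is a placeholder, not taken from the actual setup.
kafka.bootstrap.servers=broker1.example.com:9093
# Assumed protocol and mechanism; use whatever the cluster actually requires.
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
# Hypothetical service account credentials.
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="svc-account" password="changeit";
# Enable the source (publishing) side of the plugin.
streams.source.enabled=true
```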
I get the exception below when trying to publish a message from Neo4j to a Kafka topic (source / streams module):
WARN Cannot retrieve valid topics because the following exception,
next attempt is in 300000 ms: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at streams.kafka.KafkaAdminService$start$1.invokeSuspend(KafkaAdminService.kt:29)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)
at kotlinx.coroutines.DispatchedTask$DefaultImpls.run(Dispatched.kt:235)
at kotlinx.coroutines.DispatchedContinuation.run(Dispatched.kt:81)
at kotlinx.coroutines.scheduling.Task.run(Tasks.kt:94)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:732)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
After looking at the neo4j-streams source code on GitHub, we assume this error comes from listTopics(), describeCluster() and the other admin client operations that the plugin runs against the Kafka cluster.
Can you confirm that this understanding is correct?
Also, is there a workaround for publishing messages to a secured Kafka cluster with an account that has only publish / consume privileges (no admin privileges)?
Please advise.
05-04-2021 04:39 AM
I am having the same issue. I am using Neo4j Desktop 1.4.5 with a 4.2.5 database, version 4.0.7 of the plugin, and Confluent Kafka, all running locally on a Windows 10 laptop.
I have confirmed that I can publish to a topic from the Kafka console producer. I am trying to learn the ins and outs of publishing and ingesting data between Neo4j and Kafka. I would like to use the CDC processor to publish only creates, updates, and deletes.