03-05-2020 03:02 AM
We are using Neo4j version 3.5.4.
We have nearly 200 nodes and relationships. We want to capture all node and relationship data for all CRUD operations through Kafka.
In neo4j.conf, how do I need to configure the topics for all nodes and relationships?
streams.source.topic.nodes.=
streams.source.topic.relationships.=
Can you please provide your inputs?
Thanks
02-08-2021 10:09 AM
Hi @hunter ,
just a little premise: if you are still using Neo4j 3.5, I recommend using the latest available Neo4j Streams plugin version for 3.5 (that is 3.5.12: Release Neo4j Streams 3.5.12 · neo4j-contrib/neo4j-streams · GitHub).
Another point worth noting is that with the Neo4j Streams CDC module you can only capture the create/update/delete operations (not the read operations).
That said, if you want to capture all the created/updated/deleted nodes and relationships, you simply have to enable the CDC module:
streams.source.enabled=true
Doing so, the plugin will create a default topic named "neo4j" which will contain all the create/update/delete events.
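If, later on, you want to route only specific labels or relationship types to dedicated topics, the plugin also supports pattern-based routing via streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN> and streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>. A minimal sketch (the topic names, labels and properties below are just examples):
streams.source.enabled=true
# send Person nodes, limited to the name and surname properties, to the "people" topic
streams.source.topic.nodes.people=Person{name,surname}
# send KNOWS relationships, with all their properties, to the "knows" topic
streams.source.topic.relationships.knows=KNOWS{*}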
Hope this helps.
Mauro
06-10-2021 08:08 PM
Hi @mroiter-larus ,
I have used Neo4j Streams 3.5.12, but I got the error 'Timed out waiting for a node assignment.'
kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
streams.source.enabled=true
streams.source.topic.nodes.person=Person{name}
streams.sink.enabled=true
streams.sink.topic.cypher.person=CYPHER-QUERY
11-04-2021 02:59 AM
I always receive the same error message when starting the Neo4j Kafka connector example. Does anybody know a fix for this?
01-26-2022 09:10 AM
Hi @klug,
Which example are you referring to? Could you please share the connector JSON configuration?
Which version of the connector are you using?
Mauro
01-26-2022 10:11 PM
Hi! I was using the configuration from the documentation:
---
version: "2"
services:
  neo4j:
    image: neo4j:4.1.9
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_kafka_bootstrap_servers: PLAINTEXT://broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      # workaround: if we change to a custom name the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"
  schema_registry:
    image: confluentinc/cp-schema-registry:latest
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: zookeeper:2181
  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:9093
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: schema_registry:8081
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: schema_registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
    command:
      # - bash
      # - -c
      # - |
      #   confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.6
      /etc/confluent/docker/run
  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema_registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: broker:9093
      CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONTROL_CENTER_CONNECT_CLUSTER: connect:8083
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
01-27-2022 02:12 AM
Hi @klug,
it seems there are some inconsistencies in the documentation. I'm going to fix them ASAP.
In the meantime, below you can find a working docker-compose file, a sink configuration, and a step-by-step guide to quickstart with the Neo4j Kafka Connector:
1. Download the docker-compose.yml and the sink_config.json files and place them into your desired folder.
2. Inside this folder, also create a plugins folder.
3. Download the Neo4j Kafka Connect plugin and put it into the plugins folder created in the previous step.
4. Run docker-compose up -d.
5. Install the connector: curl -X POST http://localhost:8083/connectors -H 'Content-Type:application/json' -H 'Accept:application/json' -d @sink_config.json. If the installation process completes successfully, you should see the json file's content as output. However, you can also verify the connect container logs.
6. Connect to the broker container with docker exec -it broker /bin/bash and create a kafka-console-producer on the topic my-topic: kafka-console-producer --bootstrap-server broker:9093 --topic my-topic
7. Send the following event: {"name": "Foo", "surname": "Bar"}
Here is the docker-compose.yml:
---
version: "2"
services:
  neo4j:
    image: neo4j:4.1.9
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_kafka_bootstrap_servers: PLAINTEXT://broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      # workaround: if we change to a custom name the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"
  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:9093
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: broker:9093
      CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONTROL_CENTER_CONNECT_CLUSTER: connect:8083
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
and the sink_config.json:
{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
  }
}
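After producing the event from step 7, you can verify, for example in the Neo4j Browser at http://localhost:7474 (credentials neo4j/connect as set in the compose file), that the sink query above actually created the graph:
// should return one Person connected to its Family node
MATCH (p:Person)-[:BELONGS_TO]->(f:Family)
RETURN p.name AS name, p.surname AS surname, f.name AS family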
Hope this helps.
Mauro
02-11-2022 01:20 AM
Hi @mroiter-larus,
the docker-compose works fine, thank you so much for this!
I am currently trying to get steps 6 and 7 to run and I need to research the commands for that.
Best regards
Fabi
02-11-2022 07:43 AM
Hi @klug ,
about step 6, I've already provided the command. You just need to connect to the broker container and run it. It will open a prompt where you just need to copy and paste the event provided in step 7, then press enter to send it to the topic.
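For reference, the full sequence for steps 6 and 7 looks like this (the container and topic names come from the docker-compose file and sink configuration above):
# open a shell inside the broker container
docker exec -it broker /bin/bash
# inside the container, start a console producer on the topic
kafka-console-producer --bootstrap-server broker:9093 --topic my-topic
# paste the event at the producer prompt and press enter
{"name": "Foo", "surname": "Bar"}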
Hope this helps.
Mauro
02-23-2022 03:12 AM
Hi @mroiter-larus.
Okay, I will check if it works in my case. The last time I tried I got error messages when I ran "docker exec -it broker /bin/bash", but I solved them. In the next step, is the command kafka-console-producer --bootstrap-server broker:9093 --topic my-topic?
Do you also have a source configuration for neo4j?
Best regards
Fabi
02-23-2022 09:20 AM
Hi @klug,
the command to create a console producer is just:
kafka-console-producer --bootstrap-server broker:9093 --topic my-topic
About the source configuration, you can take the following as an example:
{
  "name": "Neo4jSourceConnector",
  "config": {
    "topic": "topic-source",
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.database": "testdb",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.encryption.enabled": false,
    "neo4j.streaming.poll.interval.msecs": 5000,
    "neo4j.streaming.property": "timestamp",
    "neo4j.streaming.from": "NOW",
    "neo4j.enforce.schema": false,
    "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
  }
}
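To see this source connector emit something, you can create a node matching the neo4j.source.query above, e.g. via the Neo4j Browser (note that in this example the connector reads from the testdb database):
// timestamp() returns the current time in milliseconds, so the new node passes the $lastCheck filter on the next poll
CREATE (:TestSource {name: 'example', timestamp: timestamp()})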
Please refer to the official documentation for further details on the source connector.
Regards,
Mauro
03-08-2022 04:22 AM
Hi @mroiter-larus,
Thank you for your help. I was able to send the event to Kafka over the console, which is a good step for me.
Now I need to automate/trigger events from the database to Kafka. I will take a closer look at the documentation.
Thanks again!
Best regards
Fabi
01-26-2022 09:06 AM
Hi @tinleo,
It seems that Neo4j can't connect to Kafka. Are you sure the Zookeeper URL and the bootstrap server URL are correct? Are you sure Zookeeper and Kafka are up and running correctly?
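As a quick sanity check (assuming the Kafka CLI tools are available on that host), you can verify that both ports actually answer before restarting Neo4j:
# check that the ports are reachable at all
nc -zv localhost 2181
nc -zv localhost 9092
# should list the existing topics; if this also times out, the problem is on the Kafka side
kafka-topics --bootstrap-server localhost:9092 --list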
Mauro