Head's Up! These forums are read-only. All users and content have migrated. Please join us at community.neo4j.com.
09-14-2019 03:01 AM
Hi,
I have used docker compose to set up an instance of Neo4j with the streams plug in and the apoc procedures. Code is below:
neo4j-sink:
image: neo4j:3.5
hostname: neo4j-sink
container_name: neo4j-sink
# depends_on:
# - neo4j-source
ports:
- "7474:7474"
- "7687:7687"
volumes:
- ./neo4j/plugins:/plugins
environment:
NEO4J_kafka_zookeeper_connect: 0.0.0.0:2181
NEO4J_kafka_bootstrap_servers: 0.0.0.0:9092
# NEO4J_AUTH: neo4j/sink
NEO4J_dbms_memory_heap_max_size: 2G
NEO4J_kafka_max_poll_records: 10000
NEO4J_dbms_logs_debug_level: DEBUG
NEO4J_streams_sink_enabled: "true"
NEO4J_kafka_group_id: "neo4j_sink_1"
NEO4J_enable_auto_commit: "true"
NEO4J_streams_sink_topic_cypher_ARTICLECOMMIT: "WITH event.value as payload MERGE (a:Article {id: payload.ID}) ON CREATE set a.descr = payload.og_description"
This has led to the sink initialising and listening to the topic:
neo4j-sink | 2019-09-14 09:45:54.962+0000 INFO Starting the Kafka Sink
neo4j-sink | 2019-09-14 09:45:55.473+0000 INFO Creating Sink daemon Job
neo4j-sink | 2019-09-14 09:45:55.478+0000 DEBUG Subscribed topics with Cypher queries: {ARTICLECOMMIT=WITH event.value as payload MERGE (a:Article {id: payload.ID}) ON CREATE set a.descr = payload.og_description}
neo4j-sink | 2019-09-14 09:45:55.479+0000 DEBUG Subscribed topics with CDC configuration: {CDC_SOURCE_ID=[], CDC_SCHEMA=[]}
neo4j-sink | 2019-09-14 09:45:55.480+0000 INFO Kafka Sink started
neo4j-sink | 2019-09-14 09:45:55.480+0000 INFO Streams Sink module initialised
however from the command line producer OR when I send a message through my stream, it does not register in neo4J. I can confirm that the topic is recieving messages from my streams; I checked through the Kafka-console-consumer and listening in the stream. Please advise if there is anything I am doing wrong.
i tried this:
$ kafka-console-producer --broker-list 0.0.0.0:9092 --topic ARTICLECOMMIT
>{"ID":12345,"og_descrpition":"12312312"}
still did not work
09-14-2019 04:12 AM
Hi,
there is an error into the documentation please try with this:
NEO4J_streams_sink_topic_cypher_ARTICLECOMMIT: "MERGE (a:Article {id: event.ID}) ON CREATE set a.descr = event.og_description"
09-14-2019 04:20 AM
Hi Conker84, unfortunately it is still not working!
09-14-2019 10:00 AM
Can you share the full compose file?
09-15-2019 10:24 AM
hello Conker84
version: '3'
services:
neo4j-sink:
image: neo4j:3.5
hostname: neo4j-sink
container_name: neo4j-sink
ports:
- "7474:7474"
- "7687:7687"
volumes:
- ./neo4j/plugins:/plugins
environment:
NEO4J_kafka_zookeeper_connect: 0.0.0.0:2181
NEO4J_kafka_bootstrap_servers: 0.0.0.0:9092
NEO4J_dbms_memory_heap_max_size: 2G
NEO4J_kafka_max_poll_records: 10000
NEO4J_dbms_logs_debug_level: DEBUG
NEO4J_streams_sink_enabled: "true"
NEO4J_kafka_group_id: "neo4j_sink_1"
NEO4J_enable_auto_commit: "true"
NEO4J_streams_sink_topic_cypher_ARTICLECOMMIT: "MERGE (a:Article {id: event.ID}) ON CREATE set a.descr = event.og_description"
I can confirm that zookeeper is active at that port and bootstrap server is at 9092
and there are messages being published to that particular topic through a java producer
All the sessions of the conference are now available online