11-15-2019 06:12 AM
Hi guys,
I'm running a docker container with multiple Neo4j instances and Kafka Connect.
In one Neo4j instance I want to ingest CDC events coming from another Neo4j instance with a Neo4j Sink Connector.
I want to use the schema strategy, which merges the nodes/relationships by the constraints (UNIQUENESS, NODE_KEY) defined in the graph model.
But the events are ingested with the sourceId strategy and only get the default label (:SourceEvent); other labels are discarded.
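To illustrate what I mean (a pseudo-Cypher sketch, assuming for example a uniqueness constraint on :Person(name); this is not the exact Cypher the connector generates):
// what I currently get with the sourceId strategy: only the default label plus a sourceId property
MERGE (n:SourceEvent {sourceId: event.payload.id})
SET n += event.payload.after.properties

// what I would expect from the schema strategy: a merge on the constrained key, keeping the labels
MERGE (n:Person {name: event.payload.after.properties.name})
SET n += event.payload.after.properties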
I set up the schema strategy with this REST API call to Kafka Connect
(since I want to ingest CDC events from the kg-cdc topic, I added "neo4j.topic.cdc.schema": "kg-cdc"):
{
  "name": "sink-cdc",
  "config": {
    "topics": "kg-cdc",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "error-kg2app",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neoapp:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "pass",
    "neo4j.encryption.enabled": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "neo4j.topic.cdc.schema": "kg-cdc"
  }
}
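For reference, I register this configuration against the Kafka Connect REST API roughly like this (assuming Connect is reachable on localhost:8083 and the JSON above is saved as sink-cdc.json; both names here are just examples):
# POST the connector config to the Connect worker
curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -d @sink-cdc.json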
In Kafka Connect I then get the following Neo4jSinkConnectorConfig values:
INFO Neo4jSinkConnectorConfig values:
neo4j.authentication.basic.username = neo4j
neo4j.authentication.kerberos.ticket = [hidden]
neo4j.authentication.type = BASIC
neo4j.batch.size = 1000
neo4j.batch.timeout.msecs = 30000
neo4j.connection.acquisition.timeout.msecs = 60000
neo4j.connection.liveness.check.timeout.msecs = 60000
neo4j.connection.max.lifetime.msecs = 3600000
neo4j.connection.max.pool.size = 100
neo4j.encryption.ca.certificate.path =
neo4j.encryption.enabled = false
neo4j.encryption.trust.strategy = TRUST_ALL_CERTIFICATES
neo4j.load.balance.strategy = LEAST_CONNECTED
neo4j.retry.backoff.msecs = 30000
neo4j.retry.max.attemps = 5
neo4j.server.uri = bolt://neoapp:7687
neo4j.topic.cdc.schema = kg-cdc
neo4j.topic.cdc.sourceId =
neo4j.topic.cdc.sourceId.idName = sourceId
neo4j.topic.cdc.sourceId.labelName = SourceEvent
(streams.kafka.connect.sink.Neo4jSinkConnectorConfig)
On the other hand, when I use the Neo4j Streams plugin for Neo4j directly, the schema strategy works.
Does anybody know what I might have done wrong or has anyone run into the same problem?
Thank you in advance
11-18-2019 09:06 AM
Hi LampicJ15,
here you will find a working docker-compose file which prepares an environment with a Neo4j source instance, a Neo4j sink instance, Zookeeper, a Kafka broker and Kafka Connect:
docker-compose.yml
version: '3'
services:
  neo4j-source:
    image: neo4j:3.5
    hostname: neo4j-source
    container_name: neo4j-source
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8474:7474"
      - "8687:7687"
    volumes:
      - ./neo4j/plugins:/plugins
    environment:
      NEO4J_kafka_zookeeper_connect: zookeeper:2181
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/source
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_streams_source_topic_nodes_neo4jnodes: Movie{*};Person{*}
      NEO4J_streams_source_topic_relationships_neo4jrelationships: DIRECTED{*};ACTED_IN{*}
      NEO4J_streams_source_schema_polling_interval: 10000
  neo4j-sink:
    image: neo4j:3.5
    hostname: neo4j-sink
    container_name: neo4j-sink
    depends_on:
      - neo4j-source
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/sink
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_dbms_logs_debug_level: DEBUG
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.2
        /etc/confluent/docker/run
and a JSON file which configures a Kafka sink connector:
contrib.sink.string-json.neo4j.json
{
  "name": "Neo4jSinkConnectorJSON",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "topics": "neo4jnodes,neo4jrelationships",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j-sink:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "sink",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cdc.schema": "neo4jnodes;neo4jrelationships"
  }
}
In this example configuration I've used the Movie dataset.
Let's follow these steps:
1. Start the environment: docker-compose up -d
2. Create the sink connector:
   curl -X POST http://localhost:8083/connectors -H 'Content-Type:application/json' -H 'Accept:application/json' -d @contrib.sink.string-json.neo4j.json
3. Open localhost:8474 (which is the Neo4j source instance) from your browser and log in with the credentials provided in the docker-compose file.
4. Create the constraints:
   CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
   CREATE CONSTRAINT ON (p:Movie) ASSERT p.title IS UNIQUE;
5. Load the dataset by running :play movie-graph and following the second step of the wizard.
6. Open the Neo4j sink instance (localhost:7474); you can see that the nodes and relationships created in the Neo4j source instance have been replicated.
Let me know if this helps.
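As a quick check on the sink instance, a query like the following should show the replicated labels and counts (with the schema strategy you should see :Person and :Movie rather than :SourceEvent):
MATCH (n)
RETURN labels(n) AS labels, count(*) AS nodes
ORDER BY nodes DESC;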
Regards,
Mauro
11-19-2019 02:19 AM
For completeness of information, the thing that makes it work is the property NEO4J_streams_source_schema_polling_interval. In this case its value is 10000 ms, which means that schema changes will be polled every 10 seconds; in other words, it controls how quickly the database picks up new indexes/schema changes.
Please note that if you don't specify a custom value, the default is 300000 milliseconds, which means you have to wait 5 minutes before the Streams plugin polls the DB to retrieve schema changes and store them.
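Concretely, you can set it either in neo4j.conf or through the corresponding environment variable, as in the docker-compose file above:
# neo4j.conf
streams.source.schema.polling.interval=10000

# docker-compose environment variable equivalent
NEO4J_streams_source_schema_polling_interval: 10000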
At the following links you will find some documentation about this property:
11-21-2019 06:17 AM
Hey @mauro.roiter thank you for the quick reply.
I have tried your example with the Movie dataset and the accompanying docker-compose.yml, but it still does not work.
The interesting part is that when I modified your contrib.sink.string-json.neo4j.json to work for my example, the nodes and relationships were replicated, but again with the sourceId strategy instead of the schema strategy.
11-22-2019 12:24 AM
Hi @LampicJ15,
could you please share your CDC configuration? Which versions are you using for Neo4j Streams plugin and Kafka Connect Sink plugin?
11-22-2019 01:01 AM
Hi @mauro.roiter,
these are the Neo4j configuration settings for the source instance
(where I am using the neo4j-streams-3.5.4 plugin):
dbms.security.procedures.unrestricted=algo.*,apoc.*,sc.*
dbms.security.procedures.whitelist=algo.*,apoc.*,hekovnik.*
apoc.trigger.enabled=true
#neo4j streams CDC source config
kafka.zookeeper.connect=zk:2181
kafka.bootstrap.servers=kafka:9092
streams.source.topic.nodes.kg-cdc=Movie{*};Person{*}
streams.source.topic.relationships.kg-cdc=DIRECTED{*};ACTED_IN{*}
streams.source.enabled=true
streams.source.schema.polling.interval=10000
streams.procedures.enabled=true
As for the Kafka Connect sink plugin, I am using kafka-connect-neo4j:1.0.2.
11-23-2019 05:13 AM
Hi @LampicJ15,
there is a bug in the kafka-connect-sink plugin in the ingestion strategy resolution.
I created this issue to track it:
I also worked on a fix; can you please try to download this package:
and replace the connect service provided by Mauro with this one:
connect:
  image: confluentinc/cp-kafka-connect
  hostname: connect
  container_name: connect
  depends_on:
    - zookeeper
    - broker
  ports:
    - "8083:8083"
  volumes:
    - ./plugins:/tmp/connect-plugins
  environment:
    CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
    CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins
    CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
Please note that you must place the unzipped downloaded package into the ./plugins directory (the one mounted in the volumes section above), located in the same directory as the docker-compose.yml file.
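For example (the archive name below is just a placeholder for the package you downloaded):
# run from the directory that contains docker-compose.yml
mkdir -p plugins
unzip <downloaded-package>.zip -d plugins/   # placeholder name for the downloaded zip
docker-compose up -d connect                 # recreate the connect service so it picks up the plugin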
I look forward to your feedback.
Thanks a lot!
Andrea
11-25-2019 08:14 AM
Hey @conker84 thank you for the quick response, but the sink connector with the schema ingestion strategy is still not working properly: it sinks nodes but not relationships.
I found the bug in SchemaIngestionStrategy.kt, in the function prepareRelationshipEvents, when it gets startNodeConstraints and endNodeConstraints: the function getNodeConstraints always returns an empty list. That happens because there are no node constraints defined in the CDC format for relationships, and getNodeConstraints does not handle relationship CDC records.
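To sketch the idea (simplified types, not the actual streams.* classes): the constraints for a relationship's start/end nodes would have to be looked up by node label (e.g. from the node CDC events or a schema cache), because the relationship record itself carries an empty constraints list.
// Constraints for a relationship's start/end node, resolved by label.
data class Constraint(val label: String, val properties: Set<String>)
data class RelationshipNode(val labels: List<String>, val ids: Map<String, Any>)

// constraintsByLabel would be built from the node events, which do carry
// the UNIQUENESS / NODE_KEY constraints.
fun constraintsForNode(node: RelationshipNode,
                       constraintsByLabel: Map<String, List<Constraint>>): List<Constraint> =
    node.labels.flatMap { constraintsByLabel[it] ?: emptyList() }

fun main() {
    val constraintsByLabel = mapOf(
        "Person" to listOf(Constraint("Person", setOf("name"))),
        "Movie" to listOf(Constraint("Movie", setOf("title")))
    )
    val start = RelationshipNode(listOf("Person"), mapOf("name" to "Kevin Pollak"))
    val end = RelationshipNode(listOf("Movie"), mapOf("title" to "A Few Good Men"))
    println(constraintsForNode(start, constraintsByLabel)) // [Constraint(label=Person, properties=[name])]
    println(constraintsForNode(end, constraintsByLabel))   // [Constraint(label=Movie, properties=[title])]
}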
11-27-2019 02:07 AM
@LampicJ15 can you provide a simple dataset with related constraints that is not correctly replicated?
11-27-2019 05:19 AM
A simple example is with the movie dataset:
In the source Neo4j I set these constraints:
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
CREATE CONSTRAINT ON (p:Movie) ASSERT p.title IS UNIQUE;
Then I load the Movie graph dataset with :play movie-graph (second step of the wizard).
After that I create the sink connector, which only sinks the nodes.
So I get all of the 171 nodes but no relationships.
I suspect that the problem is with the getNodeConstraints function:
private fun getNodeConstraints(event: StreamsTransactionEvent,
                               filter: (Constraint) -> Boolean): List<Constraint> =
    event.schema.constraints.filter { filter(it) }
It tries to get node constraints from the relationship payload, but there are no constraints defined in the relationship payload. The CDC record for the relationship in this example is the following:
{"meta": {"timestamp":1574859738518,"username":"neo4j","txId":34,"txEventId":366,"txEventsCount":424,"operation":"created","source":{"hostname":"neo"}}, "payload": {"id":"33", "start":{"id":"23","labels":["Person"],"ids":{"name":"Kevin Pollak"}}, "end":{"id":"15","labels":["Movie"],"ids":{"title":"A Few Good Men"}}, "before":null, "after": {"properties":{"roles":["Lt. Sam Weinberg"]}},"label":"ACTED_IN","type":"relationship"}, "schema":{"properties":{"roles":"String[]"}, "constraints":[]}}
As you can see, the schema.constraints field at the end is empty.
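For comparison, running CALL db.constraints() on the source instance should list the two uniqueness constraints created above, so the information exists in the graph; it just isn't included in the relationship CDC record:
CALL db.constraints();
// expected to return, roughly:
// "CONSTRAINT ON ( person:Person ) ASSERT person.name IS UNIQUE"
// "CONSTRAINT ON ( movie:Movie ) ASSERT movie.title IS UNIQUE"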
11-27-2019 07:05 AM
Oh I see, we already fixed that:
It's in the master branch but we haven't released it yet; I compiled a version that contains the fix for you:
I look forward to your feedback.
11-27-2019 07:06 AM
Wrong link, please use this:
11-29-2019 04:07 AM
@conker84 thank you for the quick response, the source and sink now work properly.