cancel
Showing results for 
Search instead for 
Did you mean: 

Head's Up! These forums are read-only. All users and content have migrated. Please join us at community.neo4j.com.

NEO4J Sink and Source connector

Does anyone have a full tutorial on neo4j - kafka sink and source stream. I'm able to load the sink and source connector but I'm not sure how to stream the data kafka-topic or read from kafka-topic to neo4j. I'm using a docker for this task.

In this line, I have a topic test_topic in kafka, with id and properties. Can I just use a simple docker exec broker kafka-console-producer --bootstrap-server broker:9093 --topic test_topic to stream the data to the topic and consume by the neo4j? What kind of arrangement of data I need to produce this to the kafka topic?
NEO4J_streams_sink_topic_cypher_test_topic: "
# MERGE (n:Person{id: event.id}) ON CREATE SET n += event.properties"


version: '2'
services:
  neo4j:
    image: neo4j:3.4-enterprise
    hostname: neo4j
    container_name: neo4j
    ports:
    - "7474:7474"
    - "7687:7687"
    volumes:
      - ./neo4j/plugins:/plugins
      - ./neo4j/data:/data
    environment:
      NEO4J_kafka_zookeeper_connect: zookeeper:2181
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      NEO4J_ACCEPT_LICENSE_AGREEMENT: 'yes'
      NEO4J_dbms_logs_debug_level: DEBUG
      #NEO4J_streams_procedures_enabled: "true"
      #NEO4J_streams_source_enabled: "false"
     # NEO4J_streams_sink_enabled: "true"
      #NEO4J_streams_sink_topic_cypher_test_topic: "
       # MERGE (n:Person{id: event.id}) ON CREATE SET n += event.properties"

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
    - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema_registry:
    image: confluentinc/cp-schema-registry:5.5.1
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
    - zookeeper
    - broker
    ports:
    - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
    - zookeeper
    - broker
    - schema_registry
    ports:
    - "8083:8083"
    volumes:
    - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins,/usr/share/confluent-hub-components
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
    command: 
      - bash 
      - -c 
      - |
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:latest
        /etc/confluent/docker/run

  control-center:
    image: confluentinc/cp-enterprise-control-center:5.5.1
    hostname: control-center
    container_name: control-center
    depends_on:
    - zookeeper
    - broker
    - schema_registry
    - connect
    ports:
    - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

0 REPLIES 0