04-20-2020 01:46 PM
Is there any way I can replicate data from PostgreSQL to Neo4j, just like other heterogeneous replication such as Oracle -> SQL Server or Oracle -> PostgreSQL? It doesn't need to be real time; some latency is OK.
Thank you in advance.
sojan
04-20-2020 08:50 PM
A popular solution is to use Kafka as a message bus. Publish a message to Kafka and you can configure Neo4j as a subscriber (sink) to that topic to save the data in the graph. Neo4j can also publish to Kafka if you need to get data back out.
Replicating data to and from Neo4j is pretty much like any other database replication. There usually is some sort of message bus application that receives the change data capture logs and then executes the "insert" or "cypher" command against the target.
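To make the "executes the cypher command against the target" step a bit more concrete, here is a minimal sketch of how one replicated row could be turned into a parameterized Cypher statement. The function name `change_event_to_cypher` and the row layout are assumptions made for illustration; this is not the Neo4j Kafka connector's actual code, only the general idea.

```python
from typing import Any, Dict, Tuple


def change_event_to_cypher(
    label: str, key: str, row: Dict[str, Any]
) -> Tuple[str, Dict[str, Any]]:
    """Build a parameterized Cypher MERGE for one replicated row.

    `label` is the node label, `key` is the column used to match an
    existing node, and `row` holds the column values from the source table.
    (All hypothetical names, chosen for this sketch.)
    """
    if key not in row:
        raise ValueError(f"row is missing key column {key!r}")
    # Labels and property keys cannot be passed as Cypher parameters,
    # so they are interpolated; restrict them to simple identifiers.
    if not label.isidentifier() or not key.isidentifier():
        raise ValueError("label and key must be simple identifiers")

    # MERGE on the key only, then copy the remaining columns as properties.
    query = f"MERGE (n:{label} {{{key}: $key_value}}) SET n += $props"
    props = {k: v for k, v in row.items() if k != key}
    return query, {"key_value": row[key], "props": props}


if __name__ == "__main__":
    query, params = change_event_to_cypher(
        "Person", "name", {"name": "Alice", "age": 30}
    )
    print(query)
    print(params)
```

Merging on a key column and setting the other properties afterwards keeps the write idempotent, so replaying the same message should not create duplicate nodes.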
04-21-2020 06:16 AM
Thanks Mike for the reply. Could you share any documents or use cases, if you have them, so I can understand this in more depth?
Thanks
sojan
04-24-2020 06:26 AM
Here's a good link to help you get started
There are lots of resources about Kafka in general on the web. The link I shared is about the Neo4j integration.
04-24-2020 06:39 AM
Thank you so much for pointing me in the right direction, @mike.r.black
Thanks
sojan
05-08-2020 06:33 AM
Hi @mike.r.black and other experts,
I have been trying to set up replication from source -> Kafka -> Neo4j, and I was able to complete the first part. Now, when I try to bring up the sink connector for Neo4j, it gives me an error. Could that be because of a version mismatch?
kafka version=kafka_2.12-2.5.0
neo4j-sink-file=neo4j-kafka-connect-neo4j-1.0.7
I can also see from the log that Kafka Connect recognizes the Neo4j plugins:
[2020-05-06 13:55:22,819] INFO Loading plugin from: /home/kafka/neo4j-kafka-connect-neo4j-1.0.7/doc (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-05-06 13:55:22,821] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/kafka/neo4j-kafka-connect-neo4j-1.0.7/doc/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-05-06 13:55:22,822] INFO Loading plugin from: /home/kafka/neo4j-kafka-connect-neo4j-1.0.7/etc (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-05-06 13:55:22,823] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/kafka/neo4j-kafka-connect-neo4j-1.0.7/etc/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-05-06 13:55:22,824] INFO Loading plugin from: /home/kafka/neo4j-kafka-connect-neo4j-1.0.7/assets (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-05-06 13:55:22,825] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/kafka/neo4j-kafka-connect-neo4j-1.0.7/assets/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-05-06 13:55:22,825] INFO Loading plugin from: /home/kafka/neo4j-kafka-connect-neo4j-1.0.7/lib (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-05-06 13:55:23,473] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/kafka/neo4j-kafka-connect-neo4j-1.0.7/lib/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
Now when I run the curl command, it fails:
curl -X POST http://localhost:8083/connectors -H 'Content-Type:application/json' -H 'Accept:application/json' -d @/home/kafka/neo4j-kafka-connect-neo4j-1.0.7/etc/sink-neo4j.json
{
  "servlet": "org.glassfish.jersey.servlet.ServletContainer-782be4eb",
  "message": "Request failed.",
  "url": "/connectors",
  "status": "500"
}
Here are the contents of sink-neo4j.json:
{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "person",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "test-error-topic",
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "aaaa",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.person": "MERGE (p:Person{name: event.name, age: event.age})"
  }
}
Any idea what is causing the error? I appreciate your time in advance.
Thanks
Sojan
05-08-2020 10:14 AM
Here is the detailed error from connect.log:
[2020-05-08 10:55:23,887] ERROR Uncaught exception in REST call to /connectors (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
java.lang.NullPointerException
at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:145)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:176)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:253)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:392)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:365)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:318)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:760)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:547)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1607)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1297)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:485)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1577)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1212)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:500)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:547)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:270)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
at java.base/java.lang.Thread.run(Thread.java:834)
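The trace shows the NullPointerException being thrown inside `ConnectorsResource.createConnector`, the handler for the POST to /connectors, so one thing that may be worth ruling out is the request body itself (for example, an empty or unparsable file reaching the server). This is a guess and not a confirmed diagnosis. The sketch below is a small local check of a connector payload before it is posted; `check_connector_payload` is a hypothetical helper written for this example and not part of Kafka Connect or the Neo4j plugin.

```python
import json
from typing import List


def check_connector_payload(raw: str) -> List[str]:
    """Return a list of problems found in a POST /connectors request body.

    An empty list means the basic structure looks fine; it does not
    guarantee that the connector itself will start.
    """
    if not raw.strip():
        return ["request body is empty"]
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    if not isinstance(payload, dict):
        return ["top-level JSON value must be an object"]

    problems: List[str] = []
    name = payload.get("name")
    if not isinstance(name, str) or not name.strip():
        problems.append('missing or empty "name"')

    config = payload.get("config")
    if not isinstance(config, dict):
        problems.append('missing "config" object')
        return problems
    if "connector.class" not in config:
        problems.append('config is missing "connector.class"')
    # Sink connectors need to be told which topics to read from.
    if "topics" not in config and "topics.regex" not in config:
        problems.append('sink config needs "topics" or "topics.regex"')
    return problems


if __name__ == "__main__":
    import sys

    # Usage: python check_payload.py path/to/sink-neo4j.json
    with open(sys.argv[1], encoding="utf-8") as handle:
        issues = check_connector_payload(handle.read())
    print("\n".join(issues) if issues else "payload structure looks OK")
```

If the check passes against the same file path that was given to curl, the payload structure is probably not the problem and the version question becomes the more likely lead.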