06-25-2022 09:14 AM
Hey guys
We are trying to use the Spark Structured Streaming API, but we always get the following error:
java.lang.UnsupportedOperationException: Data source org.neo4j.spark.DataSource does not support streamed writing at org.apache.spark.sql.errors.QueryExecutionErrors$.streamedOperatorUnsupportedByDataSourceError(QueryExecutionErrors.scala:430)
This seems strange, as Structured Streaming should be supported by connector version 4.1. For testing purposes, we used the following setup in an interactive Spark Scala shell:
spark-shell --packages org.mongodb.spark:mongo-spark-connector:10.0.2,org.neo4j:neo4j-connector-apache-spark_2.12:4.1.2_for_spark_3
That should be the correct version of the Spark connector, right? Then we create a DataFrame with readStream from a MongoDB instance:
val dfTxn = spark.readStream.format("mongodb").option("spark.mongodb.connection.uri", "mongodb://<ip>:<port>").option("spark.mongodb.database", "test").option("spark.mongodb.collection", "txn").option("spark.mongodb.read.readPreference.name", "primaryPreferred").option("spark.mongodb.change.stream.publish.full.document.only", "true").option("forceDeleteTempCheckpointLocation", "true").load()
Afterwards, we are trying to write it to Neo4j, which is the step where the error described above happens.
val query = dfPaymentTx.writeStream.format("org.neo4j.spark.DataSource").option("url", "bolt://<ip>:<port>").option("save.mode", "Append").option("checkpointLocation", "/tmp/checkpoint/myCheckPoint").option("labels", "Account").option("node.keys", "txn_snd").start()
We really do not understand why we get the "Data source org.neo4j.spark.DataSource does not support streamed writing" error. We are running on an Ubuntu machine and installed Neo4j from here. Is there a problem with using Scala, or with something else?
Any help is appreciated
06-28-2022 07:22 AM
Can you please post the result of the following code?
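import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import scala.collection.JavaConverters._
// Neo4jTable ships with the Neo4j connector; import it from the connector's package as well.
val list = new Neo4jTable(StructType(Array(StructField("foo", DataTypes.StringType))),
  Map("url" -> "bolt://neo4j:7687", "labels" -> "foo").asJava, "").capabilities()
println(list)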
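For context on what to look for in that output, here is a minimal sketch, assuming Spark 3's DataSource V2 API. As far as I understand, Spark only takes the V2 streaming sink path when the table's capabilities include TableCapability.STREAMING_WRITE; otherwise it ends up at the "does not support streamed writing" error. The helper supportsStreamingWrite and the two sample sets are hypothetical and only illustrate the check. They do not reflect what any particular connector version actually reports.

```scala
import org.apache.spark.sql.connector.catalog.TableCapability
import scala.collection.JavaConverters._

// Hypothetical helper: true if a capabilities set (as returned by
// Table.capabilities()) advertises streamed writing.
def supportsStreamingWrite(caps: java.util.Set[TableCapability]): Boolean =
  caps.contains(TableCapability.STREAMING_WRITE)

// Made-up sample sets, not the output of a real connector.
val batchOnly: java.util.Set[TableCapability] =
  Set(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE).asJava
val withStreaming: java.util.Set[TableCapability] =
  Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava

println(supportsStreamingWrite(batchOnly))     // false
println(supportsStreamingWrite(withStreaming)) // true
```

If the set printed by the snippet above lacks STREAMING_WRITE, that would suggest the connector classes actually loaded in the shell do not offer streamed writing, whatever the requested package version says.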