Using Spark Structured Streaming API

Hey guys

We are trying to use the Spark Structured Streaming API, but we always get the following error:

java.lang.UnsupportedOperationException: Data source org.neo4j.spark.DataSource does not support streamed writing at org.apache.spark.sql.errors.QueryExecutionErrors$.streamedOperatorUnsupportedByDataSourceError(QueryExecutionErrors.scala:430)

This seems strange, as Structured Streaming should be supported by connector version 4.1. For testing purposes, we used the following setup in an interactive Spark Scala shell.

spark-shell --packages org.mongodb.spark:mongo-spark-connector:10.0.2,org.neo4j:neo4j-connector-apache-spark_2.12:4.1.2_for_spark_3

That should be the correct version of the Spark connector, right? Then we create a DataFrame with readStream from a MongoDB instance.

val dfTxn = spark.readStream.format("mongodb").option("spark.mongodb.connection.uri", "mongodb://<ip>:<port>").option("spark.mongodb.database", "test").option("spark.mongodb.collection", "txn").option("spark.mongodb.read.readPreference.name", "primaryPreferred").option("spark.mongodb.change.stream.publish.full.document.only", "true").option("forceDeleteTempCheckpointLocation", "true").load()

Afterwards, we try to write it to Neo4j, and this is the step where the error above occurs.

val query = dfPaymentTx.writeStream.format("org.neo4j.spark.DataSource").option("url", "bolt://<ip>:<port>").option("save.mode", "Append").option("checkpointLocation", "/tmp/checkpoint/myCheckPoint").option("labels", "Account").option("node.keys", "txn_snd").start()

We really do not understand why we get the "Data source does not support streamed writing" error. We are running on an Ubuntu machine and installed Neo4j from here. Is the problem related to using Scala, or to something else?

Any help is appreciated

1 REPLY

Can you please post the result of the following code?

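import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import scala.collection.JavaConverters._
// Print the capabilities the connector's table reports (Neo4jTable must also be imported from the connector jar)
val list = new Neo4jTable(StructType(Array(StructField("foo", DataTypes.StringType))),
  Map("url" -> "bolt://neo4j:7687", "labels" -> "foo").asJava, "").capabilities()
println(list)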
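
In case it helps when reading that output: as far as I understand, Spark only accepts writeStream on a DataSource V2 table whose capabilities() include TableCapability.STREAMING_WRITE, so that is the value to look for in the printed set. Below is a minimal sketch of that check. The helper name supportsStreamingWrite and the two sample sets are made up for illustration and are not part of the connector; only the Spark TableCapability enum is real.

```scala
import java.util.{EnumSet, Set => JSet}
import org.apache.spark.sql.connector.catalog.TableCapability

// Hypothetical helper (not part of the connector): returns true if the
// given capability set contains the one Spark checks for streamed writes.
def supportsStreamingWrite(caps: JSet[TableCapability]): Boolean =
  caps.contains(TableCapability.STREAMING_WRITE)

// Stand-in sets for illustration; in the shell you would pass `list` instead.
val batchOnly = EnumSet.of(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE)
val withStreaming = EnumSet.of(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE)

println(supportsStreamingWrite(batchOnly))     // false
println(supportsStreamingWrite(withStreaming)) // true
```

If supportsStreamingWrite(list) comes back false for the table built above, the connector jar actually loaded in the shell probably does not expose streaming writes, which would be consistent with the error in the question.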