Unable to write relationships using neo4j spark connector with spark 3.1.1

I am following the documentation to write relationships to a Neo4j database. I am able to write nodes, but for relationships I get the following stack trace:

21/04/13 13:59:23 WARN SchemaService: Switching to query schema resolution
21/04/13 13:59:23 WARN SchemaService: For the following exception
org.neo4j.driver.exceptions.ClientException: Unable to convert scala.collection.immutable.Map$Map1 to Neo4j Value.
	at org.neo4j.driver.Values.value(Values.java:134)
	at org.neo4j.driver.internal.util.Extract.mapOfValues(Extract.java:203)
	at org.neo4j.driver.internal.AbstractQueryRunner.parameters(AbstractQueryRunner.java:69)
	at org.neo4j.driver.internal.AbstractQueryRunner.run(AbstractQueryRunner.java:43)
	at org.neo4j.spark.service.SchemaService.retrieveSchemaFromApoc(SchemaService.scala:68)
	at org.neo4j.spark.service.SchemaService.liftedTree2$1(SchemaService.scala:171)
	at org.neo4j.spark.service.SchemaService.structForRelationship(SchemaService.scala:155)
	at org.neo4j.spark.service.SchemaService.struct(SchemaService.scala:262)
	at org.neo4j.spark.DataSource.$anonfun$inferSchema$1(DataSource.scala:41)
	at org.neo4j.spark.DataSource.callSchemaService(DataSource.scala:29)
	at org.neo4j.spark.DataSource.inferSchema(DataSource.scala:41)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
	at org.apache.spark.sql.DataFrameWriter.getTable$1(DataFrameWriter.scala:339)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:401)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at AdeptxPipeline$.writeRel(AdeptxPipeline.scala:146)
	at AdeptxPipeline$.delayedEndpoint$AdeptxPipeline$1(AdeptxPipeline.scala:94)
	at AdeptxPipeline$delayedInit$body.apply(AdeptxPipeline.scala:5)
	at scala.Function0.apply$mcV$sp(Function0.scala:39)
	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
	at scala.App.$anonfun$main$1$adapted(App.scala:80)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.App.main(App.scala:80)
	at scala.App.main$(App.scala:78)
	at AdeptxPipeline$.main(AdeptxPipeline.scala:5)
	at AdeptxPipeline.main(AdeptxPipeline.scala)
21/04/13 13:59:23 INFO Driver: Closing driver instance 1795925655
21/04/13 13:59:23 INFO ConnectionPool: Closing connection pool towards nlp.raiders-dev.awscloud.abbvienet.com:8092
Exception in thread "main" org.apache.spark.sql.AnalysisException: TableProvider implementation org.neo4j.spark.DataSource cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:402)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at AdeptxPipeline$.writeRel(AdeptxPipeline.scala:146)
	at AdeptxPipeline$.delayedEndpoint$AdeptxPipeline$1(AdeptxPipeline.scala:94)
	at AdeptxPipeline$delayedInit$body.apply(AdeptxPipeline.scala:5)
	at scala.Function0.apply$mcV$sp(Function0.scala:39)
	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
	at scala.App.$anonfun$main$1$adapted(App.scala:80)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.App.main(App.scala:80)
	at scala.App.main$(App.scala:78)
	at AdeptxPipeline$.main(AdeptxPipeline.scala:5)
	at AdeptxPipeline.main(AdeptxPipeline.scala)

Here is the dataframe schema I am trying to write:

root
 |-- NCT_ID: string (nullable = true)
 |-- ID: string (nullable = true)

and my code:


  finalSiteStudyDF.write
    .format("org.neo4j.spark.DataSource")
    .option("url", NEO4JURL)
    .option("authentication.type", "basic")
    .option("batch.size", 2500)
    .option("transaction.retries", "10")
    .option("relationship.save.strategy", "keys")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "zeppelin")
    .option("relationship", "HAS_SITE")
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":Study")
    .option("relationship.source.node.keys", "NCT_ID")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Site")
    .option("relationship.target.node.keys", "ID")
    .save()

I have also tried using "Overwrite" as the save.mode with the same result.

The expected behavior is of course for this to write my relationship edges.

Scala version = 2.12.10
Spark version = 3.1.1
The Neo4j connector JAR is included in the Spark jars and declared in the SBT file as:
"neo4j-contrib" %% "neo4j-connector-apache-spark" % "4.0.1_for_spark_3"

Kind of stumped here, what could be the issue?

Cross-posted as a GitHub issue: "Unable to write relationship", Issue #325, neo4j-contrib/neo4j-spark-connector.

1 Reply

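Try mapping each DataFrame column to the name of the matching property on the node, in the form column:property:

    .option("relationship.source.node.keys", "NCT_ID:NAME_OF_THE_PROPERTY_ON_THE_NODE")
    .option("relationship.target.node.keys", "ID:SAME_HERE")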
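
For reference, here is a minimal sketch of the whole write with that key mapping applied. The node property names `nct_id` and `id`, the `WriteHasSite` object, and the `keyMapping` helper are hypothetical, so substitute whatever your :Study and :Site nodes actually use. The sketch also sets an explicit save mode on the writer, because the AnalysisException in the question says the connector accepts only Append or Overwrite, and Spark falls back to ErrorIfExists when no mode is given. It has not been run against your database, so treat it as a starting point rather than a confirmed fix.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object WriteHasSite {
  // Hypothetical node property names (assumption, not from the question):
  // replace with the properties actually stored on :Study and :Site.
  val studyKeyProperty = "nct_id"
  val siteKeyProperty = "id"

  // Builds a "dataframeColumn:nodeProperty" pair for the *.node.keys options.
  def keyMapping(column: String, property: String): String =
    s"$column:$property"

  def writeRel(df: DataFrame, url: String, user: String, password: String): Unit =
    df.write
      .format("org.neo4j.spark.DataSource")
      // Explicit mode: the default ErrorIfExists is rejected by the connector.
      .mode(SaveMode.Append)
      .option("url", url)
      .option("authentication.type", "basic")
      .option("authentication.basic.username", user)
      .option("authentication.basic.password", password)
      .option("batch.size", "2500")
      .option("transaction.retries", "10")
      .option("relationship", "HAS_SITE")
      .option("relationship.save.strategy", "keys")
      // Match existing nodes instead of creating new ones.
      .option("relationship.source.save.mode", "Match")
      .option("relationship.source.labels", ":Study")
      .option("relationship.source.node.keys", keyMapping("NCT_ID", studyKeyProperty))
      .option("relationship.target.save.mode", "Match")
      .option("relationship.target.labels", ":Site")
      .option("relationship.target.node.keys", keyMapping("ID", siteKeyProperty))
      .save()
}
```

You would call it as `WriteHasSite.writeRel(finalSiteStudyDF, NEO4JURL, "neo4j", password)`. Note that `.mode(...)` on the DataFrameWriter is separate from the `relationship.source.save.mode` and `relationship.target.save.mode` options, which only control how the endpoint nodes are handled.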