cancel
Showing results for 
Search instead for 
Did you mean: 

Head's Up! These forums are read-only. All users and content have migrated. Please join us at community.neo4j.com.

Cluster nodes throwing java.lang.NullPointerException when importing large number of rows

I am trying to run a query to insert record(around 1.15bn rows) s from a dataFrame.

This is my logic, using this approach from an earlier issue here.

def splitDf(df: DataFrame, n:Int): Array[Dataset[Row]] = {
  val weights = Array.fill(n)(1.0)
  df.randomSplit(weights = weights)
}

def writeCypherWithDataFrame(sc: SparkContext,
                             dataFrame: DataFrame,
                             query : String,
                             partitions: Int = 100): Unit =
{
  val config = Neo4jConfig(sc.getConf)
  val columns = dataFrame.columns.toSeq
  dataFrame.repartition(partitions).foreachPartition(rows => {
    val params: AnyRef = rows.map(r =>
      Map
      (
        "properties" -> columns.map( c => (c, r.getAs[AnyRef](c))).toMap.asJava
      ).asJava).asJava
    Neo4jDataFrame.execute(config, query, Map("rows" -> params).asJava, write = true)
  })
}

val createNodeClusters = f""" 
CALL apoc.periodic.iterate(
 "UNWIND {rows} as row
  RETURN row.properties as r", "${queryString}"
  , {batchSize:10000,iterateList:true, retries:2, params: {rows: {rows}}})
"""

// Running Cypher query using neo4j Driver

val parallel = 80
val dfCount = 115551421
val MAX_RECORDS_PER_NEO4J_CALL = 60000
val MAX_RECORDS_PER_DATASET = parallel * MAX_RECORDS_PER_NEO4J_CALL
val NUM_OF_DATASETS = 1 + ( dfCount / MAX_RECORDS_PER_DATASET).toInt

var splitDfs = splitDf(factDf, NUM_OF_DATASETS )

splitDfs.foreach {
      dataSet => writeCypherWithDataFrame(sc, dataSet, createNodeClusters, parallel)
}

Spark Logs:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 626.0 failed 4 times, most recent failure: Lost task 22.3 in stage 626.0 (TID 30628, 10.139.64.7, executor 43): org.neo4j.driver.v1.exceptions.ServiceUnavailableException: Could not perform discovery. No routing servers available.

Logs from the cluster node :
WARN Error during iterate.commit
WARN 1 times: java.lang.NullPointerException

Using this approach, jobs are passing initially, but the logs show nodes going down with this error, but the LEADER keeps changing, so it works for some time and eventually terminates after many stages are cancelled due to the above error.

Stuck with this behaviour for long. Not sure how to take it from here.
All my connection pool settings are default.

0 REPLIES 0