11-14-2019 10:35 AM
I am trying to run a query that inserts records (around 1.15bn rows) from a DataFrame.
This is my logic, based on the approach from an earlier issue here.
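```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.neo4j.spark.{Neo4jConfig, Neo4jDataFrame}
import scala.collection.JavaConverters._

// Split a DataFrame into n datasets of roughly equal size
def splitDf(df: DataFrame, n: Int): Array[Dataset[Row]] = {
  val weights = Array.fill(n)(1.0)
  df.randomSplit(weights = weights)
}

// Run `query` once per partition, passing all rows of the partition as the `rows` parameter
def writeCypherWithDataFrame(sc: SparkContext,
                             dataFrame: DataFrame,
                             query: String,
                             partitions: Int = 100): Unit = {
  val config = Neo4jConfig(sc.getConf)
  val columns = dataFrame.columns.toSeq
  // The explicit Iterator[Row] type avoids the foreachPartition overload ambiguity under Scala 2.12
  dataFrame.repartition(partitions).foreachPartition { rows: Iterator[Row] =>
    // Each row becomes a map {properties: {column -> value}}
    val params: AnyRef = rows.map { r =>
      Map("properties" -> columns.map(c => (c, r.getAs[AnyRef](c))).toMap.asJava).asJava
    }.asJava
    Neo4jDataFrame.execute(config, query, Map("rows" -> params).asJava, write = true)
  }
}
```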
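As written, `foreachPartition` turns the whole partition into a single `rows` parameter, so the size of each Neo4j call depends only on how many rows land in a partition (the sizing constants in the driver code bound this indirectly). A possible alternative, sketched here under assumptions, is to make that bound explicit by grouping the partition's iterator into fixed-size batches with `Iterator.grouped`. The code is plain Scala with no Spark or Neo4j dependency; `toParamBatches` and `maxRowsPerCall` are hypothetical names, and it assumes each `Row` has already been turned into a column-to-value map, as `columns.map(c => (c, r.getAs[AnyRef](c))).toMap` does above.

```scala
import scala.collection.JavaConverters._

object PartitionBatches {
  /** Hypothetical helper: groups rows (column name -> value) into batches of at
    * most maxRowsPerCall rows, each shaped like the "rows" parameter the query
    * expects, i.e. a java.util.List of {properties: {...}} maps. */
  def toParamBatches(rows: Iterator[Map[String, AnyRef]],
                     maxRowsPerCall: Int): Iterator[java.util.List[java.util.Map[String, AnyRef]]] = {
    require(maxRowsPerCall > 0, "maxRowsPerCall must be positive")
    rows.grouped(maxRowsPerCall).map { batch =>
      batch.map { props =>
        // Same {properties: ...} wrapper as in writeCypherWithDataFrame
        Map[String, AnyRef]("properties" -> props.asJava).asJava
      }.asJava
    }
  }

  def main(args: Array[String]): Unit = {
    // 7 rows with at most 3 per call -> batches of 3, 3 and 1 rows
    val rows = (1 to 7).iterator.map(i => Map[String, AnyRef]("id" -> Int.box(i)))
    val sizes = toParamBatches(rows, 3).map(_.size).toList
    println(sizes)
  }
}
```

Inside `foreachPartition`, each batch would then be sent with its own call, e.g. `Neo4jDataFrame.execute(config, query, Map("rows" -> batch).asJava, write = true)`. This trades more round trips for smaller, more predictable transactions; it has not been tested against the connector.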
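```scala
// queryString (defined elsewhere) is the write statement run for each batch; it sees each row as `r`
val createNodeClusters = s"""
  CALL apoc.periodic.iterate(
    "UNWIND {rows} AS row RETURN row.properties AS r",
    "${queryString}",
    {batchSize: 10000, iterateList: true, retries: 2, params: {rows: {rows}}})
"""
```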
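`{rows}` is the older parameter syntax: Neo4j 3.x also accepts `$rows`, and the brace form is no longer supported from Neo4j 4.0. Splicing `queryString` into a double-quoted Cypher literal also breaks as soon as that inner query contains a double quote. One hedged alternative is to pass both statements to `apoc.periodic.iterate` as parameters, since procedure arguments can be parameters. The sketch below only builds the query string and parameter map in plain Scala; `IterateQuery`, `iterateParams` and the parameter names `outer`/`inner` are hypothetical, and the example inner statement is an assumption because the post does not show `queryString`.

```scala
import scala.collection.JavaConverters._

object IterateQuery {
  // Outer statement: one row per element of the $rows parameter
  val outerStatement: String =
    "UNWIND $rows AS row RETURN row.properties AS r"

  // Both statements arrive as parameters, so quotes inside them need no escaping
  val iterateQuery: String =
    """CALL apoc.periodic.iterate($outer, $inner,
      |  {batchSize: 10000, iterateList: true, retries: 2, params: {rows: $rows}})""".stripMargin

  /** Parameter map for iterateQuery; innerStatement plays the role of queryString. */
  def iterateParams(innerStatement: String,
                    rows: java.util.List[java.util.Map[String, AnyRef]]): java.util.Map[String, AnyRef] =
    Map[String, AnyRef](
      "outer" -> outerStatement,
      "inner" -> innerStatement,
      "rows"  -> rows
    ).asJava

  def main(args: Array[String]): Unit = {
    // Hypothetical inner statement, for illustration only
    val inner = "CREATE (n:Example) SET n = r"
    val params = iterateParams(inner, java.util.Collections.emptyList[java.util.Map[String, AnyRef]]())
    println(iterateQuery)
    println(params.keySet)
  }
}
```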
```scala
// Running the Cypher query through the Neo4j driver
val parallel = 80
val dfCount = 115551421
val MAX_RECORDS_PER_NEO4J_CALL = 60000
val MAX_RECORDS_PER_DATASET = parallel * MAX_RECORDS_PER_NEO4J_CALL
val NUM_OF_DATASETS = 1 + (dfCount / MAX_RECORDS_PER_DATASET)
val splitDfs = splitDf(factDf, NUM_OF_DATASETS)
splitDfs.foreach { dataSet =>
  writeCypherWithDataFrame(sc, dataSet, createNodeClusters, parallel)
}
```
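For reference, the sizing constants work out as follows: each dataset is meant to hold at most 80 × 60,000 = 4,800,000 rows, and 115,551,421 / 4,800,000 is 24 in integer division (remainder 351,421), so 1 + 24 = 25 datasets are created. That is about 4,622,057 rows per dataset and about 57,776 rows per partition. Because `randomSplit` normalizes the weights and samples rows into splits, the real sizes only approximate these figures. The sketch recomputes them in plain Scala; `DatasetSizing` and `numDatasets` are hypothetical names, and it swaps in a ceiling division, which gives the same 25 here but avoids creating one dataset more than needed when the count is an exact multiple.

```scala
object DatasetSizing {
  /** Smallest number of datasets such that none needs more than maxPerDataset rows. */
  def numDatasets(total: Long, maxPerDataset: Long): Long = {
    require(total >= 0 && maxPerDataset > 0, "invalid sizes")
    (total + maxPerDataset - 1) / maxPerDataset // ceiling division
  }

  def main(args: Array[String]): Unit = {
    val parallel = 80
    val maxRecordsPerCall = 60000L
    val maxPerDataset = parallel * maxRecordsPerCall // 4,800,000
    val total = 115551421L
    val n = numDatasets(total, maxPerDataset)
    val rowsPerDataset = total.toDouble / n
    val rowsPerPartition = rowsPerDataset / parallel
    println(f"$n datasets, ~$rowsPerDataset%.0f rows each, ~$rowsPerPartition%.0f rows per partition")
  }
}
```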
Spark logs:
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 626.0 failed 4 times, most recent failure: Lost task 22.3 in stage 626.0 (TID 30628, 10.139.64.7, executor 43): org.neo4j.driver.v1.exceptions.ServiceUnavailableException: Could not perform discovery. No routing servers available.
```
Logs from the cluster node:
```
WARN Error during iterate.commit
WARN 1 times: java.lang.NullPointerException
```
With this approach the jobs succeed at first, but the cluster logs show nodes going down with this error and the LEADER keeps changing. So it works for a while, and then the job terminates after many stages are cancelled because of the error above.
I have been stuck on this behaviour for a long time and am not sure how to proceed from here.
All my connection pool settings are at their defaults.
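Since the failures coincide with leader changes, one possible mitigation is to retry a failed batch after a back-off instead of letting the Spark task fail, giving the cluster time to elect and advertise a new leader. The Neo4j Java driver's transaction functions (`session.writeTransaction`) already retry some failures, including `ServiceUnavailableException`, within a configurable retry window; when the error still escapes, as in the Spark log above, an outer retry is one option. The sketch below is plain Scala; `Retry.retryWithBackoff` and its parameters are hypothetical, and in the Spark job the retried block would be the `Neo4jDataFrame.execute` call, with the predicate matching `org.neo4j.driver.v1.exceptions.ServiceUnavailableException`.

```scala
import scala.annotation.tailrec
import scala.util.control.NonFatal

object Retry {
  /** Runs `op`, retrying while `isRetryable` accepts the error, for at most
    * `maxAttempts` attempts in total. Sleeps `initialDelayMs` before the first
    * retry and doubles the delay before each further retry. */
  def retryWithBackoff[T](maxAttempts: Int, initialDelayMs: Long)
                         (isRetryable: Throwable => Boolean)
                         (op: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    require(initialDelayMs >= 0, "initialDelayMs must be non-negative")

    @tailrec
    def loop(attempt: Int, delayMs: Long): T = {
      val result: Either[Throwable, T] =
        try Right(op)
        catch {
          // On the last attempt, or for non-retryable errors, the exception propagates
          case NonFatal(e) if attempt < maxAttempts && isRetryable(e) => Left(e)
        }
      result match {
        case Right(value) => value
        case Left(_) =>
          Thread.sleep(delayMs)
          loop(attempt + 1, delayMs * 2)
      }
    }

    loop(1, initialDelayMs)
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    // Simulated write that fails twice with a "transient" error, then succeeds
    val out = retryWithBackoff(maxAttempts = 5, initialDelayMs = 10L)(_.isInstanceOf[IllegalStateException]) {
      calls += 1
      if (calls < 3) throw new IllegalStateException("simulated: no routing servers")
      "written"
    }
    println(s"$out after $calls calls")
  }
}
```

A retry like this is only safe if the write is idempotent (for example `MERGE` rather than `CREATE`): `apoc.periodic.iterate` commits its batches independently, so a retried call may re-apply batches that had already committed before the failure.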