08-26-2018 03:24 PM
I am using Neo4j/Cypher and my data is about 200 GB, so I thought of a scalable solution: Spark.
Two solutions are available for building Neo4j graphs with Spark:
Cypher for Apache Spark (CAPS)
Neo4j-Spark-Connector
I used the first one, CAPS.
The pre-processed CSV has two "geohash" values per row: one for the pickup and one for the drop-off.
What I want is to build a connected graph of geohash nodes.
CAPS only allows building a graph by mapping nodes:
if node with id 0 is to be connected to node with id 1, you need a relationship with start id 0 and end id 1.
A very simple layout would be:
Nodes: (just id, no properties)
id
0
1
2
Relationships: (just the mandatory fields)
id | start | end
0 | 0 | 1
1 | 0 | 2
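For reference, a minimal self-contained version of that toy layout with CAPS would look roughly like this (the "Node" label and "CONNECTS" relationship type are just placeholders):

import org.apache.spark.sql.DataFrame
import org.opencypher.okapi.api.io.conversion.{NodeMapping, RelationshipMapping}
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.{CAPSNodeTable, CAPSRelationshipTable}

object ToyGraphExample extends App {
  implicit val session: CAPSSession = CAPSSession.local()
  import session.sparkSession.implicits._

  // Nodes: just id, no properties
  val nodes: DataFrame = Seq(0L, 1L, 2L).toDF("id")
  // Relationships: just the mandatory id/start/end fields
  val rels: DataFrame = Seq((0L, 0L, 1L), (1L, 0L, 2L)).toDF("id", "start", "end")

  val nodeTable = CAPSNodeTable(NodeMapping.withSourceIdKey("id").withImpliedLabel("Node"), nodes)
  val relTable = CAPSRelationshipTable(
    RelationshipMapping.withSourceIdKey("id")
      .withSourceStartNodeKey("start")
      .withSourceEndNodeKey("end")
      .withRelType("CONNECTS"),
    rels)

  // Node 0 is connected to nodes 1 and 2
  session.readFrom(nodeTable, relTable).cypher("MATCH (a)-->(b) RETURN a, b").show
}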
Based on that, I loaded my CSV into a Spark DataFrame and then split it into:
a pickup DataFrame,
a drop-off DataFrame, and
a trip DataFrame.
I generated an id column for the first two DataFrames and created the mapping by adding those id columns to the trip DataFrame.
The problems I ran into are:
the geohash of a pickup or a drop-off can recur across different trips, so I want to merge the creation of those nodes;
the drop-off of one trip can be the pickup of another, so I need to merge those two nodes into one.
I tried to modify the graph, but I was surprised to find that Spark graphs are immutable: you cannot apply Cypher queries to change them.
So, is there a way to build a connected, directed, and merged geohash graph with Spark?
This is my code:
package org.opencypher.spark.examples
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.{CAPSNodeTable, CAPSRelationshipTable}
import org.opencypher.spark.util.ConsoleApp
import java.net.URI
import org.opencypher.okapi.api.io.conversion.NodeMapping
import org.opencypher.okapi.api.io.conversion.RelationshipMapping
import org.opencypher.spark.api.io.neo4j.Neo4jPropertyGraphDataSource
import org.opencypher.spark.api.io.neo4j.Neo4jConfig
import org.apache.spark.sql.functions._
import org.opencypher.okapi.api.graph.GraphName
object GreenCabsInputDataFrames extends ConsoleApp {
//1) Create CAPS session and retrieve Spark session
implicit val session: CAPSSession = CAPSSession.local()
val spark = session.sparkSession
//2) Load a csv into dataframe
val df=spark.read.csv("C:\\Users\\Ahmed\\Desktop\\green_result\\green_data.csv").select("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19","_c20","_c21","_c22","_c23")
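// Note: no "header" option is set, so Spark assigns the default _c0.._c23 column names
// that are used throughout the rest of this code.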
//3) cache the dataframe
val df1=df.cache()
//4) subset the dataframe
val pickup_dataframe=df1.select("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19","_c20","_c21")
val dropoff_dataframe=df1.select("_c22","_c23")
//5) uncache the dataframe
df1.unpersist()
//6) Add id columns to the pickup and dropoff dataframes
val pickup_dataframe2 = pickup_dataframe.withColumn("id1", monotonically_increasing_id() + pickup_dataframe.count()).select("id1", pickup_dataframe.columns: _*)
val dropoff_dataframe2 = dropoff_dataframe.withColumn("id2", monotonically_increasing_id() + pickup_dataframe2.count() + pickup_dataframe.count()).select("id2", dropoff_dataframe.columns: _*)
//7) Create the relationship "trip" as a dataframe by joining pickup and dropoff rows on a generated join id
val trip_data_dataframe2 = pickup_dataframe2.withColumn("idj", monotonically_increasing_id()).join(dropoff_dataframe2.withColumn("idj", monotonically_increasing_id()), "idj")
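// Note: monotonically_increasing_id() is not guaranteed to be contiguous (the partition
// index is encoded in the id), so the join on "idj" relies on pickup_dataframe2 and
// dropoff_dataframe2 being projections of the same df1 with identical partitioning.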
//drop unnecessary columns
val pickup_dataframe3=pickup_dataframe2.drop("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19")
val trip_data_dataframe3=trip_data_dataframe2.drop("_c20","_c21","_c22","_c23")
//8) Reorder the columns of the trip dataframe
val trip_data_dataframe4=trip_data_dataframe3.select("idj", "id1", "id2", "_c0", "_c10", "_c11", "_c12", "_c13", "_c14", "_c15", "_c16", "_c17", "_c18", "_c19", "_c3", "_c4","_c9")
//8.1) Display the dataframes in the console
pickup_dataframe3.show()
dropoff_dataframe2.show()
trip_data_dataframe4.show()
//9) mapping the columns
val Pickup_mapping=NodeMapping.withSourceIdKey("id1").withImpliedLabel("HashNode").withPropertyKeys("_c21","_c20")
val Dropoff_mapping=NodeMapping.withSourceIdKey("id2").withImpliedLabel("HashNode").withPropertyKeys("_c23","_c22")
val Trip_mapping=RelationshipMapping.withSourceIdKey("idj").withSourceStartNodeKey("id1").withSourceEndNodeKey("id2").withRelType("TRIP").withPropertyKeys("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19")
//10) create tables
val Pickup_Table2 = CAPSNodeTable(Pickup_mapping, pickup_dataframe3)
val Dropoff_Table = CAPSNodeTable(Dropoff_mapping, dropoff_dataframe2)
val Trip_Table = CAPSRelationshipTable(Trip_mapping, trip_data_dataframe4)
//11) Create graph
val graph = session.readFrom(Pickup_Table2, Dropoff_Table, Trip_Table)
//12) Connect to Neo4j
val boltWriteURI: URI = new URI("bolt://localhost:7687")
val neo4jWriteConfig: Neo4jConfig = new Neo4jConfig(boltWriteURI, "neo4j", Some("wakarimashta"), true)
val neo4jResult: Neo4jPropertyGraphDataSource = new Neo4jPropertyGraphDataSource(neo4jWriteConfig)(session)
//13) Store graph in neo4j
val neo4jResultName: GraphName = new GraphName("neo4jgraphs151")
neo4jResult.store(neo4jResultName, graph)
}
08-26-2018 06:29 PM
CAPS stores nodes in one table per label. To merge pickups and drop-offs, you should have a third "location" table. If you have location information for pickups and drop-offs, this becomes another node table, together with relationship tables that you can compute. So in the end you want to have:
pickup (id)
-----------
1
2

dropoff (id)
------------
3
4

location (id, name)
-------------------
5, "1st St"
6, "2nd St"

pickup_location (id, pickup_id, location_id)
--------------------------------------------
7, 1, 5

dropoff_location (id, dropoff_id, location_id)
----------------------------------------------
8, 3, 6
At this point with CAPS only, I would probably not try to "merge" the pickups and drop-offs, but I would link them to the appropriate location, so that your graph would look like this:
(:Pickup)-[:HAS_LOCATION]->(:Location)<-[:HAS_LOCATION]-(:Dropoff)
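To sketch how those extra tables could be computed with plain Spark (the names pickups(id, geohash) and dropoffs(id, geohash) are placeholders for the id-carrying dataframes from step 6 of your code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// One location row per distinct geohash, shared by pickups and drop-offs.
// The idOffset keeps the generated location ids out of the pickup/drop-off id
// ranges, since node ids must be unique across the whole graph (the same reason
// your code offsets id1 and id2).
def buildLocationTables(pickups: DataFrame, dropoffs: DataFrame, idOffset: Long)
    : (DataFrame, DataFrame, DataFrame) = {
  val locations = pickups.select("geohash")
    .union(dropoffs.select("geohash"))
    .distinct()
    .withColumn("location_id", monotonically_increasing_id() + idOffset)

  // pickup_location (id, pickup_id, location_id)
  val pickupLocation = pickups.join(locations, "geohash")
    .select(col("id").as("pickup_id"), col("location_id"))
    .withColumn("id", monotonically_increasing_id())

  // dropoff_location (id, dropoff_id, location_id)
  val dropoffLocation = dropoffs.join(locations, "geohash")
    .select(col("id").as("dropoff_id"), col("location_id"))
    .withColumn("id", monotonically_increasing_id())

  (locations, pickupLocation, dropoffLocation)
}

Each of the three results then gets its own NodeMapping or RelationshipMapping, just like the tables in your code.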
08-27-2018 01:50 AM
You also said you tried it with the spark-connector.
That one should be able to MERGE if you choose the geohash field as the ID field.
So you would only have one node type, :Location, with the geohash as its ID field, and encode the pickup/drop-off role as the relationship from :Trip:
(:Location)<-[:PICKUP]-(:Trip)-[:DROPOFF]->(:Location)
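If I remember the 2.x connector API correctly, that merge could be sketched like this (the trips dataframe and its column names are placeholders):

import org.neo4j.spark.Neo4jDataFrame

// Sketch: mergeEdgeList MERGEs the source node, the target node and the relationship,
// keyed on the listed columns, so a geohash shared by many trips becomes one :Location.
// Renaming the column first makes "geohash" the node's key property name.
Neo4jDataFrame.mergeEdgeList(spark.sparkContext,
  trips.withColumnRenamed("pickupGeohash", "geohash"),
  ("Trip", Seq("tripId")),      // source label and key column
  ("PICKUP", Seq.empty),        // relationship type, no relationship properties
  ("Location", Seq("geohash"))) // target label and key column
// ...and the same again with the drop-off geohash column and a DROPOFF type.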
HTH
03-25-2019 05:56 AM
Which approach is the best for working with Spark: CAPS or the Neo4j-Spark-Connector?
So far I have not seen any complete example (i.e. a tutorial) of working with Spark. Thanks in advance.