Spark: Generating CSV files to import into Neo4j
About a year ago Ian pointed me at a Chicago Crime data set which seemed like a good fit for Neo4j, and after much procrastination I’ve finally got around to importing it.
The data set covers crimes committed from 2001 until now. It contains around 4 million crimes and metadata about those crimes, such as the location, type of crime and year.
The contents of the file follow this structure:
$ head -n 10 ~/Downloads/Crimes_-_2001_to_present.csv
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,"(41.75017626412204, -87.55494559131228)"
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,"(41.729576153145636, -87.57568059471686)"
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,"(41.884543798701515, -87.72803579358926)"
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,"(41.785633535413176, -87.74148516669783)"
9460355,HX113738,01/14/2014 04:21:00 AM,070XX S PEORIA ST,0820,THEFT,$500 AND UNDER,STREET,true,false,0733,007,17,68,06,1171480,1858195,2014,01/16/2014 12:40:00 AM,41.766348042591375,-87.64702037047671,"(41.766348042591375, -87.64702037047671)"
9461140,HX113909,01/14/2014 03:17:00 AM,016XX W HUBBARD ST,0610,BURGLARY,FORCIBLE ENTRY,COMMERCIAL / BUSINESS OFFICE,false,false,1215,012,27,24,05,1165029,1903111,2014,01/16/2014 12:40:00 AM,41.889741146006095,-87.66939334853973,"(41.889741146006095, -87.66939334853973)"
9460361,HX113731,01/14/2014 03:12:00 AM,022XX S WENTWORTH AVE,0820,THEFT,$500 AND UNDER,CTA TRAIN,false,false,0914,009,25,34,06,1175363,1889525,2014,01/20/2014 12:40:05 AM,41.85223460427207,-87.63185047834335,"(41.85223460427207, -87.63185047834335)"
9461691,HX114506,01/14/2014 03:00:00 AM,087XX S COLFAX AVE,0650,BURGLARY,HOME INVASION,RESIDENCE,false,false,0423,004,7,46,05,1195052,1847362,2014,01/17/2014 12:40:17 AM,41.73607283858007,-87.56097809501115,"(41.73607283858007, -87.56097809501115)"
9461792,HX114824,01/14/2014 03:00:00 AM,012XX S CALIFORNIA BLVD,0810,THEFT,OVER $500,STREET,false,false,1023,010,28,29,06,1157929,1894034,2014,01/17/2014 12:40:17 AM,41.86498077118534,-87.69571529596696,"(41.86498077118534, -87.69571529596696)"
Since I wanted to import this into Neo4j I needed to massage the data a bit, as the neo4j-import tool expects to receive CSV files containing the nodes and relationships we want to create.
I’d been looking at Spark towards the end of last year and the pre-processing of the big initial file into smaller CSV files containing nodes and relationships seemed like a good fit.
I therefore needed to create a Spark job to do this. We’ll then pass this job to a Spark executor running locally and it will spit out CSV files.
We start by creating a Scala object with a main method that will contain our processing code. Inside that main method we’ll instantiate a Spark context:
import org.apache.spark.{SparkConf, SparkContext}

object GenerateCSVFiles {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Chicago Crime Dataset")
    val sc = new SparkContext(conf)
  }
}
Easy enough. Next we’ll read in the CSV file. I found the easiest way to reference this was with an environment variable but perhaps there’s a more idiomatic way:
import java.io.File

import org.apache.spark.{SparkConf, SparkContext}

object GenerateCSVFiles {
  def main(args: Array[String]) {
    val crimeFile = System.getenv("CSV_FILE")
    if (crimeFile == null || !new File(crimeFile).exists()) {
      throw new RuntimeException("Cannot find CSV file [" + crimeFile + "]")
    }
    println("Using %s".format(crimeFile))

    val conf = new SparkConf().setAppName("Chicago Crime Dataset")
    val sc = new SparkContext(conf)

    val crimeData = sc.textFile(crimeFile).cache()
  }
}
The type of crimeData is RDD[String] - Spark’s way of representing the (lazily evaluated) lines of the CSV file. This also includes the header of the file, so let’s write a function to get rid of it since we’ll be generating our own headers for the different files:
import org.apache.spark.rdd.RDD

// http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAEYYnxYuEaie518ODdn-fR7VvD39d71=CgB_Dxw_4COVXgmYYQ@mail.gmail.com%3E
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    // the header only lives in the first partition, so skip its first line there
    if (idx == 0) lines.drop(1) else lines
  })
}
Now we’re ready to start generating our new CSV files, so we’ll write a function which parses each line and extracts the appropriate columns. I’m using opencsv for this:
import java.io.File

import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.fs.FileUtil

def generateFile(file: String, withoutHeader: RDD[String], fn: Array[String] => Array[String], header: String, distinct: Boolean = true, separator: String = ",") = {
  // clear out any previous output before writing the new file
  FileUtil.fullyDelete(new File(file))

  val tmpFile = "/tmp/" + System.currentTimeMillis() + "-" + file

  // `header` is used later, when the part files are merged into a single CSV
  val rows: RDD[String] = withoutHeader.mapPartitions(lines => {
    // create the parser inside mapPartitions so each partition gets its own instance
    val parser = new CSVParser(',')
    lines.map(line => {
      val columns = parser.parseLine(line)
      fn(columns).mkString(separator)
    })
  })

  if (distinct) rows.distinct().saveAsTextFile(tmpFile) else rows.saveAsTextFile(tmpFile)
}
We then call this function like this:
generateFile("/tmp/crimes.csv", withoutHeader, columns => Array(columns(0),"Crime", columns(2), columns(6)), "id:ID(Crime),:LABEL,date,description", false)
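The other node and relationship files follow the same pattern. For example, calls along these lines would produce the beats and crime-to-beat files shown further down - the column indices come from the header at the top of this post, although these aren’t necessarily the exact calls used in the full job:

// beat nodes - column 10 is the Beat column; distinct defaults to true so each beat appears once
generateFile("/tmp/beats.csv", withoutHeader,
  columns => Array(columns(10), "Beat"),
  "id:ID(Beat),:LABEL")

// relationships linking each crime to the beat it was committed on
generateFile("/tmp/crimesBeats.csv", withoutHeader,
  columns => Array(columns(0), columns(10), "ON_BEAT"),
  ":START_ID(Crime),:END_ID(Beat),:TYPE", false)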
The output into 'tmpFile' is actually 32 'part files' but I wanted to be able to merge those together into individual CSV files that were easier to work with.
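As a rough sketch of that merge step - this isn’t the code from the full job, just the idea - we could concatenate the local part files and prepend the appropriate header ourselves:

import java.io.{File, PrintWriter}

import scala.io.Source

// Illustrative only: merge Spark's part-0000x output files into a single CSV,
// writing the header line first. Assumes the part files are on the local
// file system, which they are when running with a local master.
def mergePartFiles(partsDirectory: String, destination: String, header: String) = {
  val partFiles = new File(partsDirectory).listFiles()
    .filter(_.getName.startsWith("part-"))
    .sortBy(_.getName)

  val writer = new PrintWriter(new File(destination))
  try {
    writer.println(header)
    for (part <- partFiles; line <- Source.fromFile(part).getLines()) {
      writer.println(line)
    }
  } finally {
    writer.close()
  }
}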
I won’t paste the full job here but if you want to take a look it’s on github.
Now we need to submit the job to Spark. I’ve wrapped this in a script if you want to follow along but these are the contents:
./spark-1.1.0-bin-hadoop1/bin/spark-submit \
--driver-memory 5g \
--class GenerateCSVFiles \
--master local[8] \
target/scala-2.10/playground_2.10-1.0.jar \
$@
If we execute that we’ll see the following output:
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Crimes_-_2001_to_present.csv
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/04/15 00:31:44 INFO SparkContext: Running Spark version 1.3.0
...
15/04/15 00:47:26 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool
15/04/15 00:47:26 INFO DAGScheduler: Stage 8 (saveAsTextFile at GenerateCSVFiles.scala:51) finished in 2.702 s
15/04/15 00:47:26 INFO DAGScheduler: Job 4 finished: saveAsTextFile at GenerateCSVFiles.scala:51, took 8.715588 s
real 0m44.935s
user 4m2.259s
sys 0m14.159s
and these CSV files will be generated:
$ ls -alh /tmp/*.csv
-rwxrwxrwx 1 markneedham wheel 3.0K 14 Apr 07:37 /tmp/beats.csv
-rwxrwxrwx 1 markneedham wheel 217M 14 Apr 07:37 /tmp/crimes.csv
-rwxrwxrwx 1 markneedham wheel 84M 14 Apr 07:37 /tmp/crimesBeats.csv
-rwxrwxrwx 1 markneedham wheel 120M 14 Apr 07:37 /tmp/crimesPrimaryTypes.csv
-rwxrwxrwx 1 markneedham wheel 912B 14 Apr 07:37 /tmp/primaryTypes.csv
Let’s have a quick look at what they contain:
$ head -n 10 /tmp/beats.csv
id:ID(Beat),:LABEL
1135,Beat
1421,Beat
2312,Beat
1113,Beat
1014,Beat
2411,Beat
1333,Beat
2521,Beat
1652,Beat
$ head -n 10 /tmp/crimes.csv
id:ID(Crime),:LABEL,date,description
9464711,Crime,01/14/2014 05:00:00 AM,SIMPLE
9460704,Crime,01/14/2014 04:55:00 AM,ARMED: HANDGUN
9460339,Crime,01/14/2014 04:44:00 AM,TO PROPERTY
9461467,Crime,01/14/2014 04:43:00 AM,$500 AND UNDER
9460355,Crime,01/14/2014 04:21:00 AM,$500 AND UNDER
9461140,Crime,01/14/2014 03:17:00 AM,FORCIBLE ENTRY
9460361,Crime,01/14/2014 03:12:00 AM,$500 AND UNDER
9461691,Crime,01/14/2014 03:00:00 AM,HOME INVASION
9461792,Crime,01/14/2014 03:00:00 AM,OVER $500
$ head -n 10 /tmp/crimesBeats.csv
:START_ID(Crime),:END_ID(Beat),:TYPE
5896915,0733,ON_BEAT
9208776,2232,ON_BEAT
8237555,0111,ON_BEAT
6464775,0322,ON_BEAT
6468868,0411,ON_BEAT
4189649,0524,ON_BEAT
7620897,0421,ON_BEAT
7720402,0321,ON_BEAT
5053025,1115,ON_BEAT
Looking good. Let’s get them imported into Neo4j:
$ ./neo4j-community-2.2.0/bin/neo4j-import --into /tmp/my-neo --nodes /tmp/crimes.csv --nodes /tmp/beats.csv --nodes /tmp/primaryTypes.csv --relationships /tmp/crimesBeats.csv --relationships /tmp/crimesPrimaryTypes.csv
Nodes
[*>:45.76 MB/s----------------------------------|PROPERTIES(2)=============|NODE:3|v:118.05 MB/] 4M
Done in 5s 605ms
Prepare node index
[*RESOLVE:64.85 MB-----------------------------------------------------------------------------] 4M
Done in 4s 930ms
Calculate dense nodes
[>:42.33 MB/s-------------------|*PREPARE(7)===================================|CALCULATOR-----] 8M
Done in 5s 417ms
Relationships
[>:42.33 MB/s-------------|*PREPARE(7)==========================|RELATIONSHIP------------|v:44.] 8M
Done in 6s 62ms
Node --> Relationship
[*>:??-----------------------------------------------------------------------------------------] 4M
Done in 324ms
Relationship --> Relationship
[*LINK-----------------------------------------------------------------------------------------] 8M
Done in 1s 984ms
Node counts
[*>:??-----------------------------------------------------------------------------------------] 4M
Done in 360ms
Relationship counts
[*>:??-----------------------------------------------------------------------------------------] 8M
Done in 653ms
IMPORT DONE in 26s 517ms
Next I updated conf/neo4j-server.properties to point to my new database:
#***************************************************************
# Server configuration
#***************************************************************
# location of the database directory
#org.neo4j.server.database.location=data/graph.db
org.neo4j.server.database.location=/tmp/my-neo
Now I can start up Neo and start exploring the data:
$ ./neo4j-community-2.2.0/bin/neo4j start
MATCH (:Crime)-[r:CRIME_TYPE]->()
RETURN r
LIMIT 10
There are lots more relationships and entities that we could pull out of this data set - what I’ve done is just a start. So if you’re up for some more Chicago crime exploration, the code and instructions explaining how to run it are on github.