Spark: Write to CSV file
A couple of weeks ago I wrote about how I’d been using Spark to explore a City of Chicago crime data set. Having worked out how many of each crime had been committed, I wanted to write the results to a CSV file.
Spark provides a saveAsTextFile function which allows us to save RDDs, so I refactored my code into the following format to allow me to use that:
import java.io.File
import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.fs.FileUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
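def dropHeader(data: RDD[String]): RDD[String] = {
  // The header is the first line of the first partition
  data.mapPartitionsWithIndex((idx, lines) => {
    // drop returns a new iterator, so we return its result rather than the original one
    if (idx == 0) lines.drop(1) else lines
  })
}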
// https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2
val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
val crimeData = sc.textFile(crimeFile).cache()
val withoutHeader: RDD[String] = dropHeader(crimeData)
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    val columns = parser.parseLine(line)
    // column 5 holds the primary type of the crime
    (columns(5), 1)
  })
})
val counts = partitions.
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
counts.saveAsTextFile(file)
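To make the three steps at the end (reduceByKey, sortBy, map) a bit more concrete, here is a rough local sketch of the same logic on a plain Scala collection. The sample values are made up for illustration and groupBy stands in for reduceByKey, so treat it as an analogy rather than what Spark does internally:

```scala
// Hypothetical sample of primary types, standing in for column 5 of the crime file
val primaryTypes = Seq("THEFT", "BATTERY", "THEFT", "NARCOTICS", "BATTERY", "THEFT")

// Pair each value with 1, as the mapPartitions step does
val pairs: Seq[(String, Int)] = primaryTypes.map(t => (t, 1))

// Local stand-in for reduceByKey: group by key and sum the values
val reduced: Map[String, Int] =
  pairs.groupBy { case (key, _) => key }
       .map { case (key, group) => (key, group.map(_._2).sum) }

// Sort by descending count and format each pair as a CSV line
val csvLines: Seq[String] =
  reduced.toSeq
         .sortBy { case (_, value) => -value }
         .map { case (key, value) => s"$key,$value" }

csvLines.foreach(println)
// THEFT,3
// BATTERY,2
// NARCOTICS,1
```

The Spark pipeline above does the same thing per key, only distributed across partitions.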
If we run that code from the Spark shell we end up with a folder called /tmp/primaryTypes.csv containing multiple part files:
$ ls -lah /tmp/primaryTypes.csv/
total 496
drwxr-xr-x 66 markneedham wheel 2.2K 30 Nov 07:17 .
drwxrwxrwt 80 root wheel 2.7K 30 Nov 07:16 ..
-rw-r--r-- 1 markneedham wheel 8B 30 Nov 07:16 ._SUCCESS.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00000.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00001.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00002.crc
-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00003.crc
...
-rwxrwxrwx 1 markneedham wheel 0B 30 Nov 07:16 _SUCCESS
-rwxrwxrwx 1 markneedham wheel 28B 30 Nov 07:16 part-00000
-rwxrwxrwx 1 markneedham wheel 17B 30 Nov 07:16 part-00001
-rwxrwxrwx 1 markneedham wheel 23B 30 Nov 07:16 part-00002
-rwxrwxrwx 1 markneedham wheel 16B 30 Nov 07:16 part-00003
...
If we look at some of those part files we can see that it’s written the crime types and counts as expected:
$ cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
$ cat /tmp/primaryTypes.csv/part-00003
BURGLARY,257310
This is fine if we’re going to pass those CSV files into another Hadoop-based job, but I actually want a single CSV file, so it’s not quite what I need.
One way to achieve this is to force everything to be calculated on one partition, which means that only one part file gets generated:
val counts = partitions.repartition(1).
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
counts.saveAsTextFile(file)
part-00000 now looks like this:
$ cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
NON-CRIMINAL,12
RITUALISM,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2
This works, but it’s quite a bit slower than when we were doing the aggregation across partitions, so it’s not ideal.
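A variation I haven’t benchmarked against the crime data set: keep the aggregation spread across partitions and only ask sortBy for a single output partition via its numPartitions parameter. The sketch below assumes a spark-shell session where sc is already defined, and uses a small made-up RDD in place of the real (primaryType, 1) pairs:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._ // needed for reduceByKey on older Spark versions

// Assumes a spark-shell session, where `sc` (the SparkContext) already exists.
// Hypothetical sample data standing in for the (primaryType, 1) pairs.
val samplePairs: RDD[(String, Int)] = sc.parallelize(
  Seq(("THEFT", 1), ("BATTERY", 1), ("THEFT", 1),
      ("NARCOTICS", 1), ("THEFT", 1), ("BATTERY", 1)),
  numSlices = 4)

val singlePartitionCounts: RDD[String] = samplePairs
  .reduceByKey(_ + _) // aggregation still runs across the original partitions
  .sortBy(pair => -pair._2, ascending = true, numPartitions = 1) // sorted output goes to one partition
  .map { case (key, value) => s"$key,$value" }

// One partition means saveAsTextFile would write a single part file
println(singlePartitionCounts.partitions.length)
singlePartitionCounts.collect().foreach(println)
```

Whether this is noticeably faster than repartition(1) will depend on the data, so it’s worth timing both before relying on it.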
Instead, what we can do is make use of one of Hadoop’s merge functions, which squashes part files together into a single file.
First we add the Hadoop dependency to our SBT build file:
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"
Now let’s bring our merge function into the Spark shell:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  // false = keep the source part files, null = no extra string appended after each file
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}
And now let’s make use of it:
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
val destinationFile = "/tmp/singlePrimaryTypes.csv"
FileUtil.fullyDelete(new File(destinationFile))
val counts = partitions.
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
counts.saveAsTextFile(file)
merge(file, destinationFile)
And now we’ve got the best of both worlds:
$ cat /tmp/singlePrimaryTypes.csv
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2
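One caveat: FileUtil.copyMerge isn’t available in the Hadoop 3.x line, as far as I’m aware. If the output lives on the local filesystem, as it does here, a rough equivalent can be sketched with java.nio. The mergeLocal name is my own invention, and this version doesn’t work against HDFS:

```scala
import java.io.File
import java.nio.file.{Files, Paths}

// Hypothetical helper: concatenates the part-* files in srcDir into dstFile,
// in file name order. Local filesystem only, not HDFS.
def mergeLocal(srcDir: String, dstFile: String): Unit = {
  // listFiles returns null if srcDir isn't a directory, hence the Option wrapper
  val parts = Option(new File(srcDir).listFiles()).getOrElse(Array.empty[File])
    .filter(f => f.isFile && f.getName.startsWith("part-")) // skips _SUCCESS and the .crc files
    .sortBy(_.getName)

  // With no options given, newOutputStream creates the file or truncates an existing one
  val out = Files.newOutputStream(Paths.get(dstFile))
  try parts.foreach(part => Files.copy(part.toPath, out))
  finally out.close()
}
```

It would be called the same way as merge, for example mergeLocal("/tmp/primaryTypes.csv", "/tmp/singlePrimaryTypes.csv"). Sorting by file name preserves the order of the part files, which is what keeps the counts in descending order.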
The full code is available as a gist if you want to play around with it.
About the author
I'm currently working on short form content at ClickHouse. I publish short 5 minute videos showing how to solve data problems on YouTube @LearnDataWithMark. I previously worked on graph analytics at Neo4j, where I also co-authored the O'Reilly Graph Algorithms Book with Amy Hodler.