Spark: Convert RDD to DataFrame
As I mentioned in a previous blog post, I’ve been playing around with the Databricks Spark CSV library and wanted to take a CSV file, clean it up, and then write out a new CSV file containing some of the columns.
I started by processing the CSV file and writing it into a temporary table:
import org.apache.spark.sql.{SQLContext, Row, DataFrame}
val sqlContext = new SQLContext(sc)
val crimeFile = "Crimes_-_2001_to_present.csv"
sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")
I wanted to get to the point where I could call the following function which writes a DataFrame to disk:
import java.io.File
import org.apache.hadoop.fs.FileUtil

private def createFile(df: DataFrame, file: String, header: String): Unit = {
  // remove any output left over from a previous run
  FileUtil.fullyDelete(new File(file))
  val tmpFile = "tmp/" + System.currentTimeMillis() + "-" + file
  df.distinct.save(tmpFile, "com.databricks.spark.csv")
}
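As written, the function only saves the distinct rows into a temporary directory and never touches the 'file' or 'header' parameters. One way to finish the job, purely as a sketch of my own and assuming the output lands in a single 'part-00000' file (which won’t hold for bigger data sets), is to prepend the header and copy the result to the target path:
import java.io.PrintWriter
import scala.io.Source

// Hypothetical helper (not from the original pipeline): prepend the header line
// and write the merged output to the final path, assuming a single part file.
private def mergeWithHeader(tmpFile: String, file: String, header: String): Unit = {
  val writer = new PrintWriter(file)
  writer.println(header)
  Source.fromFile(tmpFile + "/part-00000").getLines().foreach(writer.println)
  writer.close()
}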
The first file only needs to contain the primary type of crime, which we can extract with the following query:
val rows = sqlContext.sql("select `Primary Type` as primaryType FROM crimes LIMIT 10")
rows.collect()
res4: Array[org.apache.spark.sql.Row] = Array([ASSAULT], [ROBBERY], [CRIMINAL DAMAGE], [THEFT], [THEFT], [BURGLARY], [THEFT], [BURGLARY], [THEFT], [CRIMINAL DAMAGE])
Some of the primary types have trailing spaces which I want to get rid of. As far as I can tell, Spark’s variant of SQL doesn’t have the LTRIM or RTRIM functions, but we can map over 'rows' and use the String 'trim' function instead:
rows.map { case Row(primaryType: String) => Row(primaryType.trim) }
res8: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[29] at map at DataFrame.scala:776
Now we’ve got an RDD of Rows which we need to convert back to a DataFrame again. 'sqlContext' has a function which we might be able to use:
sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })
<console>:27: error: overloaded method value createDataFrame with alternatives:
[A <: Product](data: Seq[A])(implicit evidence$4: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
[A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])
sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })
^
Those are the signatures we can choose from. If we want to pass in an RDD of type Row we’re going to have to define a StructType (there’s a sketch of that route after the next example), or we can convert each row into something more strongly typed:
case class CrimeType(primaryType: String)
sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) })
res14: org.apache.spark.sql.DataFrame = [primaryType: string]
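For comparison, the StructType route would look something like this. It’s a sketch of my own rather than code from the pipeline above, and it assumes the single string column returned by the earlier query:
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// define the schema by hand and pair it with the RDD[Row]
val schema = StructType(Seq(StructField("primaryType", StringType, nullable = true)))
val trimmedRows = rows.map { case Row(primaryType: String) => Row(primaryType.trim) }
sqlContext.createDataFrame(trimmedRows, schema)
Both versions give us the same single-column DataFrame; for one column the case class is less verbose, so that’s the one I’ll stick with.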
Great, we’ve got our DataFrame which we can now plug into the 'createFile' function like so:
createFile(
  sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }),
  "/tmp/crimeTypes.csv",
  "crimeType:ID(CrimeType)")
We can actually do better though!
Since we’ve got an RDD of a specific class, we can make use of the 'rddToDataFrameHolder' implicit function and then call 'toDF' on the resulting 'DataFrameHolder'. This is what the code looks like:
import sqlContext.implicits._
createFile(
  rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }.toDF(),
  "/tmp/crimeTypes.csv",
  "crimeType:ID(CrimeType)")
And we’re done!