Spark: Convert RDD to DataFrame

As I mentioned in a previous blog post, I’ve been playing around with the Databricks Spark CSV library. I wanted to take a CSV file, clean it up, and then write out a new CSV file containing some of the columns.

I started by processing the CSV file and writing it into a temporary table:

import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc)
val crimeFile = "Crimes_-_2001_to_present.csv"
sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

I wanted to get to the point where I could call the following function which writes a DataFrame to disk:

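import java.io.File
import org.apache.hadoop.fs.FileUtil

private def createFile(df: DataFrame, file: String, header: String): Unit = {
  // Remove any output left over from a previous run
  FileUtil.fullyDelete(new File(file))
  val tmpFile = "tmp/" + System.currentTimeMillis() + "-" + file
  // Drop duplicate rows and write the rest out as CSV
  df.distinct.save(tmpFile, "com.databricks.spark.csv")
}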

The first file only needs to contain the primary type of crime, which we can extract with the following query:

val rows = sqlContext.sql("select `Primary Type` as primaryType FROM crimes LIMIT 10")

rows.collect()
res4: Array[org.apache.spark.sql.Row] = Array([ASSAULT], [ROBBERY], [CRIMINAL DAMAGE], [THEFT], [THEFT], [BURGLARY], [THEFT], [BURGLARY], [THEFT], [CRIMINAL DAMAGE])

Some of the primary types have trailing spaces which I want to get rid of. As far as I can tell, Spark’s variant of SQL doesn’t have the LTRIM or RTRIM functions, but we can map over 'rows' and use the String 'trim' function instead:

rows.map { case Row(primaryType: String) => Row(primaryType.trim) }
res8: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[29] at map at DataFrame.scala:776
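One thing to watch out for: the pattern 'Row(primaryType: String)' only matches non-null strings, so a null value would result in a MatchError. I don't know whether the crimes file actually contains any, but a more defensive version might look like the sketch below. 'sampleRows' is a made-up stand-in for 'rows' so that the snippet runs in the spark-shell without the CSV file, and it assumes the shell's 'sc' is available:

```scala
import org.apache.spark.sql.Row

// Hypothetical sample data standing in for 'rows'; assumes the spark-shell's 'sc'
val sampleRows = sc.parallelize(Seq(Row("THEFT  "), Row(" ROBBERY"), Row(null: String)))

val trimmed = sampleRows.map {
  case Row(primaryType: String) => Row(primaryType.trim) // non-null string: trim it
  case other                    => other                 // null (or anything else): pass through unchanged
}
```

Whether passing nulls through is the right thing to do depends on what we want in the output file. Filtering them out would be another option.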

Now we’ve got an RDD of Rows which we need to convert back to a DataFrame. 'sqlContext' has a function which we might be able to use:

sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })

<console>:27: error: overloaded method value createDataFrame with alternatives:
  [A <: Product](data: Seq[A])(implicit evidence$4: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
  [A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])
              sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })
                         ^

These are the signatures we can choose from:

[Screenshot: the available createDataFrame signatures]

If we want to pass in an RDD of type Row we’re going to have to define a StructType or we can convert each row into something more strongly typed:

case class CrimeType(primaryType: String)

sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) })
res14: org.apache.spark.sql.DataFrame = [primaryType: string]
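For comparison, this is roughly what the StructType route could look like. It's a minimal sketch rather than the code I ended up using: 'sampleRows' is made-up data standing in for 'rows', and it assumes the 'sc' and 'sqlContext' values from earlier in the post. It relies on the createDataFrame overload that takes an RDD of Rows together with a schema:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical sample data standing in for 'rows'; assumes 'sc' and 'sqlContext' exist
val sampleRows = sc.parallelize(Seq(Row("THEFT  "), Row(" ROBBERY")))

// Describe the single string column explicitly
val schema = StructType(Seq(StructField("primaryType", StringType, nullable = true)))

val trimmedRows = sampleRows.map { case Row(primaryType: String) => Row(primaryType.trim) }

// An RDD[Row] on its own isn't accepted, but an RDD[Row] plus a schema is
val df = sqlContext.createDataFrame(trimmedRows, schema)
```

The case class version is shorter when the columns are known up front. An explicit schema seems more useful when the column names are only known at runtime.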

Great, we’ve got our DataFrame which we can now plug into the 'createFile' function like so:

createFile(
  sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }),
  "/tmp/crimeTypes.csv",
  "crimeType:ID(CrimeType)")

We can actually do better though!

Since we’ve got an RDD of a specific class we can make use of the 'rddToDataFrameHolder' implicit function and then the 'toDF' function on 'DataFrameHolder'. This is what the code looks like:

import sqlContext.implicits._
createFile(
  rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }.toDF(),
  "/tmp/crimeTypes.csv",
  "crimeType:ID(CrimeType)")
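A small variation, in case defining a case class for a single column feels like overkill: 'toDF' also accepts column names, so mapping to a tuple should work as well. This is a sketch under the same assumptions as before ('sc' and 'sqlContext' from the spark-shell session), with 'sampleRows' as made-up data standing in for 'rows':

```scala
import org.apache.spark.sql.Row
import sqlContext.implicits._

// Hypothetical sample data standing in for 'rows'; assumes 'sc' and 'sqlContext' exist
val sampleRows = sc.parallelize(Seq(Row("THEFT  "), Row(" ROBBERY")))

// Tuple1 is a Product, so the same implicit conversion applies;
// the column name is passed to toDF instead of coming from a case class field
val df = sampleRows
  .map { case Row(primaryType: String) => Tuple1(primaryType.trim) }
  .toDF("primaryType")
```

The resulting DataFrame could then be passed to 'createFile' in the same way as the case class version.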

And we’re done!
