· spark-2

Spark: Convert RDD to DataFrame


import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc)
val crimeFile = "Crimes_-_2001_to_present.csv"
sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

private def createFile(df: DataFrame, file: String, header: String): Unit = {
  FileUtil.fullyDelete(new File(file))
  val tmpFile = "tmp/" + System.currentTimeMillis() + "-" + file
  df.distinct.save(tmpFile, "com.databricks.spark.csv")
}

val rows = sqlContext.sql("select `Primary Type` as primaryType FROM crimes LIMIT 10")

rows.collect()
res4: Array[org.apache.spark.sql.Row] = Array([ASSAULT], [ROBBERY], [CRIMINAL DAMAGE], [THEFT], [THEFT], [BURGLARY], [THEFT], [BURGLARY], [THEFT], [CRIMINAL DAMAGE])

rows.map { case Row(primaryType: String) => Row(primaryType.trim) }
res8: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[29] at map at DataFrame.scala:776

sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })

<console>:27: error: overloaded method value createDataFrame with alternatives:
  [A <: Product](data: Seq[A])(implicit evidence$4: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
  [A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])
              sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })
                         ^


case class CrimeType(primaryType: String)

sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) })
res14: org.apache.spark.sql.DataFrame = [primaryType: string]

createFile(
  sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }),
  "/tmp/crimeTypes.csv",
  "crimeType:ID(CrimeType)")

import sqlContext.implicits._
createFile(
  rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }.toDF(),
  "/tmp/crimeTypes.csv",
  "crimeType:ID(CrimeType)")
  • LinkedIn
  • Tumblr
  • Reddit
  • Google+
  • Pinterest
  • Pocket