Spark SQL DataFrames#

SQLContext#

$ spark-shell
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

employee.json

	{"id" : "1201", "name" : "satish", "age" : "25"},
	{"id" : "1202", "name" : "krishna", "age" : "28"},
	{"id" : "1203", "name" : "amith", "age" : "39"},
	{"id" : "1204", "name" : "javed", "age" : "23"},
	{"id" : "1205", "name" : "prudvi", "age" : "23"}

DataFrame Operations#

scala> val dfs = sqlContext.read.json("employee.json")
scala> dfs.show()
scala> dfs.printSchema()
scala> dfs.select("name").show()
scala> dfs.filter(dfs("age") > 23).show()
scala> dfs.groupBy("age").count().show()
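
The same result set can also be obtained with SQL once the DataFrame is registered as a temporary table, which is what the next section covers. A minimal sketch, assuming the dfs DataFrame above (the table name employee_json is an illustrative choice, not part of the original example):

scala> dfs.registerTempTable("employee_json")   // expose the DataFrame to SQL
scala> sqlContext.sql("SELECT name FROM employee_json WHERE age > 23").show()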

Running SQL Queries Programmatically#

There are two methods for converting an existing RDD into a DataFrame:

Inferring the Schema using Reflection: this method uses reflection to generate the schema of an RDD that contains specific types of objects.

Programmatically Specifying the Schema: this method constructs a schema through a programmatic interface and then applies it to an existing RDD.

Inferring the Schema using Reflection#

Example#

employee.txt

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

Start the Spark Shell#

$ spark-shell

Create SQLContext#

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Import Implicit Conversions#

scala> import sqlContext.implicits._

Create Case Class#

scala> case class Employee(id: Int, name: String, age: Int)
defined class Employee

Create RDD and Apply Transformations#

scala> val empl = sc.textFile("employee.txt").map(_.split(",")).map(e => Employee(e(0).trim.toInt, e(1).trim, e(2).trim.toInt)).toDF()

Output: empl: org.apache.spark.sql.DataFrame = [id: int, name: string … 1 more field]
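
The chained expression above is easier to follow when split into named steps. A minimal sketch of the same pipeline, assuming the Employee case class and the implicits import from the previous steps (the intermediate names are illustrative):

scala> val lines = sc.textFile("employee.txt")                                                // RDD[String], one record per line
scala> val fields = lines.map(_.split(","))                                                   // RDD[Array[String]]
scala> val employees = fields.map(e => Employee(e(0).trim.toInt, e(1).trim, e(2).trim.toInt)) // RDD[Employee]
scala> val empDF = employees.toDF()                                                           // DataFrame via sqlContext.implicits._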

Store the DataFrame Data in a Table#

scala> empl.registerTempTable("employee")

Select Query on DataFrame#

scala> val allrecords = sqlContext.sql("SELeCT * FROM employee")
scala> allrecords.show()

Where Clause SQL Query on DataFrame#

scala> val agefilter = sqlContext.sql("SELECT * FROM employee WHERE age >= 20 AND age <= 35")
scala> agefilter.show()

Fetch ID Values from the agefilter DataFrame Using Column Index#

scala> agefilter.map(t=>"ID: "+t(0)).collect().foreach(println)
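
Row fields can also be read by name instead of position. A minimal sketch using Row.getAs, assuming the same agefilter DataFrame (the printed labels are illustrative):

scala> agefilter.map(t => "Name: " + t.getAs[String]("name") + ", Age: " + t.getAs[Int]("age")).collect().foreach(println)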

Programmatically Specifying the Schema#

Example#

employee.txt

Start the Spark Shell#

$ spark-shell

Create SQLContext#

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Read Input from Text File#

scala> val employee = sc.textFile("employee.txt")

Define the Schema as a String#

scala> val schemaString = "id name age"

Import Respective APIs#

scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType}

Generate Schema#

scala> val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

Convert the Text File Records into Rows#

scala> val rowRDD = employee.map(_.split(",")).map(e => Row(e(0).trim, e(1).trim, e(2).trim))
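
Because schemaString declares every field as a StringType, the row values above are kept as plain strings. If typed columns are preferred, the schema and the rows can be built with explicit types instead. A minimal sketch, assuming the same employee RDD (the IntegerType choice for id and age is an assumption, not part of the original example):

scala> import org.apache.spark.sql.types.IntegerType
scala> val typedSchema = StructType(Seq(StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true)))
scala> val typedRowRDD = employee.map(_.split(",")).map(e => Row(e(0).trim.toInt, e(1).trim, e(2).trim.toInt))
scala> val typedDF = sqlContext.createDataFrame(typedRowRDD, typedSchema)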

Apply the Schema to the Row RDD#

scala> val employeeDF = sqlContext.createDataFrame(rowRDD, schema)

Store DataFrame Data into Table#

scala> employeeDF.registerTempTable("employee")

Select Query on DataFrame#

scala> val allrecords = sqlContext.sql("SELECT * FROM employee")
scala> allrecords.show()
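
Since every column in this table was registered as a string, numeric comparisons are safest with an explicit cast. A minimal sketch of a WHERE query on the same table (the variable name and the threshold of 25 are illustrative):

scala> val olderEmployees = sqlContext.sql("SELECT id, name FROM employee WHERE CAST(age AS INT) > 25")
scala> olderEmployees.show()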
