Hive PySpark Example

PySpark With Hive

In this notebook we’ll cover how to read from and write to Hive using Spark SQL. It assumes that you have enabled the Hive service in your project.

Create a SparkSession with Hive Enabled

sparkmagic automatically creates a Spark session on the cluster for us, with Hive support enabled.

spark
Starting Spark application
ID  YARN Application ID             Kind     State  Spark UI  Driver log  Current session?
0   application_1540813611542_0002  pyspark  idle   Link      Link
SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7f183f464860>
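
If you are running outside sparkmagic (for example, locally), you would build an equivalent session yourself. A minimal sketch, assuming access to a Hive metastore; the app name is just an example:

from pyspark.sql import SparkSession

# Build a session with Hive support; enableHiveSupport() is what
# connects the session to the Hive metastore.
spark = (SparkSession.builder
         .appName("hive-example")  # example app name
         .enableHiveSupport()
         .getOrCreate())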

Select Hive Database

Using the Spark session you can interact with Hive through the sql method on the SparkSession, or through auxiliary DataFrame methods like .select() and .where().

Each project that has enabled Hive automatically gets a Hive database created for it. This is the only Hive database you can access, unless someone has shared their database with you.

from hops import hdfs as hopsfs
PROJECT_NAME = hopsfs.project_name()
PROJECT_NAME
'test'
spark.sql("use " + PROJECT_NAME)
DataFrame[]
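
To check which databases are visible to your session before switching, you can list them first, either with SQL or through the catalog API:

# List the databases visible to this session; on Hopsworks this is
# typically just your project's database plus 'default'.
spark.sql("SHOW DATABASES").show()

# Equivalent via the catalog API:
for db in spark.catalog.listDatabases():
    print(db.name)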

Create Tables

Tables can be created either by issuing a CREATE TABLE statement or by using the saveAsTable() method on an existing DataFrame. When using saveAsTable(), Spark infers the schema from the DataFrame and issues the CREATE TABLE for you.

spark.sql("show tables").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
spark.sql("CREATE TABLE MAGIC_MATRIX (position int, value float) STORED AS ORC")
DataFrame[]
spark.sql("show tables").show()
+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
|    test|magic_matrix|      false|
+--------+------------+-----------+
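
To verify the schema that Hive recorded for the new table, you can describe it:

# Show the column names and types Hive stored for the table.
spark.sql("DESCRIBE magic_matrix").show()
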
from pyspark.sql.types import StructType, StructField, IntegerType
schema = StructType([StructField('SquaredValue', IntegerType(), True)])
rddValues = spark.sparkContext.parallelize(list(range(0, 100))).map(lambda x: [x * x])
dfValues = spark.createDataFrame(rddValues, schema)
dfValues.show(5)
+------------+
|SquaredValue|
+------------+
|           0|
|           1|
|           4|
|           9|
|          16|
+------------+
only showing top 5 rows
dfValues.write.format("ORC").mode("overwrite").saveAsTable("SquaredValues")
spark.sql("show tables").show()
+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|    test| magic_matrix|      false|
|    test|squaredvalues|      false|
+--------+-------------+-----------+
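
You can confirm the schema that saveAsTable() inferred from the DataFrame:

# Read the table back and print the schema Spark inferred.
spark.table("squaredvalues").printSchema()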

Insert Values

Values can be inserted with plain SQL, or by using saveAsTable() / insertInto().

Simple Insert

spark.sql("INSERT INTO TABLE magic_matrix VALUES (1, 99), (2, 100)")
DataFrame[]
spark.sql("SELECT * FROM magic_matrix").show()
+--------+-----+
|position|value|
+--------+-----+
|       1| 99.0|
|       2|100.0|
+--------+-----+

Insert with saveAsTable

rddValues2 = spark.sparkContext.parallelize(list(range(100,200))).map(lambda x: [x*x])
dfValues2 = spark.createDataFrame(rddValues2, schema)
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     100|
+--------+
dfValues2.write.format("ORC").mode("append").saveAsTable("squaredvalues")
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     200|
+--------+

Insert with insertInto

dfValues2.write.mode("append").insertInto("squaredvalues")
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     300|
+--------+
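
One caveat worth knowing: insertInto() resolves columns by position, not by name, so the DataFrame's column order must match the table's. A defensive sketch that reorders columns explicitly before inserting (trivial here, since squaredvalues has a single column, but it matters for wider tables):

# insertInto() matches columns by position, so align the DataFrame's
# column order with the table's before appending.
table_cols = spark.table("squaredvalues").columns
dfValues2.select(*table_cols).write.mode("append").insertInto("squaredvalues")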

You can also use overwrite mode. The REFRESH TABLE statement afterwards invalidates Spark's cached metadata and data for the table, so subsequent queries read the newly written files:

dfValues2.write.format("ORC").mode("overwrite").saveAsTable("squaredvalues")
spark.sql("REFRESH TABLE squaredvalues")
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     100|
+--------+

Insert by using a Temp View

rddValues3 = spark.sparkContext.parallelize(list(range(200,300))).map(lambda x: [x*x])
dfValues3 = spark.createDataFrame(rddValues3, schema)
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     100|
+--------+
dfValues3.createOrReplaceTempView("temptable")
spark.sql("INSERT INTO TABLE squaredvalues SELECT * FROM temptable")
DataFrame[]
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     200|
+--------+

Queries

spark.sql("SELECT * FROM squaredvalues WHERE squaredvalue > 380 ").show()
+------------+
|SquaredValue|
+------------+
|       40000|
|       40401|
|       40804|
|       41209|
|       41616|
|       42025|
|       42436|
|       42849|
|       43264|
|       43681|
|       44100|
|       44521|
|       44944|
|       45369|
|       45796|
|       46225|
|       46656|
|       47089|
|       47524|
|       47961|
+------------+
only showing top 20 rows
spark.sql("SELECT * FROM magic_matrix WHERE position = 2 ").show()
+--------+-----+
|position|value|
+--------+-----+
|       2|100.0|
+--------+-----+
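
The same queries can be written with the DataFrame API mentioned earlier, instead of raw SQL:

from pyspark.sql.functions import col

# Equivalents of the SQL queries above, using .where() on the table.
spark.table("squaredvalues").where(col("SquaredValue") > 380).show(5)
spark.table("magic_matrix").where(col("position") == 2).show()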

Drop Tables

spark.sql("SHOW TABLES").show()
+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|    test| magic_matrix|      false|
|    test|squaredvalues|      false|
|        |    temptable|       true|
+--------+-------------+-----------+
spark.sql("DROP TABLE magic_matrix")
DataFrame[]
spark.sql("SHOW TABLES").show()
+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|    test|squaredvalues|      false|
|        |    temptable|       true|
+--------+-------------+-----------+
spark.sql("DROP TABLE squaredvalues")
DataFrame[]
spark.sql("SHOW TABLES").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |temptable|       true|
+--------+---------+-----------+
spark.catalog.dropTempView("temptable")
spark.sql("SHOW TABLES").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
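
If you rerun the notebook, DROP TABLE fails when the table is already gone; IF EXISTS makes the cleanup idempotent:

# Safe to run even when the tables no longer exist.
spark.sql("DROP TABLE IF EXISTS magic_matrix")
spark.sql("DROP TABLE IF EXISTS squaredvalues")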