Hive PySpark Example
PySpark With Hive
In this notebook we’ll cover how you can read and write to Hive using Spark SQL. The notebook assumes that you have enabled the Hive service in your project.
Create a SparkSession with Hive Enabled
sparkmagic automatically creates a Spark session in the cluster for us with Hive enabled.
spark
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
---|---|---|---|---|---|---|
0 | application_1540813611542_0002 | pyspark | idle | Link | Link | ✔ |
SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7f183f464860>
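If you are not running through sparkmagic, an equivalent Hive-enabled session can be built explicitly. A minimal sketch (the application name is just a placeholder):
from pyspark.sql import SparkSession

# Build a Hive-enabled session by hand; sparkmagic does the equivalent for us
spark = (SparkSession.builder
         .appName("hive-example")   # placeholder name
         .enableHiveSupport()       # connect the session to the Hive metastore
         .getOrCreate())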
Select Hive Database
Using the Spark session you can interact with Hive through the sql method on the SparkSession, or through auxiliary DataFrame methods like .select() and .where() (demonstrated in the Queries section below).
Each project that has enabled Hive automatically gets a Hive database created for it. This is the only Hive database you can access, unless someone has shared their database with you.
from hops import hdfs as hopsfs
PROJECT_NAME = hopsfs.project_name()
PROJECT_NAME
'test'
spark.sql("use " + PROJECT_NAME)
DataFrame[]
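Equivalently, the current database can be selected through the catalog API instead of a SQL statement:
# Same effect as spark.sql("use " + PROJECT_NAME)
spark.catalog.setCurrentDatabase(PROJECT_NAME)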
Create Tables
Tables can be created either by issuing a CREATE TABLE statement or by using the saveAsTable() method on an existing dataframe. When using saveAsTable, Spark will infer the schema from the dataframe and issue the CREATE TABLE for you.
spark.sql("show tables").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
spark.sql("CREATE TABLE MAGIC_MATRIX (position int, value float) STORED AS ORC")
DataFrame[]
spark.sql("show tables").show()
+--------+------------+-----------+
|database| tableName|isTemporary|
+--------+------------+-----------+
| test|magic_matrix| false|
+--------+------------+-----------+
Notice that the table is listed as magic_matrix: Hive stores table names in lowercase, so MAGIC_MATRIX was lowercased on creation.
from pyspark.sql.types import *
schema = StructType([StructField('SquaredValue', IntegerType(), True)])
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark.sparkContext)
rddValues = spark.sparkContext.parallelize(list(range(0,100))).map(lambda x: [x*x])
dfValues = sqlContext.createDataFrame(rddValues,schema)
dfValues.show(5)
+------------+
|SquaredValue|
+------------+
| 0|
| 1|
| 4|
| 9|
| 16|
+------------+
only showing top 5 rows
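As a side note, the SQLContext detour above is only kept for older Spark versions; on Spark 2.x the session can build the dataframe directly. An equivalent sketch:
# Spark 2.x equivalent of the cell above, without a SQLContext
dfValues = spark.createDataFrame(rddValues, schema)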
dfValues.write.format("ORC").mode("overwrite").saveAsTable("SquaredValues")
spark.sql("show tables").show()
+--------+-------------+-----------+
|database| tableName|isTemporary|
+--------+-------------+-----------+
| test| magic_matrix| false|
| test|squaredvalues| false|
+--------+-------------+-----------+
Insert Values
Values can be inserted with plain SQL or by using saveAsTable / insertInto.
Simple Insert
spark.sql("INSERT INTO TABLE magic_matrix VALUES (1, 99), (2, 100)")
DataFrame[]
spark.sql("SELECT * FROM magic_matrix").show()
+--------+-----+
|position|value|
+--------+-----+
| 1| 99.0|
| 2|100.0|
+--------+-----+
Insert with saveAsTable
rddValues2 = spark.sparkContext.parallelize(list(range(100,200))).map(lambda x: [x*x])
dfValues2 = sqlContext.createDataFrame(rddValues2,schema)
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
| 100|
+--------+
dfValues2.write.format("ORC").mode("append").saveAsTable("squaredvalues")
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
| 200|
+--------+
Insert with insertInto
dfValues2.write.mode("append").insertInto("squaredvalues")
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
| 300|
+--------+
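Keep in mind that, unlike saveAsTable, insertInto resolves columns by position rather than by name, so the dataframe’s column order must match the table’s. A small sketch of the pitfall (the swapped dataframe is hypothetical):
# insertInto matches columns by POSITION, not by name: with the order
# (value, position) the numbers would land in the wrong columns
swapped = spark.createDataFrame([(99.0, 1)], ["value", "position"])
# Reorder explicitly before inserting to stay safe:
# swapped.select("position", "value").write.insertInto("magic_matrix")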
You can also use overwrite mode:
dfValues2.write.format("ORC").mode("overwrite").saveAsTable("squaredvalues")
spark.sql("REFRESH TABLE squaredvalues")
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
| 100|
+--------+
Insert with a Temporary Table
rddValues3 = spark.sparkContext.parallelize(list(range(200,300))).map(lambda x: [x*x])
dfValues3 = sqlContext.createDataFrame(rddValues3,schema)
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     100|
+--------+
dfValues3.createOrReplaceTempView("temptable") # registerTempTable is deprecated; this is its Spark 2.x replacement
sqlContext.sql("insert into table squaredvalues select * from temptable")
DataFrame[]
spark.sql("SELECT COUNT(*) FROM squaredvalues").show()
+--------+
|count(1)|
+--------+
|     200|
+--------+
Queries
spark.sql("SELECT * FROM squaredvalues WHERE squaredvalue > 380 ").show()
+------------+
|SquaredValue|
+------------+
| 40000|
| 40401|
| 40804|
| 41209|
| 41616|
| 42025|
| 42436|
| 42849|
| 43264|
| 43681|
| 44100|
| 44521|
| 44944|
| 45369|
| 45796|
| 46225|
| 46656|
| 47089|
| 47524|
| 47961|
+------------+
only showing top 20 rows
spark.sql("SELECT * FROM magic_matrix WHERE position = 2 ").show()
+--------+-----+
|position|value|
+--------+-----+
| 2|100.0|
+--------+-----+
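The same queries can be expressed with the DataFrame methods mentioned at the start of the notebook, using select() and where() instead of raw SQL:
# DataFrame-API equivalents of the two SQL queries above
spark.table("squaredvalues").where("SquaredValue > 380").show(5)
spark.table("magic_matrix").select("value").where("position = 2").show()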
Drop Tables
spark.sql("SHOW TABLES").show()
+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|    test| magic_matrix|      false|
|    test|squaredvalues|      false|
|        |    temptable|       true|
+--------+-------------+-----------+
spark.sql("DROP TABLE magic_matrix")
DataFrame[]
spark.sql("SHOW TABLES").show()
+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|    test|squaredvalues|      false|
|        |    temptable|       true|
+--------+-------------+-----------+
spark.sql("DROP TABLE squaredvalues")
DataFrame[]
spark.sql("SHOW TABLES").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| |temptable| true|
+--------+---------+-----------+
spark.catalog.dropTempView("temptable")
spark.sql("SHOW TABLES").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
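As a closing tip, the DROP statements can be made idempotent with IF EXISTS, which keeps the notebook safe to re-run from the top:
# IF EXISTS avoids an error when the table has already been dropped
spark.sql("DROP TABLE IF EXISTS magic_matrix")
spark.sql("DROP TABLE IF EXISTS squaredvalues")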