Do you want to try out this notebook? Get a free account (no credit card required) at hopsworks.ai. You can also install open-source Hopsworks or view tutorial videos here.
3. Windowed aggregations using Spark Structured Streaming and ingestion to the online feature store
Import necessary libraries
import json
from pyspark.sql.functions import from_json, window, avg, count, stddev, explode, date_format, col
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType
from hops import kafka, tls, hdfs
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log |
---|---|---|---|---|---|
1 | application_1620686564138_0002 | pyspark | idle | Link | Link |
SparkSession available as 'spark'.
# name of the kafka topic to read card transactions from
KAFKA_TOPIC_NAME = "credit_card_transactions"
Create a stream from the Kafka topic
df_read = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka.get_broker_endpoints()) \
    .option("kafka.security.protocol", kafka.get_security_protocol()) \
    .option("kafka.ssl.truststore.location", tls.get_trust_store()) \
    .option("kafka.ssl.truststore.password", tls.get_key_store_pwd()) \
    .option("kafka.ssl.keystore.location", tls.get_key_store()) \
    .option("kafka.ssl.keystore.password", tls.get_key_store_pwd()) \
    .option("kafka.ssl.key.password", tls.get_trust_store_pwd()) \
    .option("kafka.ssl.endpoint.identification.algorithm", "") \
    .option("startingOffsets", "earliest") \
    .option("subscribe", KAFKA_TOPIC_NAME) \
    .load()
# Define the schema of the JSON messages read from the Kafka topic
parse_schema = StructType([StructField('tid', StringType(), True),
                           StructField('datetime', StringType(), True),
                           StructField('cc_num', StringType(), True),
                           StructField('amount', StringType(), True)])
# Deserialise the data from Kafka and create the streaming dataframe
df_deser = df_read.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", parse_schema).alias("value")) \
    .select("value.tid", "value.datetime", "value.cc_num", "value.amount") \
    .selectExpr("CAST(tid as string)", "CAST(datetime as string)", "CAST(cc_num as long)", "CAST(amount as double)")
df_deser.isStreaming
True
df_deser.printSchema()
root
|-- tid: string (nullable = true)
|-- datetime: string (nullable = true)
|-- cc_num: long (nullable = true)
|-- amount: double (nullable = true)
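Before building the aggregations, you can optionally sanity-check the parsed stream. A minimal sketch (this debug query is not part of the pipeline; on Hopsworks the console output ends up in the driver log) is to attach a short-lived console sink:
# Optional debug sink: print a few parsed records per micro-batch
debug_query = df_deser.writeStream \
    .format("console") \
    .option("numRows", 5) \
    .option("truncate", "false") \
    .start()
# Stop it once you have verified that the data looks right
debug_query.stop()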
Create windowed aggregations over different time windows using Spark Structured Streaming.
# 10 minute window
windowed10mSignalDF = df_deser \
    .selectExpr("CAST(tid as string)", "CAST(datetime as timestamp)", "CAST(cc_num as long)", "CAST(amount as double)") \
    .withWatermark("datetime", "60 minutes") \
    .groupBy(window("datetime", "10 minutes"), "cc_num") \
    .agg(avg("amount").alias("avg_amt_per_10m"), stddev("amount").alias("stdev_amt_per_10m"), count("cc_num").alias("num_trans_per_10m")) \
    .select("cc_num", "num_trans_per_10m", "avg_amt_per_10m", "stdev_amt_per_10m")
windowed10mSignalDF.isStreaming
True
windowed10mSignalDF.printSchema()
root
|-- cc_num: long (nullable = true)
|-- num_trans_per_10m: long (nullable = false)
|-- avg_amt_per_10m: double (nullable = true)
|-- stdev_amt_per_10m: double (nullable = true)
# 1 hour window
windowed1hSignalDF = df_deser \
    .selectExpr("CAST(tid as string)", "CAST(datetime as timestamp)", "CAST(cc_num as long)", "CAST(amount as double)") \
    .withWatermark("datetime", "60 minutes") \
    .groupBy(window("datetime", "60 minutes"), "cc_num") \
    .agg(avg("amount").alias("avg_amt_per_1h"), stddev("amount").alias("stdev_amt_per_1h"), count("cc_num").alias("num_trans_per_1h")) \
    .select("cc_num", "num_trans_per_1h", "avg_amt_per_1h", "stdev_amt_per_1h")
windowed1hSignalDF.isStreaming
True
windowed1hSignalDF.printSchema()
root
|-- cc_num: long (nullable = true)
|-- num_trans_per_1h: long (nullable = false)
|-- avg_amt_per_1h: double (nullable = true)
|-- stdev_amt_per_1h: double (nullable = true)
# 12 hour window
windowed12hSignalDF = df_deser \
    .selectExpr("CAST(tid as string)", "CAST(datetime as timestamp)", "CAST(cc_num as long)", "CAST(amount as double)") \
    .withWatermark("datetime", "60 minutes") \
    .groupBy(window("datetime", "12 hours"), "cc_num") \
    .agg(avg("amount").alias("avg_amt_per_12h"), stddev("amount").alias("stdev_amt_per_12h"), count("cc_num").alias("num_trans_per_12h")) \
    .select("cc_num", "num_trans_per_12h", "avg_amt_per_12h", "stdev_amt_per_12h")
windowed12hSignalDF.isStreaming
True
windowed12hSignalDF.printSchema()
root
|-- cc_num: long (nullable = true)
|-- num_trans_per_12h: long (nullable = false)
|-- avg_amt_per_12h: double (nullable = true)
|-- stdev_amt_per_12h: double (nullable = true)
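The aggregations above drop the window column, so each row carries only the card number and its statistics; the 60-minute watermark tells Spark how long to wait for late events before a window's state can be dropped. If you also want the window boundaries as columns (for example to debug late data), a hedged variant of the 10 minute query (the windowed10mWithBoundsDF name and the extra columns are just illustrative and not part of the feature group schema) looks like this:
# Illustrative variant: keep the window start/end alongside the aggregates
windowed10mWithBoundsDF = df_deser \
    .selectExpr("CAST(datetime as timestamp)", "CAST(cc_num as long)", "CAST(amount as double)") \
    .withWatermark("datetime", "60 minutes") \
    .groupBy(window("datetime", "10 minutes"), "cc_num") \
    .agg(avg("amount").alias("avg_amt_per_10m"), count("cc_num").alias("num_trans_per_10m")) \
    .select(col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            "cc_num", "num_trans_per_10m", "avg_amt_per_10m")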
Establish a connection with your Hopsworks feature store.
import hsfs
connection = hsfs.connection()
# get a reference to the feature store; you can also access shared feature stores by providing the feature store name
fs = connection.get_feature_store()
Connected. Call `.close()` to terminate connection gracefully.
Get feature groups from the Hopsworks feature store.
card_transactions = fs.get_feature_group("card_transactions", version = 1)
card_transactions_10m_agg = fs.get_feature_group("card_transactions_10m_agg", version = 1)
card_transactions_1h_agg = fs.get_feature_group("card_transactions_1h_agg", version = 1)
card_transactions_12h_agg = fs.get_feature_group("card_transactions_12h_agg", version = 1)
Insert streaming dataframes into the online feature groups
Now we are ready to write these streaming dataframes, as long-running queries, to the online storage of the corresponding feature groups.
query_10m = card_transactions_10m_agg.insert_stream(windowed10mSignalDF)
StatisticsWarning: Stream ingestion for feature group `card_transactions_10m_agg`, with version `1` will not compute statistics.
query_1h = card_transactions_1h_agg.insert_stream(windowed1hSignalDF)
StatisticsWarning: Stream ingestion for feature group `card_transactions_1h_agg`, with version `1` will not compute statistics.
query_12h = card_transactions_12h_agg.insert_stream(windowed12hSignalDF)
StatisticsWarning: Stream ingestion for feature group `card_transactions_12h_agg`, with version `1` will not compute statistics.
Check if the Spark streaming queries are active
query_10m.isActive
True
query_1h.isActive
True
query_12h.isActive
True
We can also check the status of a query and whether any exceptions were thrown.
query_10m.status
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
query_10m.exception()
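If a query has failed, .exception() returns the error; otherwise it returns None. For ongoing monitoring, a small sketch (the commented timeout value is just an example) using the standard StreamingQuery API is:
# Metrics of the most recent micro-batch (input rows/sec, batch duration, etc.)
query_10m.lastProgress
# Optionally block for a while; this re-raises any streaming exception
# query_10m.awaitTermination(timeout=60)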
Let's check if data was ingested into the online feature store
fs.sql("SELECT * FROM card_transactions_12h_agg_1",online=True).show(20,False)
+----------------+-----------------+------------------+------------------+
|cc_num |num_trans_per_12h|avg_amt_per_12h |stdev_amt_per_12h |
+----------------+-----------------+------------------+------------------+
|4226063306212844|5 |1134.8 |2370.804468076185 |
|4467512729899486|9 |205.1577777777778 |303.5732691945133 |
|4232153519700594|12 |129.64416666666668|250.8604352463132 |
|4376360021712050|8 |78.41125 |122.69854702102805|
|4867010117638802|8 |1224.9825 |3033.6029145826396|
|4956860373932956|8 |598.58125 |1242.149174184319 |
|4997591057565538|7 |127.6014285714286 |211.0837646866421 |
|4965123463794391|9 |96.02777777777777 |179.76414115291306|
|4671096685272336|5 |146.296 |199.37470458912284|
|4001837582277998|7 |193.3242857142857 |292.91272988480955|
|4135449811055770|6 |1212.2166666666667|1507.44335556155 |
|4136262720215016|11 |811.387272727273 |2476.740706473291 |
|4208317936968510|7 |267.33285714285716|330.5734197176318 |
|4893308344742860|8 |290.56 |333.44267401930256|
|4213741526478791|5 |137.602 |185.38636659689945|
|4260567335033291|9 |329.4222222222222 |306.2573898609541 |
|4683617042712171|8 |32.47 |32.10560031254005 |
|4802174255861762|9 |709.7955555555556 |1933.2421975602997|
|4925013053127624|11 |855.5918181818183 |1638.8087694714 |
|4830349319515887|12 |975.3983333333332 |2249.727648986684 |
+----------------+-----------------+------------------+------------------+
only showing top 20 rows
fs.sql("SELECT * FROM card_transactions_12h_agg_1",online=True).count()
100
Insert data into the offline feature groups.
The Hopsworks online feature store keeps only the latest available value of each feature for low-latency model serving. However, we also want to write to the offline feature store to keep historical data.
def foreach_batch_function_card(batchDF, epoch_id):
    batchDF.persist()
    print(epoch_id)
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5"
    }
    # Transform and write batchDF
    card_transactions.insert(batchDF, write_options=extra_hudi_options, storage="offline")
    batchDF.unpersist()
hudi_card = df_deser.writeStream.foreachBatch(foreach_batch_function_card)\
    .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-card")\
    .start()
def foreach_batch_function_10m(batchDF, epoch_id):
    batchDF.persist()
    print(epoch_id)
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5"
    }
    # Transform and write batchDF
    card_transactions_10m_agg.insert(batchDF, write_options=extra_hudi_options, storage="offline")
    batchDF.unpersist()
hudi_10m = windowed10mSignalDF.writeStream.foreachBatch(foreach_batch_function_10m)\
    .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-data10m")\
    .start()
def foreach_batch_function_1h(batchDF, epoch_id):
    batchDF.persist()
    print(epoch_id)
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5"
    }
    # Transform and write batchDF
    card_transactions_1h_agg.insert(batchDF, write_options=extra_hudi_options, storage="offline")
    batchDF.unpersist()
hudi_1h = windowed1hSignalDF.writeStream.foreachBatch(foreach_batch_function_1h)\
    .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-1h")\
    .start()
def foreach_batch_function_12h(batchDF, epoch_id):
    batchDF.persist()
    print(epoch_id)
    extra_hudi_options = {
        "hoodie.bulkinsert.shuffle.parallelism": "1",
        "hoodie.insert.shuffle.parallelism": "1",
        "hoodie.upsert.shuffle.parallelism": "1",
        "hoodie.parquet.compression.ratio": "0.5"
    }
    # Transform and write batchDF
    card_transactions_12h_agg.insert(batchDF, write_options=extra_hudi_options, storage="offline")
    batchDF.unpersist()
hudi_12h = windowed12hSignalDF.writeStream.foreachBatch(foreach_batch_function_12h)\
    .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-12h")\
    .start()
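Once the first micro-batches have been committed, the offline (historical) data can be queried as well; a small sketch (assuming the offline table follows the same feature-group-name plus version convention as the online query above, and that at least one batch has already been written) is:
# Query the offline feature store for the 10 minute aggregates
fs.sql("SELECT * FROM card_transactions_10m_agg_1").show(5, False)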
Check if queries are still active
hudi_card.isActive
True
hudi_10m.isActive
True
hudi_1h.isActive
True
hudi_12h.isActive
True
Stop queries
If you are running this from a notebook, you can kill a Spark Structured Streaming query by stopping the kernel or by calling its .stop() method.
query_10m.stop()
query_1h.stop()
query_12h.stop()
hudi_card.stop()
hudi_10m.stop()
hudi_1h.stop()
hudi_12h.stop()
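As a final safeguard (assuming the default SparkSession and the hsfs connection from earlier are still available), you can stop any remaining active streaming queries and close the feature store connection:
# Stop any streaming queries still running in this Spark session
for q in spark.streams.active:
    q.stop()
# Terminate the Hopsworks feature store connection gracefully
connection.close()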