3. Windowed aggregations using Spark Structured Streaming and ingestion to the Online Feature Store

Feature engineering and ingestion to the Feature Store in a streaming fashion

(Figure: overview-3.png — streaming feature engineering and ingestion overview)

Import necessary libraries

import json

from pyspark.sql.functions import from_json, window, avg, count, stddev, explode, date_format, col
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType

from hops import kafka, tls, hdfs
Starting Spark application
ID | YARN Application ID            | Kind    | State | Spark UI | Driver log
61 | application_1623853832952_0043 | pyspark | idle  | Link     | Link
SparkSession available as 'spark'.
# Name of the kafka topic to read card transactions from
KAFKA_TOPIC_NAME = "credit_card_transactions"

Create a stream from the Kafka topic

df_read = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka.get_broker_endpoints()) \
  .option("kafka.security.protocol",kafka.get_security_protocol()) \
  .option("kafka.ssl.truststore.location", tls.get_trust_store()) \
  .option("kafka.ssl.truststore.password", tls.get_key_store_pwd()) \
  .option("kafka.ssl.keystore.location", tls.get_key_store()) \
  .option("kafka.ssl.keystore.password", tls.get_key_store_pwd()) \
  .option("kafka.ssl.key.password", tls.get_trust_store_pwd()) \
  .option("kafka.ssl.endpoint.identification.algorithm", "") \
  .option("startingOffsets", "earliest")\
  .option("subscribe", KAFKA_TOPIC_NAME) \
  .load()
# Define the schema for parsing records from the Kafka topic
parse_schema = StructType([StructField('tid', StringType(), True),
                           StructField('datetime', StringType(), True),
                           StructField('cc_num', StringType(), True),
                           StructField('amount', StringType(), True)])
# Deserialise data from Kafka and create the streaming dataframe
df_deser = df_read.selectExpr("CAST(value AS STRING)")\
                   .select(from_json("value", parse_schema).alias("value"))\
                   .select("value.tid", "value.datetime", "value.cc_num", "value.amount")\
                   .selectExpr("CAST(tid as string)", "CAST(datetime as string)", "CAST(cc_num as long)", "CAST(amount as double)")
df_deser.isStreaming
True
df_deser.printSchema()
root
 |-- tid: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- amount: double (nullable = true)
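
Before building the aggregations, it can be useful to sanity-check the parsed stream. The snippet below is a small debugging sketch that is not part of the original pipeline: it writes a few micro-batches to Spark's console sink so you can eyeball the parsed records, and should be stopped before starting the long-running ingestion queries.

# Optional debugging sketch: print parsed records to the console sink.
# Stop this query once the records look correct; it is for inspection only.
debug_query = df_deser.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", "false") \
    .start()

# debug_query.stop()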

Create windowed aggregations over different time windows using Spark Structured Streaming.

# 10 minute window
windowed10mSignalDF = df_deser \
    .selectExpr("CAST(tid as string)", "CAST(datetime as timestamp)", "CAST(cc_num as long)", "CAST(amount as double)")\
    .withWatermark("datetime", "60 minutes") \
    .groupBy(window("datetime", "10 minutes"), "cc_num") \
    .agg(avg("amount").alias("avg_amt_per_10m"), stddev("amount").alias("stdev_amt_per_10m"), count("cc_num").alias("num_trans_per_10m"))\
    .na.fill(0, ["stdev_amt_per_10m"])\
    .select("cc_num", "num_trans_per_10m", "avg_amt_per_10m", "stdev_amt_per_10m")
windowed10mSignalDF.isStreaming
True
windowed10mSignalDF.printSchema()
root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_10m: long (nullable = false)
 |-- avg_amt_per_10m: double (nullable = true)
 |-- stdev_amt_per_10m: double (nullable = true)
# 1 hour window
windowed1hSignalDF = \
  df_deser \
    .selectExpr("CAST(tid as string)", "CAST(datetime as timestamp)", "CAST(cc_num as long)", "CAST(amount as double)")\
    .withWatermark("datetime", "60 minutes") \
    .groupBy(window("datetime", "60 minutes"), "cc_num") \
    .agg(avg("amount").alias("avg_amt_per_1h"), stddev("amount").alias("stdev_amt_per_1h"), count("cc_num").alias("num_trans_per_1h"))\
    .na.fill(0, ["stdev_amt_per_1h"])\
    .select("cc_num", "num_trans_per_1h", "avg_amt_per_1h", "stdev_amt_per_1h")
windowed1hSignalDF.isStreaming
True
windowed1hSignalDF.printSchema()
root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_1h: long (nullable = false)
 |-- avg_amt_per_1h: double (nullable = true)
 |-- stdev_amt_per_1h: double (nullable = true)
# 12 hour window
windowed12hSignalDF = \
  df_deser \
    .selectExpr("CAST(tid as string)", "CAST(datetime as timestamp)", "CAST(cc_num as long)", "CAST(amount as double)")\
    .withWatermark("datetime", "60 minutes") \
    .groupBy(window("datetime", "12 hours"), "cc_num") \
    .agg(avg("amount").alias("avg_amt_per_12h"), stddev("amount").alias("stdev_amt_per_12h"), count("cc_num").alias("num_trans_per_12h"))\
    .na.fill(0, ["stdev_amt_per_12h"])\
    .select("cc_num", "num_trans_per_12h", "avg_amt_per_12h", "stdev_amt_per_12h")
windowed12hSignalDF.isStreaming
True
windowed12hSignalDF.printSchema()
root
 |-- cc_num: long (nullable = true)
 |-- num_trans_per_12h: long (nullable = false)
 |-- avg_amt_per_12h: double (nullable = true)
 |-- stdev_amt_per_12h: double (nullable = true)

Establish a connection with your Hopsworks feature store.

import hsfs
connection = hsfs.connection()
# Get a reference to the feature store; you can also access shared feature stores by providing the feature store name
fs = connection.get_feature_store()
Connected. Call `.close()` to terminate connection gracefully.

Get feature groups from the Hopsworks Feature Store.

card_transactions = fs.get_feature_group("card_transactions", version = 1)
card_transactions_10m_agg = fs.get_feature_group("card_transactions_10m_agg", version = 1)
card_transactions_1h_agg = fs.get_feature_group("card_transactions_1h_agg", version = 1)
card_transactions_12h_agg = fs.get_feature_group("card_transactions_12h_agg", version = 1)
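
The feature groups retrieved above are assumed to exist already; in this demo they are typically created in an earlier notebook. Purely as a hedged sketch of what that setup could look like with the hsfs API, creating the 10-minute aggregation feature group might resemble the commented-out code below (the sample_batch_df used to infer the schema is hypothetical).

# Illustrative sketch only -- the feature groups are assumed to have been created beforehand.
# card_transactions_10m_agg = fs.create_feature_group(
#     name="card_transactions_10m_agg",
#     version=1,
#     primary_key=["cc_num"],
#     online_enabled=True,
#     description="10-minute windowed aggregations of card transactions")
# card_transactions_10m_agg.save(sample_batch_df)  # sample_batch_df: hypothetical batch dataframe defining the schema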

Insert the streaming dataframes into the online feature groups

Now we are ready to write these streaming dataframes, as long-running queries, to the online storage of the corresponding feature groups.

query_10m = card_transactions_10m_agg.insert_stream(windowed10mSignalDF)
StatisticsWarning: Stream ingestion for feature group `card_transactions_10m_agg`, with version `1` will not compute statistics.
query_1h = card_transactions_1h_agg.insert_stream(windowed1hSignalDF)
StatisticsWarning: Stream ingestion for feature group `card_transactions_1h_agg`, with version `1` will not compute statistics.
query_12h = card_transactions_12h_agg.insert_stream(windowed12hSignalDF)
StatisticsWarning: Stream ingestion for feature group `card_transactions_12h_agg`, with version `1` will not compute statistics.

Check if the Spark streaming queries are active

print("IsActive:\n query_10m: {}\n query_1h: {}\n query_12h: {}".format(query_10m.isActive, query_1h.isActive, query_12h.isActive))
IsActive:
 query_10m: True
 query_1h: True
 query_12h: True

We can also check the status of a query and whether any exceptions have been thrown.

print("Status:\n query_10m: {}\n query_1h: {}\n query_12h: {}".format(query_10m.status, query_1h.status, query_12h.status))
Status:
 query_10m: {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
 query_1h: {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
 query_12h: {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
query_10m.exception()
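
Since no exception has been raised, the call above returns nothing. As an additional monitoring sketch using Spark's standard StreamingQuery API (not shown in the original notebook), you can also inspect the progress of the most recent micro-batch of each query:

# Print the batch id and input row count of the latest completed micro-batch.
# lastProgress is None until the first micro-batch has finished.
for name, q in [("query_10m", query_10m), ("query_1h", query_1h), ("query_12h", query_12h)]:
    progress = q.lastProgress
    if progress is not None:
        print(name, progress["batchId"], progress["numInputRows"])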

Let's check whether data was ingested into the online feature store

fs.sql("SELECT * FROM card_transactions_12h_agg_1",online=True).show(20,False)
+----------------+-----------------+------------------+------------------+
|cc_num          |num_trans_per_12h|avg_amt_per_12h   |stdev_amt_per_12h |
+----------------+-----------------+------------------+------------------+
|4444037300542691|7                |154.51857142857145|238.93552871214524|
|4609072304828342|7                |176.02714285714288|263.06316920176454|
|4161715127983823|10               |928.3030000000001 |1809.7934375689888|
|4223253728365626|13               |1201.686153846154 |2724.0564739389993|
|4572259224622748|9                |1291.5500000000002|2495.189283160699 |
|4436298663019939|11               |149.78636363636366|235.75729924109365|
|4231082502226286|10               |977.8430000000001 |2071.1095165208753|
|4159210768503456|6                |37.303333333333335|26.403001092047596|
|4090612125343330|15               |646.7259999999999 |1336.9214811370616|
|4416410688550228|11               |663.0627272727273 |1631.6188600717442|
|4811343280984688|6                |237.08166666666662|305.3312340666554 |
|4853206196105715|10               |1077.6439999999998|2793.72050986255  |
|4032763187099525|10               |425.006           |1204.4018071612704|
|4645884898081724|8                |171.48125000000002|200.51632676222223|
|4524584153018280|7                |160.18714285714285|223.19407330200963|
|4815447301191763|6                |78.35833333333333 |29.206839895248287|
|4872287670027309|3                |260.63666666666666|427.3684126527525 |
|4734811798843814|7                |122.81714285714285|166.56810466135522|
|4609746692923340|9                |1401.2866666666669|3007.164227935514 |
|4526611032294580|6                |109.52166666666666|150.55966530471125|
+----------------+-----------------+------------------+------------------+
only showing top 20 rows
fs.sql("SELECT * FROM card_transactions_12h_agg_1",online=True).count()
100
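
The same fs.sql interface works for the other online feature groups as well. As a further, purely illustrative check, the query below pulls the 10-minute aggregates for a single card number taken from the output above:

# Illustrative check: 10-minute aggregates for one of the cards listed above.
fs.sql("SELECT * FROM card_transactions_10m_agg_1 WHERE cc_num = 4444037300542691",
       online=True).show(5, False)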

Insert data into the offline feature groups.

The Hopsworks online feature store keeps only the latest available value of each feature, for low-latency model serving. However, we also want to write to the offline feature store in order to retain historical data.

def foreach_batch_function_card(batchDF, epoch_id):
    batchDF.persist()
    print(epoch_id)
    extra_hudi_options = {
    "hoodie.bulkinsert.shuffle.parallelism":"1",     
    "hoodie.insert.shuffle.parallelism":"1", 
    "hoodie.upsert.shuffle.parallelism":"1",
    "hoodie.parquet.compression.ratio":"0.5"
    }
    # Write the micro-batch to the offline feature group (statistics disabled)
    card_transactions.statistics_config = {"enabled": False, "histograms": False, "correlations": False}
    card_transactions.insert(batchDF,write_options=extra_hudi_options, storage="offline")
    batchDF.unpersist()

hudi_card = df_deser.writeStream.foreachBatch(foreach_batch_function_card)\
                    .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-card")\
                    .start()    
def foreach_batch_function_10m(batchDF, epoch_id):
    batchDF.persist()
    print(epoch_id)
    extra_hudi_options = {
    "hoodie.bulkinsert.shuffle.parallelism":"1",     
    "hoodie.insert.shuffle.parallelism":"1", 
    "hoodie.upsert.shuffle.parallelism":"1",
    "hoodie.parquet.compression.ratio":"0.5"
    }
    # Write the micro-batch to the offline feature group (statistics disabled)
    card_transactions_10m_agg.statistics_config = {"enabled": False, "histograms": False, "correlations": False}
    card_transactions_10m_agg.insert(batchDF,write_options=extra_hudi_options, storage="offline")
    batchDF.unpersist()

hudi_10m = windowed10mSignalDF.writeStream.foreachBatch(foreach_batch_function_10m)\
                              .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-data10m")\
                              .start()    
def foreach_batch_function_1h(batchDF, epoch_id):
    batchDF.persist()
    print(epoch_id)
    extra_hudi_options = {
    "hoodie.bulkinsert.shuffle.parallelism":"1",     
    "hoodie.insert.shuffle.parallelism":"1", 
    "hoodie.upsert.shuffle.parallelism":"1",
    "hoodie.parquet.compression.ratio":"0.5"
    }
    # Write the micro-batch to the offline feature group (statistics disabled)
    card_transactions_1h_agg.statistics_config = {"enabled": False, "histograms": False, "correlations": False}
    card_transactions_1h_agg.insert(batchDF,write_options=extra_hudi_options, storage="offline")
    batchDF.unpersist()

hudi_1h = windowed1hSignalDF.writeStream.foreachBatch(foreach_batch_function_1h)\
                            .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-1h")\
                            .start()
def foreach_batch_function_12h(batchDF, epoch_id):
    batchDF.persist()
    print(epoch_id)
    extra_hudi_options = {
    "hoodie.bulkinsert.shuffle.parallelism":"1",     
    "hoodie.insert.shuffle.parallelism":"1", 
    "hoodie.upsert.shuffle.parallelism":"1",
    "hoodie.parquet.compression.ratio":"0.5"
    }
    # Write the micro-batch to the offline feature group (statistics disabled)
    card_transactions_12h_agg.statistics_config = {"enabled": False, "histograms": False, "correlations": False}
    card_transactions_12h_agg.insert(batchDF,write_options=extra_hudi_options, storage="offline")
    batchDF.unpersist()

hudi_12h = windowed12hSignalDF.writeStream.foreachBatch(foreach_batch_function_12h)\
                              .option("checkpointLocation", hdfs.project_path() + "/Resources/checkpoint-12h")\
                              .start()

Check if queries are still active

print("IsActive:\n hudi_10m: {}\n hudi_1h: {}\n hudi_12h: {}".format(hudi_10m.isActive, hudi_1h.isActive, hudi_12h.isActive))
IsActive:
 hudi_10m: True
 hudi_1h: True
 hudi_12h: True
print("Status:\n hudi_10m: {}\n hudi_1h: {}\n hudi_12h: {}".format(hudi_10m.status, hudi_1h.status, hudi_12h.status))
Status:
 hudi_10m: {'message': 'No new data but cleaning up state', 'isDataAvailable': False, 'isTriggerActive': True}
 hudi_1h: {'message': 'No new data but cleaning up state', 'isDataAvailable': False, 'isTriggerActive': True}
 hudi_12h: {'message': 'No new data but cleaning up state', 'isDataAvailable': False, 'isTriggerActive': True}
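
If you deploy this pipeline as a standalone Spark job rather than running it interactively, you would normally block the driver until a streaming query terminates. A minimal sketch using Spark's StreamingQueryManager, left commented out here because it would block the notebook:

# Block the driver until any active streaming query terminates or fails.
# spark.streams.awaitAnyTermination()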

Stop queries (optional)

If you are running this from a notebook, you can stop a Spark Structured Streaming query by stopping the kernel or by calling its .stop() method.

hudi_card.stop()
hudi_10m.stop()
hudi_1h.stop()
hudi_12h.stop()

NOTE: For this demo, you can leave the ingestion queries running to continuously process incoming transactions. This is necessary if you want to generate fraudulent transactions afterwards.

query_10m.stop()
query_1h.stop()
query_12h.stop()