1. Create empty feature groups in the Online Feature Store

Create empty feature groups

In this demo we expect to receive data from a Kafka topic, read it with Spark streaming, perform streaming aggregations, and ingest the aggregated data into feature groups. We therefore start by creating empty feature groups into which the streaming data will be ingested.
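In the demo these aggregations are computed by Spark streaming. As a rough, non-streaming illustration of what the 10-minute, 1-hour and 12-hour features represent, the sketch below groups transactions by card and tumbling window, then computes the count, mean and sample standard deviation of the amount. The function name, the assumed format of the 'datetime' string, and the use of tumbling (non-overlapping) windows are assumptions. They are not necessarily what the streaming job in this demo does.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean, stdev

# Assumed format of the 'datetime' string column in card_schema
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def tumbling_window_aggregates(transactions, window_minutes=10):
    """Per (cc_num, window start): number of transactions, mean and sample stdev of amount.

    Assumes window_minutes divides 24 * 60 (true for 10, 60 and 720),
    so windows can be aligned to midnight of each day.
    """
    buckets = defaultdict(list)
    for tx in transactions:
        ts = datetime.strptime(tx["datetime"], DATETIME_FORMAT)
        # Floor the timestamp to the start of its window (minutes since midnight)
        start_min = (ts.hour * 60 + ts.minute) // window_minutes * window_minutes
        window_start = ts.replace(hour=start_min // 60, minute=start_min % 60,
                                  second=0, microsecond=0)
        buckets[(tx["cc_num"], window_start)].append(tx["amount"])

    rows = []
    for (cc_num, window_start), amounts in sorted(buckets.items()):
        rows.append({
            "cc_num": cc_num,
            "window_start": window_start,
            "num_trans": len(amounts),
            "avg_amt": mean(amounts),
            # Sample stdev is undefined for a single value; report None in that case
            "stdev_amt": stdev(amounts) if len(amounts) > 1 else None,
        })
    return rows
```

With window_minutes=10, each returned row corresponds to one record of schema_10m (num_trans_per_10m, avg_amt_per_10m, stdev_amt_per_10m), plus the window start.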

[Figure: overview (overview-1.png)]

import json
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType
SparkSession available as 'spark'.

Define the schemas for the feature groups

card_schema = StructType([StructField('tid', StringType(), True),
                          StructField('datetime', StringType(), True),
                          StructField('cc_num', LongType(), True),
                          StructField('amount', DoubleType(), True)])

schema_10m = StructType([StructField('cc_num', LongType(), True),
                         StructField('num_trans_per_10m', LongType(), True),
                         StructField('avg_amt_per_10m', DoubleType(), True),
                         StructField('stdev_amt_per_10m', DoubleType(), True)])

schema_1h = StructType([StructField('cc_num', LongType(), True),
                        StructField('num_trans_per_1h', LongType(), True),
                        StructField('avg_amt_per_1h', DoubleType(), True),
                        StructField('stdev_amt_per_1h', DoubleType(), True)])

schema_12h = StructType([StructField('cc_num', LongType(), True),
                         StructField('num_trans_per_12h', LongType(), True),
                         StructField('avg_amt_per_12h', DoubleType(), True),
                         StructField('stdev_amt_per_12h', DoubleType(), True)])
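The three aggregation schemas differ only in their window suffix. As a sketch, a hypothetical helper (agg_schema_ddl is not part of the demo) can build an equivalent DDL-style schema string. Spark SQL spells LongType as bigint and DoubleType as double, and DDL columns are nullable by default, matching the True flags above.

```python
def agg_schema_ddl(window: str) -> str:
    """Build a DDL-style schema string for a card aggregation feature group.

    Hypothetical helper intended to mirror schema_10m / schema_1h / schema_12h.
    """
    fields = [
        ("cc_num", "bigint"),                   # primary key of every aggregation feature group
        (f"num_trans_per_{window}", "bigint"),  # number of transactions in the window
        (f"avg_amt_per_{window}", "double"),    # mean transaction amount in the window
        (f"stdev_amt_per_{window}", "double"),  # standard deviation of the amount in the window
    ]
    return ", ".join(f"{name} {dtype}" for name, dtype in fields)
```

Since spark.createDataFrame also accepts a schema given as a datatype string, spark.createDataFrame([], agg_schema_ddl("10m")) should produce an empty DataFrame with the same columns as one built from schema_10m.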

Create empty Spark DataFrames

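# Empty DataFrames carry only a schema, so saving them later creates the feature groups without writing any rows
empty_card_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), card_schema)
empty_10m_agg_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema_10m)
empty_1h_agg_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema_1h)
empty_12h_agg_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema_12h)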

Establish a connection with your Hopsworks feature store.

import hsfs
connection = hsfs.connection()
# Get a reference to the project's feature store; shared feature stores can also be accessed by passing the feature store name
fs = connection.get_feature_store()
Connected. Call `.close()` to terminate connection gracefully.

Create the feature group metadata objects and save the empty Spark DataFrames to materialise them in the Hopsworks feature store.

We now create each feature group. The three aggregation feature groups are online enabled because we plan to use them during online model serving, where primary key(s) are required to retrieve feature vectors from the online feature store. The raw card_transactions feature group, keyed by tid, is created offline only (online_enabled=False).
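Conceptually, the online feature store looks up rows by primary key. The toy sketch below models each online aggregation feature group as a dictionary keyed by cc_num and assembles a feature vector for one card by merging those lookups. The function name and the in-memory representation are assumptions for illustration only; this is not the hsfs API.

```python
def get_feature_vector(cc_num, *online_feature_groups):
    """Merge the rows stored under cc_num in each online feature group into one vector.

    Each feature group is modelled as a dict mapping cc_num -> {feature name: value}.
    """
    vector = {"cc_num": cc_num}
    for fg in online_feature_groups:
        row = fg.get(cc_num)
        if row is None:
            # Without a row for this primary key there is no complete feature vector
            raise KeyError(f"no row for cc_num={cc_num}")
        vector.update(row)
    return vector
```

Without a primary key value there would be nothing to look up, which is why every online-enabled feature group above declares cc_num as its primary key.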

card_transactions = fs.create_feature_group(
    "card_transactions",
    version=1,
    online_enabled=False,  # raw transactions are stored offline only
    statistics_config=False,
    primary_key=["tid"],
)

card_transactions.save(empty_card_df)
card_transactions_10m_agg = fs.create_feature_group(
    "card_transactions_10m_agg",
    version=1,
    online_enabled=True,
    statistics_config=False,
    primary_key=["cc_num"],
)

card_transactions_10m_agg.save(empty_10m_agg_df)
card_transactions_1h_agg = fs.create_feature_group(
    "card_transactions_1h_agg",
    version=1,
    online_enabled=True,
    statistics_config=False,
    primary_key=["cc_num"],
)

card_transactions_1h_agg.save(empty_1h_agg_df)
card_transactions_12h_agg = fs.create_feature_group(
    "card_transactions_12h_agg",
    version=1,
    online_enabled=True,
    statistics_config=False,
    primary_key=["cc_num"],
)

card_transactions_12h_agg.save(empty_12h_agg_df)
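The three aggregation cells above differ only in the window suffix and the DataFrame they save. A hypothetical helper, create_agg_feature_groups (not part of the demo), could create them in a loop. It uses only the create_feature_group arguments and the save call already shown in this notebook.

```python
def create_agg_feature_groups(fs, empty_dfs, version=1):
    """Create and materialise one online-enabled aggregation feature group per window.

    fs is the feature store handle returned by connection.get_feature_store();
    empty_dfs maps a window suffix (e.g. "10m") to its empty Spark DataFrame.
    """
    feature_groups = {}
    for window, empty_df in empty_dfs.items():
        fg = fs.create_feature_group(
            f"card_transactions_{window}_agg",
            version=version,
            online_enabled=True,
            statistics_config=False,
            primary_key=["cc_num"],
        )
        fg.save(empty_df)  # saving the empty DataFrame materialises the feature group
        feature_groups[window] = fg
    return feature_groups
```

In this notebook it would be called as create_agg_feature_groups(fs, {"10m": empty_10m_agg_df, "1h": empty_1h_agg_df, "12h": empty_12h_agg_df}). Keeping the window suffix as the only varying input makes it harder for the feature group names and schemas to drift apart.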