Create empty feature groups for Online Feature Store

import json
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType

Create empty feature groups

In this demo we expect to receive data from a Kafka topic, read it with Spark streaming, compute streaming aggregations and ingest the aggregated data into feature groups. We therefore first create empty feature groups into which the streaming data will later be ingested.
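As a rough illustration of the aggregation step, the plain-Python sketch below computes, for the transactions that fall into one window, the per-card values stored by the aggregation feature groups. It is not the Spark streaming job used in this demo. It assumes the window has already been selected, that each transaction is a dict with the `cc_num` and `amount` fields of `card_schema`, and it uses the sample standard deviation (what Spark's `stddev` aggregate computes). The function name `aggregate_window` is hypothetical.

```python
from collections import defaultdict
from statistics import mean, stdev


def aggregate_window(transactions, suffix="10m"):
    """Group one window's transactions by card number; compute count, mean and sample stdev.

    Hypothetical helper: `transactions` is assumed to be an iterable of dicts with
    'cc_num' and 'amount' keys (mirroring card_schema). The output keys follow the
    column names of the aggregation schemas, e.g. num_trans_per_10m.
    """
    amounts_by_card = defaultdict(list)
    for tx in transactions:
        amounts_by_card[tx["cc_num"]].append(tx["amount"])

    rows = []
    for cc_num, amounts in sorted(amounts_by_card.items()):
        rows.append({
            "cc_num": cc_num,
            f"num_trans_per_{suffix}": len(amounts),
            f"avg_amt_per_{suffix}": mean(amounts),
            # The sample standard deviation is undefined for one value; report None (null).
            f"stdev_amt_per_{suffix}": stdev(amounts) if len(amounts) > 1 else None,
        })
    return rows


# Illustrative input only: three transactions for two cards in the same 10-minute window.
window = [
    {"tid": "t1", "cc_num": 4444, "amount": 10.0},
    {"tid": "t2", "cc_num": 4444, "amount": 30.0},
    {"tid": "t3", "cc_num": 5555, "amount": 7.5},
]
print(aggregate_window(window))
```

In the streaming job, the same per-card grouping would be applied to each time window before the results are written to the matching feature group.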

Define schemas for the feature groups

card_schema = StructType([StructField('tid', StringType(), True),
                          StructField('datetime', StringType(), True),
                          StructField('cc_num', LongType(), True),
                          StructField('amount', DoubleType(), True)])

schema_10m = StructType([StructField('cc_num', LongType(), True),
                         StructField('num_trans_per_10m', LongType(), True),
                         StructField('avg_amt_per_10m', DoubleType(), True),
                         StructField('stdev_amt_per_10m', DoubleType(), True)])

schema_1h = StructType([StructField('cc_num', LongType(), True),
                         StructField('num_trans_per_1h', LongType(), True),
                         StructField('avg_amt_per_1h', DoubleType(), True),
                         StructField('stdev_amt_per_1h', DoubleType(), True)])

schema_12h = StructType([StructField('cc_num', LongType(), True),
                         StructField('num_trans_per_12h', LongType(), True),
                         StructField('avg_amt_per_12h', DoubleType(), True),
                         StructField('stdev_amt_per_12h', DoubleType(), True)])
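The three aggregation schemas differ only in the window suffix of their column names. A small helper such as the hypothetical `agg_fields` below makes that pattern explicit. It returns `(column name, type name)` pairs with the PySpark type names as plain strings so that it runs without Spark; with PySpark available, each pair could be mapped to the corresponding `StructField`.

```python
def agg_fields(suffix):
    """Return (column name, Spark type name) pairs for one aggregation window.

    Hypothetical helper: mirrors schema_10m, schema_1h and schema_12h, which share
    the same four columns and differ only in the window suffix.
    """
    return [
        ("cc_num", "LongType"),
        (f"num_trans_per_{suffix}", "LongType"),
        (f"avg_amt_per_{suffix}", "DoubleType"),
        (f"stdev_amt_per_{suffix}", "DoubleType"),
    ]


# Print the column names for each window used in this demo.
for suffix in ("10m", "1h", "12h"):
    print(suffix, [name for name, _ in agg_fields(suffix)])
```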

Create empty Spark DataFrames

empty_card_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), card_schema)
empty_10m_agg_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema_10m)
empty_1h_agg_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema_1h)
empty_12h_agg_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema_12h)

Establish a connection with your Hopsworks feature store.

import hsfs
connection = hsfs.connection()
# get a reference to the feature store; you can also access shared feature stores by providing the feature store name
fs = connection.get_feature_store()
Connected. Call `.close()` to terminate connection gracefully.

Create the feature group metadata objects and save the empty Spark DataFrames to materialise the feature groups in the Hopsworks feature store.

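Now we create each feature group. The three aggregation feature groups are online-enabled, while the raw `card_transactions` feature group is created with `online_enabled=False`. Since we plan to use the aggregation feature groups during online model serving, each of them needs a primary key (here `cc_num`) so that feature vectors can be retrieved from the online feature store.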
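To see why the primary key matters, the following in-memory sketch models each online-enabled feature group as a dictionary keyed by its primary key: writing a row for an existing key overwrites the previous row, and a feature vector for serving is assembled by looking up the same `cc_num` in every feature group. This is only a conceptual model with hypothetical names (`OnlineFeatureGroup`, `get_feature_vector`) and illustrative values; it is not the hsfs API, and the real online feature store is backed by a database rather than Python dictionaries.

```python
class OnlineFeatureGroup:
    """Hypothetical stand-in for an online-enabled feature group: one row per primary key."""

    def __init__(self, name, primary_key):
        self.name = name
        self.primary_key = primary_key
        self._rows = {}

    def upsert(self, row):
        # A newer row for the same primary key value replaces the older one.
        self._rows[row[self.primary_key]] = row

    def get(self, key):
        return self._rows.get(key)


def get_feature_vector(feature_groups, key):
    """Merge the current row of every feature group for one primary key value."""
    vector = {}
    for fg in feature_groups:
        row = fg.get(key)
        if row is None:
            raise KeyError(f"{key!r} not found in {fg.name}")
        vector.update(row)
    return vector


# Illustrative values only.
fg_10m = OnlineFeatureGroup("card_transactions_10m_agg", "cc_num")
fg_1h = OnlineFeatureGroup("card_transactions_1h_agg", "cc_num")
fg_10m.upsert({"cc_num": 4444, "num_trans_per_10m": 1, "avg_amt_per_10m": 10.0, "stdev_amt_per_10m": None})
fg_10m.upsert({"cc_num": 4444, "num_trans_per_10m": 2, "avg_amt_per_10m": 20.0, "stdev_amt_per_10m": 14.1})
fg_1h.upsert({"cc_num": 4444, "num_trans_per_1h": 5, "avg_amt_per_1h": 18.0, "stdev_amt_per_1h": 9.5})
print(get_feature_vector([fg_10m, fg_1h], 4444))
```

Without a primary key there would be no single row to look up per card, which is why the online-enabled feature groups are created with `primary_key=["cc_num"]`.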

# raw transactions: offline only, statistics computation disabled
card_transactions = fs.create_feature_group(
    "card_transactions",
    version=1,
    online_enabled=False,
    statistics_config=False,
    primary_key=["tid"],
)

card_transactions.save(empty_card_df)
# 10-minute aggregates: online-enabled, looked up by cc_num at serving time
card_transactions_10m_agg = fs.create_feature_group(
    "card_transactions_10m_agg",
    version=1,
    online_enabled=True,
    statistics_config=False,
    primary_key=["cc_num"],
)

card_transactions_10m_agg.save(empty_10m_agg_df)
# 1-hour aggregates: online-enabled, looked up by cc_num at serving time
card_transactions_1h_agg = fs.create_feature_group(
    "card_transactions_1h_agg",
    version=1,
    online_enabled=True,
    statistics_config=False,
    primary_key=["cc_num"],
)

card_transactions_1h_agg.save(empty_1h_agg_df)
# 12-hour aggregates: online-enabled, looked up by cc_num at serving time
card_transactions_12h_agg = fs.create_feature_group(
    "card_transactions_12h_agg",
    version=1,
    online_enabled=True,
    statistics_config=False,
    primary_key=["cc_num"],
)

card_transactions_12h_agg.save(empty_12h_agg_df)