Online Feature Serving

import hsfs
# Create a connection
connection = hsfs.connection()
# Get the feature store handle for the project's feature store
fs = connection.get_feature_store()
Starting Spark application
ID: 5, YARN Application ID: application_1634566917647_0007, Kind: pyspark, State: idle
SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

An inference vector is only available for training datasets generated from online-enabled feature groups, each with at least one primary key. In the notebook training_datasets.ipynb, we already created an online-enabled feature group sales_fg with version 3.

sales_fg_meta = fs.get_feature_group(name="sales_fg", version=3)

store_fg and exogenous_fg are not yet online-enabled. Let's create new, online-enabled versions of these feature groups.

store_fg = fs.get_feature_group(name="store_fg", version=1).read()
store_fg_meta = fs.create_feature_group(name="store_fg",
                                       version=2,
                                       primary_key=['store'],
                                       online_enabled=True,
                                       description="Store related features",
                                       time_travel_format=None,
                                       statistics_config={"enabled": True, "histograms": True, "correlations": True, "exact_uniqueness": True})
store_fg_meta.save(store_fg)
exogenous_fg = fs.get_feature_group(name="exogenous_fg", version=1).read()
exogenous_fg_meta = fs.create_feature_group(name="exogenous_fg",
                                            version=2,
                                            primary_key=['store', 'date'],
                                            online_enabled=True,
                                            description="External features that influence sales, but are not under the control of the distribution chain",
                                            time_travel_format=None)
exogenous_fg_meta.save(exogenous_fg)

In addition to containing only online-enabled feature groups, each with at least one primary key, a training dataset must be generated from an hsfs query object so that the inference vector can be built during model serving.

sales_fg_meta = fs.get_feature_group(name="sales_fg", version=3)
store_fg_meta = fs.get_feature_group(name="store_fg", version=2)
exogenous_fg_meta = fs.get_feature_group(name="exogenous_fg", version=2)


query = sales_fg_meta.select(["weekly_sales", "sales_last_month_store", "sales_last_quarter_store", 
                         "sales_last_year_store_dep", "sales_last_month_store_dep", "sales_last_quarter_store_dep", 
                         "sales_last_six_month_store_dep", "sales_last_six_month_store", "sales_last_year_store"])\
                .join(store_fg_meta.select(["num_depts", "size"]))\
                .join(exogenous_fg_meta.select(['fuel_price']))

td_meta = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="tfrecord",
                               splits={'train': 0.7, 'test': 0.2, 'validate': 0.1},                                
                               version=8)

td_meta.save(query)

The hsfs TrainingDataset object provides the utility method get_serving_vector to build a serving vector from the online feature store. This method expects a dict whose keys are the feature group primary key names.

To identify which primary key names are used for this training dataset query, use the serving_keys property.

td_meta = fs.get_training_dataset("sales_model", 8)
# `init_prepared_statement` is needed to get serving_keys in case `get_serving_vector` has not been called yet. It is not necessary for the `get_serving_vector` method itself.
td_meta.init_prepared_statement() 
td_meta.serving_keys
{'store', 'date', 'dept'}
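For readability, the key dict passed to get_serving_vector can also be built programmatically from an incoming tuple. A minimal sketch — note that serving_keys is returned as a set, so the explicit key ordering below is our own assumption, chosen to match the tuples we send:

```python
# Pin an explicit key order; hsfs returns serving_keys as an unordered set,
# so we fix the order to match our (store, date, dept) tuples.
SERVING_KEY_ORDER = ["store", "date", "dept"]

def build_key_dict(row):
    """Map a (store, date, dept) tuple to the dict expected by get_serving_vector."""
    return dict(zip(SERVING_KEY_ORDER, row))

build_key_dict((31, "2010-02-05", 47))
# {'store': 31, 'date': '2010-02-05', 'dept': 47}
```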

For demo purposes, let's prepare a list of primary key values that we are interested in, to build feature vectors from the online feature store.

incoming_data = [(31,"2010-02-05",47),
                 (2,"2010-02-12",92),
                 (20,"2010-03-05",11),
                 (4,"2010-04-02",52),
                 (12,"2010-05-07",27)
                ]

Get the feature vectors for the primary keys in incoming_data.

Iterate over incoming_data and use td_meta.get_serving_vector to retrieve the serving vector for each primary key combination.

for i in incoming_data:
    serving_vector = td_meta.get_serving_vector({'store': i[0], 'date': i[1], 'dept': i[2]})
    print(serving_vector)
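Once retrieved, a serving vector is typically passed straight to the model for inference. A minimal sketch with a hypothetical stand-in model — the predict function below is our own placeholder, not part of hsfs or the trained sales model:

```python
# Hypothetical stand-in for the deployed sales model; in production the
# serving vector would instead be sent to the model server as the payload.
def predict(feature_vector):
    # Toy "model": sum the numeric features to produce a dummy score.
    return sum(v for v in feature_vector if isinstance(v, (int, float)))

# Shape of a serving vector: the feature values in training-dataset order.
sample_vector = [1.0, 2.0, 3.0]
prediction = predict(sample_vector)
# prediction == 6.0
```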

Let's do the same, but retrieve all the values as a batch.

Since we have already initialised the prepared statement for single vectors in this session, we need to re-initialise the prepared statements for batch retrieval.

td_meta.init_prepared_statement(batch=True) 
td_meta.serving_keys
{'store', 'date', 'dept'}
serving_vectors = td_meta.get_serving_vectors({'store': [31, 2, 20, 4, 12],
                                               'date':  ["2010-02-05", "2010-02-12", "2010-03-05", "2010-04-02", "2010-05-07"], 
                                               'dept': [47, 92, 11, 52, 27]})
serving_vectors
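Assuming the batch result contains one feature vector per requested key combination, in request order, the vectors can be zipped back to their primary keys for downstream use. A sketch with placeholder values (the keys and vectors below are illustrative, not real output):

```python
# Placeholder store ids from a batch request and a placeholder batch result;
# we assume the returned vectors preserve the order of the requested keys.
keys = [31, 2, 20]
vectors = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]

# Re-associate each serving vector with the store it was requested for.
by_store = dict(zip(keys, vectors))
by_store[2]
# [3.0, 4.0]
```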