Feature Engineering/Ingestion

Feature Store Tour - Python API

This set of notebooks contains a tour of, and reference for, the Hopsworks feature store Python API. The notebooks are meant to be run from feature store demo projects on Hopsworks. We will go over best practices for using the API as well as common pitfalls.

There are 3 notebooks:

- Feature groups: Discover how to work with features and feature groups, both offline and online
- Feature Exploration: Discover how to join features from different feature groups
- Training datasets: Discover how to save training datasets to be used by ML models

The data required to run this tour is located in a zip file called archive.zip in the same directory as the notebooks. Head to the Dataset browser on Hopsworks and unzip it.

Features and Feature Groups

The Hopsworks feature store is a centralized repository, within an organization, to manage machine learning features. A feature is a measurable property of a phenomenon. It could be a simple value such as the age of a customer, or it could be an aggregated value, such as the number of transactions made by a customer in the last 30 days.

A feature is not restricted to a numeric value; it could also be a string representing an address, or an image.
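As a small illustration of the two kinds of features described above, the following pure-Python sketch derives a simple feature (a customer's age) and an aggregated feature (transactions in the last 30 days). The customer record, the transactions and all field names are hypothetical and are not part of the dataset used in this tour.

```python
from datetime import date, timedelta

# Hypothetical raw data; none of these names come from the tour's dataset.
customer = {"id": 42, "birth_date": date(1990, 6, 15)}
transactions = [
    {"customer_id": 42, "date": date(2021, 3, 1)},
    {"customer_id": 42, "date": date(2021, 3, 20)},
    {"customer_id": 42, "date": date(2021, 1, 5)},
]


def age_in_years(birth_date, today):
    """Simple feature: age in whole years."""
    had_birthday = (today.month, today.day) >= (birth_date.month, birth_date.day)
    return today.year - birth_date.year - (0 if had_birthday else 1)


def transactions_last_30_days(rows, customer_id, today):
    """Aggregated feature: number of transactions in the 30 days before `today`."""
    start = today - timedelta(days=30)
    return sum(
        1
        for r in rows
        if r["customer_id"] == customer_id and start <= r["date"] < today
    )


today = date(2021, 3, 25)
features = {
    "age": age_in_years(customer["birth_date"], today),
    "transactions_last_30d": transactions_last_30_days(transactions, 42, today),
}
```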

Feature Store Overview

A feature store is not a pure storage service; it goes hand-in-hand with feature computation. Feature engineering is the process of transforming raw data into a format that predictive models can consume.

In this notebook we are going to focus on the left side of the picture above, in particular on how data engineers can create features and push them to the Hopsworks feature store so that they are available to data scientists.

HSFS library

The Hopsworks feature store library is called hsfs (Hopsworks Feature Store). The library is Apache V2 licensed and available here. The library is currently available for Python and JVM languages such as Scala and Java. In this notebook we are going to cover the Python part.

You can find the complete documentation of the library here:

The first step is to establish a connection with your Hopsworks feature store instance and retrieve the object that represents the feature store you’ll be working with.

By default, connection.get_feature_store() returns the feature store of the project you are working with. However, it also accepts a project name as a parameter to select a different feature store.

import hsfs
# Create a connection
connection = hsfs.connection()
# Get the feature store handle for the project's feature store
fs = connection.get_feature_store()
Starting Spark application
SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

Data Engineering

We are going to use a dataset containing information related to a chain of department stores. The dataset is taken from Kaggle.

We are going to create 3 feature groups:

- stores_fg: contains features related to the store itself, mainly the category, the number of departments and the size.
- sales_fg: contains sales features for each store/department over the weeks.
- exogenous_fg: contains features which are not related to the stores themselves but have an effect on sales, for instance the gas price, the unemployment rate and the temperature in the area.

from hops import hdfs
from pyspark.sql import functions as F

# Read the CSV files explicitly as CSV; without a format, load() falls back
# to Spark's default data source (Parquet unless configured otherwise).
stores_csv = spark.read\
             .option("inferSchema", "true")\
             .option("header", "true")\
             .format("csv")\
             .load("hdfs:///Projects/{}/Jupyter/hsfs/archive/stores data-set.csv".format(hdfs.project_name()))

exogenous_csv = spark.read\
             .option("inferSchema", "true")\
             .option("header", "true")\
             .format("csv")\
             .load("hdfs:///Projects/{}/Jupyter/hsfs/archive/Features data set.csv".format(hdfs.project_name()))

sales_csv = spark.read\
             .option("inferSchema", "true")\
             .option("header", "true")\
             .format("csv")\
             .load("hdfs:///Projects/{}/Jupyter/hsfs/archive/sales data-set.csv".format(hdfs.project_name()))

Feature Engineering Stores

# Count the distinct departments that appear in the sales data for each store
stores_depts_count = stores_csv\
                    .join(sales_csv, "store")\
                    .groupBy("store")\
                    .agg(F.countDistinct("dept").alias("num_depts"))

stores_fg = stores_csv\
            .join(stores_depts_count, "store")
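To make the intent of the aggregation concrete, here is a pure-Python sketch of counting distinct departments per store on a few made-up rows. It mirrors what a group-by with a distinct count computes in Spark; the sample rows and variable names are hypothetical.

```python
from collections import defaultdict

# Hypothetical sales rows: (store, dept, weekly_sales)
sales_rows = [
    (1, 1, 24924.50),
    (1, 1, 46039.49),
    (1, 2, 50605.27),
    (2, 1, 35034.06),
]

# Collect the set of departments seen for each store
depts_per_store = defaultdict(set)
for store, dept, _ in sales_rows:
    depts_per_store[store].add(dept)

# The size of each set is the number of distinct departments
num_depts = {store: len(depts) for store, depts in depts_per_store.items()}
```

Store 1 appears three times but only with two different departments, so repeated (store, dept) pairs do not inflate the count.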

Create store_fg feature group

Create a feature group named store_fg. The store is the primary key uniquely identifying all the remaining features in this feature group.

store_fg_meta = fs.create_feature_group(name="store_fg",
                                       primary_key=["store"],
                                       description="Store related features",
                                       statistics_config={"enabled": True, "histograms": True, "correlations": True})

Up to this point we have just created the metadata object representing the feature group. However, we haven’t saved the feature group in the feature store yet. To do so, we can call the method save on the metadata object created in the cell above.

store_fg_meta.save(stores_fg)
<hsfs.feature_group.FeatureGroup object at 0x7f3a442f2ed0>

Feature Engineering Sales

from pyspark.sql import Window
days = lambda i: i * 86400 

sales_df = sales_csv.withColumn('date', F.to_date("date", 'dd/MM/yyyy'))\
                    .withColumn('timestamp', F.unix_timestamp("date"))

# Define aggregation window to compute sales performances over the past period of time
last_month_window_store_dep = Window.partitionBy(['store', 'dept']).orderBy(F.col("timestamp").cast("long")).rangeBetween(days(-30), days(-1))
last_quarter_window_store_dep = Window.partitionBy(['store', 'dept']).orderBy(F.col("timestamp").cast("long")).rangeBetween(days(-90), days(-1))
last_six_month_window_store_dep = Window.partitionBy(['store', 'dept']).orderBy(F.col("timestamp").cast("long")).rangeBetween(days(-180), days(-1))
last_year_window_store_dep = Window.partitionBy(['store', 'dept']).orderBy(F.col("timestamp").cast("long")).rangeBetween(days(-365), days(-1))

last_month_window_store = Window.partitionBy('store').orderBy(F.col("timestamp").cast("long")).rangeBetween(days(-30), days(-1))
last_quarter_window_store = Window.partitionBy('store').orderBy(F.col("timestamp").cast("long")).rangeBetween(days(-90), days(-1))
last_six_month_window_store = Window.partitionBy('store').orderBy(F.col("timestamp").cast("long")).rangeBetween(days(-180), days(-1))
last_year_window_store = Window.partitionBy('store').orderBy(F.col("timestamp").cast("long")).rangeBetween(days(-365), days(-1))

# Build feature group dataframe
sales_fg = sales_df.withColumn("sales_last_month_store_dep", F.sum("weekly_sales").over(last_month_window_store_dep))\
        .withColumn("sales_last_quarter_store_dep", F.sum("weekly_sales").over(last_quarter_window_store_dep))\
        .withColumn("sales_last_six_month_store_dep", F.sum("weekly_sales").over(last_six_month_window_store_dep))\
        .withColumn("sales_last_year_store_dep", F.sum("weekly_sales").over(last_year_window_store_dep))\
        .withColumn("sales_last_month_store", F.sum("weekly_sales").over(last_month_window_store))\
        .withColumn("sales_last_quarter_store", F.sum("weekly_sales").over(last_quarter_window_store))\
        .withColumn("sales_last_six_month_store", F.sum("weekly_sales").over(last_six_month_window_store))\
        .withColumn("sales_last_year_store", F.sum("weekly_sales").over(last_year_window_store))
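The windows above use rangeBetween(days(-30), days(-1)), which selects the rows of the same partition whose timestamp lies between 30 days and 1 day before the current row, both bounds inclusive, so the current row itself is excluded. The following pure-Python sketch reproduces that frame for a single (store, dept) partition. The sample rows and helper names are hypothetical, and time zones are ignored for simplicity.

```python
from datetime import date

DAY = 86400  # seconds per day, matching the days() helper


def to_ts(d):
    """Seconds since 1970-01-01 for a calendar date (time zone ignored)."""
    return (d - date(1970, 1, 1)).days * DAY


# Hypothetical weekly sales for a single (store, dept) partition
rows = [
    (date(2010, 2, 5), 100.0),
    (date(2010, 2, 12), 200.0),
    (date(2010, 2, 19), 300.0),
    (date(2010, 3, 26), 400.0),
]


def range_sum(rows, current, lower_days, upper_days):
    """Sum the values whose timestamp lies in [t + lower, t + upper], inclusive."""
    t = to_ts(current)
    lo, hi = t + lower_days * DAY, t + upper_days * DAY
    matched = [v for d, v in rows if lo <= to_ts(d) <= hi]
    # A sum over an empty frame is null in Spark, mirrored here as None
    return sum(matched) if matched else None


sales_last_month = {d: range_sum(rows, d, -30, -1) for d, _ in rows}
```

The first row has no history and the last row has no sales in the preceding 30 days, so both get an empty frame. Downstream consumers of these features therefore need to handle missing values.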

Create sales_fg feature group

Unlike store_fg, sales_fg is going to have a composite primary key. This means that each entry in sales_fg is uniquely identified by the store, the department and the week (the date column). A feature group can also have a partition key; partitioning is a tool at your disposal to improve the performance of queries on a feature group.

sales_fg_meta = fs.create_feature_group(name="sales_fg",
                                        primary_key=['store', 'dept', 'date'],
                                        description="Sales related features")
<hsfs.feature_group.FeatureGroup object at 0x7f3a43b712d0>
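Because a composite primary key is expected to identify each row uniquely, it can be worth checking the data for duplicate keys before saving it. The pure-Python sketch below shows one way to do that on a few made-up rows. The helper duplicate_keys and the sample data are hypothetical and not part of hsfs; in Spark the same check could be expressed as a group-by on the key columns followed by a count.

```python
from collections import Counter

# Hypothetical rows keyed by (store, dept, date)
rows = [
    {"store": 1, "dept": 1, "date": "2010-02-05", "weekly_sales": 24924.50},
    {"store": 1, "dept": 1, "date": "2010-02-12", "weekly_sales": 46039.49},
    {"store": 1, "dept": 2, "date": "2010-02-05", "weekly_sales": 50605.27},
    {"store": 1, "dept": 2, "date": "2010-02-05", "weekly_sales": 50605.27},  # duplicate key
]


def duplicate_keys(rows, key_columns):
    """Return the key tuples that occur more than once."""
    counts = Counter(tuple(r[c] for c in key_columns) for r in rows)
    return [key for key, n in counts.items() if n > 1]


# The full composite key flags only the repeated row
dupes = duplicate_keys(rows, ["store", "dept", "date"])

# A single column is not enough to identify a sales row
single_column_dupes = duplicate_keys(rows, ["store"])
```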

When creating a feature group we can also specify a partition key. Partition keys help organize the feature data on the file system and improve performance when reading the feature group data. Like the primary key, the partition key can be composite.

sales_part_fg_meta = fs.create_feature_group(name="sales_fg",
                                        description="Sales related features")
<hsfs.feature_group.FeatureGroup object at 0x7f3a442f2890>
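To give an intuition for why partitioning helps reads, the following pure-Python sketch groups rows into one directory per partition key value, assuming a Hive-style layout such as store=1/. The choice of store (and of store plus dept for the composite case) as partition key is an assumption made for illustration only, as are the helper partition_rows and the sample rows.

```python
from collections import defaultdict

# Hypothetical sales rows
rows = [
    {"store": 1, "dept": 1, "weekly_sales": 24924.50},
    {"store": 1, "dept": 2, "weekly_sales": 50605.27},
    {"store": 2, "dept": 1, "weekly_sales": 35034.06},
]


def partition_rows(rows, partition_key):
    """Group rows under Hive-style directory names, one per key value."""
    partitions = defaultdict(list)
    for row in rows:
        directory = "/".join(f"{c}={row[c]}" for c in partition_key)
        partitions[directory].append(row)
    return dict(partitions)


by_store = partition_rows(rows, ["store"])               # single partition key
by_store_dept = partition_rows(rows, ["store", "dept"])  # composite partition key

# A filter on the partition key only has to read one directory
store_1_rows = by_store["store=1"]
```

A partition key with very many distinct values produces many small directories, so a low-cardinality column that is frequently used in filters is usually the more practical choice.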

You can enable a feature group to be online by setting the online_enabled flag to true.

By default HSFS configures the feature group such that new feature data that gets saved or inserted is written to the offline feature store. If online_enabled=True, additionally, the data is saved to the online storage of the feature store. Note that the insert and save to both storages is not transactional.

If you want to create a purely online feature group, save the feature group with online_enabled=True but with an empty dataframe, and subsequently call insert with storage="online" to override the default and write to the online feature store only.

sales_part_fg_meta = fs.create_feature_group(name="sales_fg",
                                        primary_key=['store', 'dept', 'date'],
                                        description="Sales related features",
                                        online_enabled=True)
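The routing rules described above can be summarized with a toy model. The class below is a hypothetical stand-in written for this explanation, not the hsfs implementation: it only tracks which store receives the rows for a given combination of online_enabled and storage.

```python
class ToyFeatureGroup:
    """Toy stand-in for a feature group; not the hsfs implementation."""

    def __init__(self, online_enabled=False):
        self.online_enabled = online_enabled
        self.offline_rows = []
        self.online_rows = []

    def insert(self, rows, storage=None):
        """Route rows to the offline store, the online store, or both."""
        if storage not in (None, "offline", "online"):
            raise ValueError(f"unknown storage: {storage!r}")
        if storage == "online" and not self.online_enabled:
            raise ValueError("feature group is not online enabled")
        if storage in (None, "offline"):
            self.offline_rows.extend(rows)
        # The two writes are independent steps, so a failure here would
        # leave the offline write in place (no transaction spans both).
        if self.online_enabled and storage in (None, "online"):
            self.online_rows.extend(rows)


fg = ToyFeatureGroup(online_enabled=True)
fg.insert([{"store": 1, "dept": 1}])                    # written to both stores
fg.insert([{"store": 2, "dept": 1}], storage="online")  # written online only
```

After these two calls the offline store holds one row and the online store holds two, which is the kind of divergence to keep in mind when writing to only one of the storages.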

Feature Engineering Exogenous features

This feature group will contain exogenous features that can influence sales but are not under the control of the distribution chain, such as the unemployment rate and the consumer price index (cpi). We are going to write these features to the feature store as they are.

exogenous_fg = exogenous_csv.withColumn('date', F.to_date("date", 'dd/MM/yyyy'))

exogenous_fg_meta = fs.create_feature_group(name="exogenous_fg",
                                            primary_key=['store', 'date'],
                                            description="External features that influence sales, but are not under the control of the distribution chain",
                                            statistics_config={"enabled": True, "histograms": True, "correlations": True})

Append additional data

You can add additional data to a feature group by calling the insert method. In the example below we assume that we have also received the data for 2013, and we append it to the existing exogenous_fg.

exogenous_fg_2013 = exogenous_fg.withColumn('date', F.date_add('date', 365))
exogenous_fg_meta = fs.get_feature_group('exogenous_fg', 1)
# Append the new rows to the existing feature group
exogenous_fg_meta.insert(exogenous_fg_2013)

This will also recompute statistics after inserting new data. The new statistics will be saved alongside the metadata with a new commit time.
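The simulated 2013 data is obtained by shifting every date forward by 365 days. The pure-Python sketch below shows the effect of that shift on two made-up dates. Since 2012 is a leap year, a date before 29 February 2012 lands one calendar day earlier in 2013, which is harmless for a demo but worth knowing when inspecting the appended rows.

```python
from datetime import date, timedelta

# Hypothetical subset of the existing rows (only the date column matters here)
existing_dates = [date(2012, 2, 10), date(2012, 10, 26)]

# Shift every date forward by 365 days, as a 365-day date_add would
shifted_dates = [d + timedelta(days=365) for d in existing_dates]

# Appending keeps the old rows and adds the shifted ones
all_dates = existing_dates + shifted_dates
```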

Append an additional feature

Appending features to a feature group is a non-breaking schema change. Removing features, by contrast, requires creating a new version of the feature group.

You can append a feature to a feature group by specifying a data type and a default value for the new feature. The default value is needed for the data that is already in the feature group.

from hsfs.feature import Feature
exogenous_fg_meta.append_features([Feature("appended_feature", type="double", default_value="10.0")])
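The role of the default value can be sketched in pure Python: rows written before the schema change have no value for the new column, so reads fill it with the default, while rows written afterwards carry their own value. The helper append_feature and the sample rows are hypothetical and only illustrate the idea; they are not part of hsfs.

```python
def append_feature(rows, name, default_value):
    """Return rows with `name` set to `default_value` where it is missing."""
    return [{**row, name: row.get(name, default_value)} for row in rows]


# Rows written before the schema change have no value for the new feature
existing = [{"store": 1, "cpi": 211.1}, {"store": 2, "cpi": 210.8}]

# A row written afterwards carries its own value
new_row = {"store": 3, "cpi": 214.0, "appended_feature": 12.5}

table = append_feature(existing + [new_row], "appended_feature", 10.0)
```

Existing columns are untouched, which is why appending a feature does not break readers of the current schema.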

Delete a feature group

You can call the delete method on a feature group to delete the entire feature group.

exogenous_fg_meta = fs.create_feature_group(name="exogenous_fg",
                                        primary_key=['store', 'date'],
                                        description="External features that influence sales, but are not under the control of the distribution chain")
<hsfs.feature_group.FeatureGroup object at 0x7ff4e1fe13d0>
exogenous_fg_meta = fs.get_feature_group('exogenous_fg', 2)
# Delete the entire feature group (this version), as described above
exogenous_fg_meta.delete()