Snowflake Integration PySpark

Snowflake Integration

Snowflake is a popular cloud-native data warehouse service, and supports scalable feature computation with SQL. However, Snowflake is not viable as an online feature store that serves features to models in production - columnar databases have too high latency compared to OLTP databases or key-value stores.

In this blog post, we show you how to connect Hopsworks to Snowflake and leverage its data and computation capabilities to create features and make them available both offline in Snowflake and online in Hopsworks.

Before you get started, make sure you have created a free account on Hopsworks.ai. Hopsworks.ai gives access to a managed cloud based deployment of the Hopsworks Feature Store, with free Hops credits to use.

Storage Connector

The first step to integrate Hopsworks with an external Snowflake cluster is to configure the Snowflake storage connector. Hopsworks provides storage connectors to securely centralize and manage connection configurations and credentials to interact with external data stores. In this way users do not have to hardcode passwords and tokens in programs, and you can control which users are given access to external data stores.

Hopsworks provides a storage connector and drivers for Snowflake. The storage connector can be configured using the feature store UI as illustrated below:

To configure the connector you need to provide the Connection URL of your cluster. The Snowflake storage connector supports both username and password authentication as well as token-based authentication. Token-based authentication is required when using OAuth to authenticate with Snowflake. To be able to use token-based authentication from Hopsworks, you will have to enable it on your Snowflake cluster, as explained in the Snowflake documentation: https://docs.snowflake.com/en/user-guide/spark-connector-use.html#using-external-oauth

The Hopsworks Snowflake storage connector allows users to specify several additional fields, though only two are mandatory: the database field and the schema field.

The role field can be used to specify which Snowflake security role (https://docs.snowflake.com/en/user-guide/security-access-control-overview.html#system-defined-roles ) to assume for the session after the connection is established.

The application field can also be specified to have better observability in Snowflake with regards to which application is running which query. The application field can be a simple string like “Hopsworks” or, for instance, the project name, to track usage and queries from each Hopsworks project.

Additional key/value options can also be specified to control the behaviour of the Snowflake Spark connector. The available options are listed in the Snowflake documentation: https://docs.snowflake.com/en/user-guide/spark-connector-use.html

import hsfs
# Connect to the Hopsworks feature store
connection = hsfs.connection()
# Retrieve the metadata handle
fs = connection.get_feature_store()

On-demand (External) Feature Groups

Once the storage connector is configured, users can start defining on-demand feature groups. On-demand feature groups are external tables that can be defined on external SQL databases or tables stored on object stores (such as Parquet or Delta lake tables on S3 object storage). The data for an on-demand feature group is not copied into Hopsworks. Instead, it is stored in-place in the external data store, and is only read from the external store “on-demand”, for example, when its feature data is used to create training data or for batch inference. While data remains on the external store, the feature group metadata is stored in Hopsworks. More in-depth documentation for on-demand feature groups can be found here: https://docs.hopsworks.ai/latest/generated/on_demand_feature_group/.

On-demand feature groups can be used in combination with cached feature groups to either create training datasets or to retrieve large volumes of feature data for batch inference.

An example of on-demand query definition is given below:

# Retrieve the storage connector defined before
snowflake_connector = fs.get_storage_connector("snowflake_demo")
query = """
    SELECT TO_NUMERIC(ss_store_sk) AS ss_store_sk
        , AVG(ss_net_profit) AS avg_ss_net_profit
        , SUM(ss_net_profit) AS total_ss_net_profit 
        , AVG(ss_list_price) AS avg_ss_list_price
        , AVG(ss_coupon_amt) AS avg_ss_coupon_amt
    FROM STORE_SALES
    GROUP BY ss_store_sk
"""

store_sales_on_dmd = fs.create_on_demand_feature_group(name="store_sales_features",
                                                        version=1,
                                                        query=query,
                                                        storage_connector=snowflake_connector,
                                                        statistics_config={"enabled": True, "histograms": True, "correlations": True})
store_sales_on_dmd.save()
query = """
    SELECT s_store_id
        , s_store_sk
        , s_floor_space
        , s_number_employees
        , s_hours
        , s_city
        , s_state
        , s_tax_precentage  
    FROM STORE
"""

stores_on_dmd = fs.create_on_demand_feature_group(name="store_features",
                                                version=1,
                                                query=query,
                                                storage_connector=snowflake_connector,
                                                statistics_config=False)
stores_on_dmd.save()

Training dataset

Training datasets are a concept of the Hopsworks Feature Store that allows data scientists to pick features from different feature groups, join them together and get the data in a ML framework friendly file format that can be used to train models in TensorFlow (.tfrecord), Pytorch (.npy), SkLearn (.csv), and others.

The training dataset concept provides additional benefits, such as having a snapshot of the data at a particular point in time, being able to compute statistics on that snapshot and compare them with the statistics on the incoming data being submitted for predictions.

You can create a training dataset either using the HSFS library or using the user interface . In the example below, we join features from two different on-demand feature groups defined over the Snowflake database. It is also possible to join features from cached feature groups as well.

The training dataset in the example below is stored in TFRecords format ready to be used by a TensorFlow or Keras model. The training dataset itself can be stored on external stores like S3 or ADLS, as explained in the documentation: https://docs.hopsworks.ai/latest/generated/training_dataset/

query = store_sales_on_dmd.select_except(["ss_store_sk"])\
    .join(stores_on_dmd.select_except(["s_store_sk", "s_store_id"]), left_on=["ss_store_sk"], right_on=["s_store_sk"])
td = fs.create_training_dataset("sales_forecast_model",
                               description="Training dataset example",
                               version=2,
                               label=['total_ss_net_profit'],
                               data_format="csv",
                               splits={'train': 0.7, 'test': 0.2, 'validate': 0.1},
                               statistics_config={"enabled": True, "histograms": True, "correlations": True})

td.save(query)

Online serving

After having trained the model and put it into production, users need to leverage the online feature store to provide the features required to make predictions.

Snowflake is a columnar database, not designed to provide low latency access to data. To satisfy the latency requirements that a production system requires, we make the feature data available online using RonDB. RonDB is the database powering the Hopsworks online feature store.

To make the data available online, we need to create a cached feature group and set the online_enabled flag to True. For online storage, we don’t need to track data statistics. The online feature group will contain only the most recent values of each entity (e.g., the last feature vector available for a given store_id or customer_id).

The below example makes the data available only at a specific point in time (i.e., the moment the online feature group was created). In Hopsworks, users can define a Spark/Python program that periodically refreshes the data according to the requirement of the specific use case. Hence making the most recent data always available online.

online_fg = fs.create_feature_group("store_sales_features_online",
                                   version=1,
                                   description="Store sales features availaable online",
                                   primary_key=['ss_store_sk'],
                                   online_enabled=True,
                                   time_travel_format=None,
                                   statistics_config = False)

# Create just the metadata
online_fg.save(store_sales_on_dmd.read().limit(0))
# Insert into the online feature store
online_fg.insert(store_sales_on_dmd.read(), storage="online")