2. Create Training Data from Features

HSFS training datasets

Training datasets are the third building block of the Hopsworks Feature Store. Data scientists can query the feature store (see the feature_exploration notebook) and materialize their query in training datasets.

Training datasets can be saved in an ML-framework-friendly format (e.g. TFRecords, CSV, NumPy) and then be fed to a machine learning model for training.

Training datasets can also be stored on external storage systems like Amazon S3 or GCS to be read by external model training platforms.

As with the previous notebooks, the first step is to establish a connection with the Hopsworks feature store and get the feature store handle.

import hsfs
# Create a connection
connection = hsfs.connection()
# Get the feature store handle for the project's feature store
fs = connection.get_feature_store()
Starting Spark application
SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

Create a training dataset from a query

In the previous notebook (feature_exploration) we walked through how to explore and query the Hopsworks feature store using HSFS. We can use the queries produced in that notebook to create a training dataset.

sales_fg = fs.get_feature_group('sales_fg')
exogenous_fg = fs.get_feature_group('exogenous_fg')

query = sales_fg.select_all()\
        .join(exogenous_fg.select(['fuel_price', 'unemployment', 'cpi']))
VersionWarning: No version provided for getting feature group `sales_fg`, defaulting to `1`.
VersionWarning: No version provided for getting feature group `exogenous_fg`, defaulting to `1`.

As with feature groups, we first need to generate a metadata object representing the training dataset. After that, we can call the save() method to persist it in the Hopsworks feature store. Different file formats are available: csv, tfrecord, npy, hdf5, avro, parquet, orc.

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               version=1)

td.save(query)

Pass write options

When you save a training dataset, you can specify additional parameters for the Spark writer. For instance, in the example below, we add a header row to the CSV file.

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               version=2)

td.save(query, {'header': 'true'})

Split the training dataset

If you are training a model, you might want to split the training dataset into different slices (training, test and validation). If splits is set, you must also provide the name of the split that will be used for training. The statistics of this split will be used for transformation functions, if necessary.

HSFS allows you to specify the split sizes. You can also provide a seed for the random splitter if you want to reproduce a training dataset (see the sketch after the next cell).

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               splits={'train': 0.7, 'test': 0.2, 'validate': 0.1},
                               train_split="train", 
                               version=3)

td.save(query, {'header': 'true'})
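
To make the random split reproducible, you can also pass the seed parameter. A minimal sketch, assuming the same fs handle and query as above; the seed value and version number are only illustrative:

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               splits={'train': 0.7, 'test': 0.2, 'validate': 0.1},
                               train_split="train",
                               seed=42,
                               version=7)

td.save(query, {'header': 'true'})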

Save the dataset on an external storage system

If you are training your model on an external machine learning platform (e.g. SageMaker), you might want to save the training dataset on an external storage system (e.g. S3). You can take advantage of the Hopsworks storage connectors (see documentation).

Assuming you have created an S3 storage connector named td_bucket_connector, you can create an external training dataset as follows:

td_bucket_connector = fs.get_storage_connector("td_bucket_connector")

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               storage_connector=td_bucket_connector,
                               version=4)

### This code is expected to fail if your connector is not configured properly
td.save(query)

Replay the query that generated the training dataset

If you created a training dataset from a query object, you can ask the feature store to return the set of features (in order) and the set of joins that generated it. This is useful if you are serving a model in production and want to augment the inference vector with features taken from the online feature store (a sketch of this follows the query output below).

td = fs.get_training_dataset(name="sales_model")
print(td.query)
SELECT `fg0`.`store`, `fg0`.`dept`, `fg0`.`date`, `fg0`.`weekly_sales`, `fg0`.`is_holiday`, `fg0`.`sales_last_month_store_dep`, `fg0`.`sales_last_quarter_store_dep`, `fg0`.`sales_last_six_month_store_dep`, `fg0`.`sales_last_year_store_dep`, `fg0`.`sales_last_month_store`, `fg0`.`sales_last_quarter_store`, `fg0`.`sales_last_six_month_store`, `fg0`.`sales_last_year_store`, `fg1`.`fuel_price`, `fg1`.`unemployment`, `fg1`.`cpi`
FROM `demo_fs_davit000`.`sales_fg_1` `fg0`
INNER JOIN `demo_fs_davit000`.`exogenous_fg_1` `fg1` ON `fg0`.`date` = `fg1`.`date` AND `fg0`.`store` = `fg1`.`store`
VersionWarning: No version provided for getting training dataset `sales_model`, defaulting to `1`.
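
In production, the same training dataset metadata object can be used to build such an inference vector from the online feature store. A minimal sketch, assuming the hsfs 2.x serving API (get_serving_vector); the primary key values are made up for illustration:

# Assumed hsfs 2.x serving API; the key values below are only illustrative
# and must match the primary keys of the underlying feature groups.
serving_vector = td.get_serving_vector({'store': 20, 'dept': 55, 'date': '2012-02-10'})
print(serving_vector)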

Create a training dataset from a DataFrame

If you need to apply additional transformations before creating a training dataset, you can create one from a Spark DataFrame instead of using a query. The create_training_dataset call stays the same; the difference is that we pass a DataFrame to the save() method.

Since additional transformations have been applied between the query object and the training dataset, you won't be able to replay the query for this specific training dataset.

from pyspark.sql import functions as F
df = query.read()
# Apply additional transformations
df = (df.withColumn("is_holiday", F.when(F.col("is_holiday") == "true", 1 ).otherwise(0))
       .withColumn("unemployment", F.col("unemployment").cast("double")) 
       .withColumn("cpi", F.col("cpi").cast("double"))
       .drop("date"))

Save in CSV format

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               splits={'train': 0.7, 'test': 0.2, 'validate': 0.1},   
                               train_split="train",                                 
                               version=5)

td.save(df)

Save in TFRecord format

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="tfrecord",
                               splits={'train': 0.7, 'test': 0.2, 'validate': 0.1}, 
                               train_split="train",                                 
                               version=6)

td.save(df)

Read a training dataset

As with feature groups, you can call the show() method to get a preview of the training dataset and read() to get a Spark DataFrame of it.

td = fs.get_training_dataset("sales_model", 2)
td.show(5)

If you have split your training dataset, you can also read a single split:

td = fs.get_training_dataset("sales_model", 6)
td.read("train").count()

Input the training dataset to a model training loop

If you are training a model, HSFS provides the tf_data method, which returns a TFDataEngine object with utility methods to read the training dataset as a tf.data.Dataset and feed it to a model training loop efficiently.

* Currently TFDataEngine provides two utility methods, tf_record_dataset and tf_csv_dataset, for reading .tfrecord and .csv files, respectively.
* Both methods support only the following feature types: string, short, int, long, float and double.
* In both methods you can set the process argument to True, and they will return a PrefetchDataset ready to feed into a model training loop.
* If you would like to apply your own logic to feature transformation using tf.data.Dataset, set the process argument to False.

Process using tf_record_dataset:

train_input = td.tf_data(target_name='weekly_sales', split='train', is_training=True)
train_input_processed = train_input.tf_record_dataset(process=True, batch_size=32, num_epochs=1)
train_input_processed
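
With process=True, the returned PrefetchDataset can be passed directly to a Keras training loop. A minimal sketch, assuming the processed dataset yields (features, label) batches and using a small, purely illustrative regression model:

import tensorflow as tf

# The number of input features is the schema size minus the label column.
num_features = len(td.schema) - 1

# Illustrative model only: a small regression network on the numeric features.
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation="relu", input_shape=(num_features,)),
    tf.keras.layers.Dense(1)
])
model.compile(optimizer="adam", loss="mse")

# Feed the processed tf.data.Dataset straight into model.fit.
model.fit(train_input_processed, epochs=1)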

Apply custom logic to tf_record_dataset:

td = fs.get_training_dataset("sales_model", 6)

train_input = td.tf_data(target_name=None, split='train', is_training=True)
train_input_not_processed = train_input.tf_record_dataset()
train_input_not_processed
import tensorflow as tf

batch_size = 32
num_epochs = 1 

def custom_impl(example):
    # Separate the label column from the feature columns using the
    # training dataset schema.
    feature_names = [td_feature.name for td_feature in td.schema]
    label_name = feature_names.pop(feature_names.index('weekly_sales'))
    # Cast every feature to float32 and return a (features, label) pair.
    x = [tf.cast(example[feature_name], tf.float32) for feature_name in feature_names]
    y = example[label_name]
    return x, y

train_input_custom_processed = train_input_not_processed.map(lambda value: custom_impl(value))\
    .shuffle(num_epochs * batch_size)\
    .repeat(num_epochs * batch_size)\
    .cache()\
    .batch(batch_size, drop_remainder=True)\
    .prefetch(tf.data.experimental.AUTOTUNE)
train_input_custom_processed