Create Training Data from Features

HSFS training datasets

Training datasets are the third building block of the Hopsworks Feature Store. Data scientists can query the feature store (see the feature_exploration notebook) and materialize their queries into training datasets.

Training datasets can be saved in an ML-framework-friendly format (e.g. TFRecords, CSV, NumPy) and then fed to a machine learning model for training.

Training datasets can also be stored on external storage systems like Amazon S3 or GCS to be read by external model training platforms.

As in the previous notebooks, the first step is to establish a connection with the Hopsworks feature store and get the feature store handle:

import hsfs
# Create a connection
connection = hsfs.connection()
# Get the feature store handle for the project's feature store
fs = connection.get_feature_store()
Starting Spark application
YARN Application ID: application_1612782748969_0003 (kind: pyspark, state: idle)
SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

Create a training dataset from a query

In the previous notebook (feature_exploration) we walked through how to explore and query the Hopsworks feature store using HSFS. We can reuse the queries produced there to create a training dataset.

sales_fg = fs.get_feature_group('sales_fg')
exogenous_fg = fs.get_feature_group('exogenous_fg')

query = sales_fg.select_all()\
        .join(exogenous_fg.select(['fuel_price', 'unemployment', 'cpi']))
VersionWarning: No version provided for getting feature group `sales_fg`, defaulting to `1`.
VersionWarning: No version provided for getting feature group `exogenous_fg`, defaulting to `1`.

As with feature groups, we first need to generate a metadata object representing the training dataset. After that, we can call the save() method to persist it in the Hopsworks feature store. Several file formats are available: csv, tfrecord, npy, hdf5, avro, parquet and orc.

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               version=1)

td.save(query)
<hsfs.training_dataset.TrainingDataset object at 0x7f94701b38d0>

Pass write options

When you save a training dataset, you can pass additional options to the Spark writer. For instance, in the example below, we instruct Spark to write a header row to the CSV files.

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               version=2)

td.save(query, {'header': 'true'})
<hsfs.training_dataset.TrainingDataset object at 0x7f948851b610>

Split the training dataset

If you are training a model, you might want to split the training dataset into different slices (training, test and validation). HSFS allows you to specify the split sizes. You can also provide a seed for the random splitter if you want the split to be reproducible (a seeded sketch follows the example below).

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               splits={'train': 0.7, 'test': 0.2, 'validate': 0.1},
                               version=3)

td.save(query, {'header': 'true'})
<hsfs.training_dataset.TrainingDataset object at 0x7f9488528510>
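The split above is random and will differ between runs. To make it reproducible, you can pass a seed; a minimal sketch, assuming your HSFS version exposes the seed parameter on create_training_dataset (the version number below is hypothetical):

# Hypothetical version number; the seed makes the random split reproducible
td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               splits={'train': 0.7, 'test': 0.2, 'validate': 0.1},
                               seed=42,
                               version=99)

td.save(query, {'header': 'true'})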

Save the dataset on an external storage system

If you are training your model on an external machine learning platform (e.g. SageMaker), you might want to save the training dataset on an external storage system (e.g. S3). You can take advantage of the Hopsworks storage connectors (see documentation).

Assuming you have created an S3 storage connector named td_bucket_connector, you can create an external training dataset as follows:

td_bucket_connector = fs.get_storage_connector("td_bucket_connector")

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               storage_connector=td_bucket_connector,
                               version=4)

### This code is expected to fail if your connector is not configured properly
td.save(query)

Replay the query that generated the training dataset

If you created a training dataset from a query object, you can ask the feature store to return the set of features (in order) and the set of joins that generated it. This is useful if you are serving a model in production and want to augment the inference vector with features taken from the online feature store.

td = fs.get_training_dataset(name="sales_model")
print(td.query)
SELECT `fg0`.`sales_last_quarter_store_dep`, `fg0`.`store`, `fg0`.`sales_last_month_store_dep`, `fg0`.`sales_last_year_store`, `fg0`.`sales_last_quarter_store`, `fg0`.`sales_last_six_month_store_dep`, `fg0`.`sales_last_year_store_dep`, `fg0`.`sales_last_month_store`, `fg0`.`dept`, `fg0`.`sales_last_six_month_store`, `fg0`.`weekly_sales`, `fg0`.`is_holiday`, `fg0`.`date`, `fg1`.`fuel_price`, `fg1`.`unemployment`, `fg1`.`cpi`
FROM `demo_fs_meb10000`.`sales_fg_1` `fg0`
INNER JOIN `demo_fs_meb10000`.`exogenous_fg_1` `fg1` ON `fg0`.`date` = `fg1`.`date` AND `fg0`.`store` = `fg1`.`store`
VersionWarning: No version provided for getting training dataset `sales_model`, defaulting to `1`.
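Since td.query is a plain SQL string, one way to replay it is to run it directly with Spark SQL; a minimal sketch, assuming the feature store's Hive database is reachable from the current Spark session:

# Re-run the exact query that materialized the training dataset
df_replayed = spark.sql(td.query)
df_replayed.show(5)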

Create a training dataset from a DataFrame

If you need to apply additional transformations before creating a training dataset, you can create one from a Spark DataFrame instead of a query. The create_training_dataset part stays the same; the difference is that we pass a DataFrame to the save() method.

Because you have applied additional transformations between the query object and the training dataset, the feature store won't be able to replay the query for this specific training dataset.

from pyspark.sql import functions as F
df = query.read()
# Apply additional transformations
df = (df.withColumn("is_holiday", F.when(F.col("is_holiday") == "true", 1 ).otherwise(0))
       .withColumn("unemployment", F.col("unemployment").cast("double")) 
       .withColumn("cpi", F.col("cpi").cast("double"))
       .drop("date"))

Save in CSV format

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="csv",
                               splits={'train': 0.7, 'test': 0.2, 'validate': 0.1},                                
                               version=5)

td.save(df)
<hsfs.training_dataset.TrainingDataset object at 0x7f94884e0150>

Save in TFRecord format

td = fs.create_training_dataset(name="sales_model",
                               description="Dataset to train the sales model",
                               data_format="tfrecord",
                               splits={'train': 0.7, 'test': 0.2, 'validate': 0.1},                                
                               version=6)

td.save(df)
<hsfs.training_dataset.TrainingDataset object at 0x7f948849f210>

Read a training dataset

As with feature groups, you can call the show() method to get a preview of the training dataset and read() to get it as a Spark DataFrame.

td = fs.get_training_dataset("sales_model", 2)
td.show(5)
+----------------------------+-----+--------------------------+---------------------+------------------------+------------------------------+-------------------------+----------------------+----+--------------------------+------------+----------+-------------------+----------+------------+-----------+
|sales_last_quarter_store_dep|store|sales_last_month_store_dep|sales_last_year_store|sales_last_quarter_store|sales_last_six_month_store_dep|sales_last_year_store_dep|sales_last_month_store|dept|sales_last_six_month_store|weekly_sales|is_holiday|               date|fuel_price|unemployment|        cpi|
+----------------------------+-----+--------------------------+---------------------+------------------------+------------------------------+-------------------------+----------------------+----+--------------------------+------------+----------+-------------------+----------+------------+-----------+
|                         0.0|   20|                       0.0|                  0.0|                     0.0|                           0.0|                      0.0|                   0.0|  55|                       0.0|    32362.95|     false|2010-02-05 00:00:00|     2.784|       8.187|204.2471935|
|                         0.0|   20|                       0.0|                  0.0|                     0.0|                           0.0|                      0.0|                   0.0|  94|                       0.0|    63787.83|     false|2010-02-05 00:00:00|     2.784|       8.187|204.2471935|
|                         0.0|   20|                       0.0|                  0.0|                     0.0|                           0.0|                      0.0|                   0.0|  22|                       0.0|    17597.83|     false|2010-02-05 00:00:00|     2.784|       8.187|204.2471935|
|                         0.0|   20|                       0.0|                  0.0|                     0.0|                           0.0|                      0.0|                   0.0|  30|                       0.0|     9488.37|     false|2010-02-05 00:00:00|     2.784|       8.187|204.2471935|
|                         0.0|   20|                       0.0|                  0.0|                     0.0|                           0.0|                      0.0|                   0.0|   2|                       0.0|    85812.69|     false|2010-02-05 00:00:00|     2.784|       8.187|204.2471935|
+----------------------------+-----+--------------------------+---------------------+------------------------+------------------------------+-------------------------+----------------------+----+--------------------------+------------+----------+-------------------+----------+------------+-----------+
only showing top 5 rows

If you have split your training dataset, you can also read a single split:

td = fs.get_training_dataset("sales_model", 6)
td.read("train").count()
295125

Input the training dataset to a model training loop

If you are training a model, HSFS provides the tf_data method, which returns a TFDataEngine object with utility methods to read the training dataset as a tf.data.Dataset and feed it to a model training loop efficiently.

* Currently TFDataEngine provides 2 utility methods, tf_record_dataset and tf_csv_dataset, for reading .tfrecord and .csv files, respectively.
* Both methods support only the following feature types: string, short, int, long, float and double.
* In both methods you can set the process argument to True and they will return a PrefetchDataset ready to be fed to a model training loop.
* If you would like to apply your own feature transformation logic using tf.data.Dataset, set the process argument to False.

Process using tf_record_dataset:

train_input = td.tf_data(target_name='weekly_sales', split='train', is_training=True)
train_input_processed = train_input.tf_record_dataset(process=True, batch_size=32, num_epochs=1)
train_input_processed
<PrefetchDataset shapes: ((32, 14), (32,)), types: (tf.float32, tf.float32)>
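A PrefetchDataset of (features, label) batches like this can be fed straight to Keras. A minimal sketch of a training loop (the model architecture is illustrative, not part of this notebook), assuming TensorFlow 2.x:

import tensorflow as tf

# Illustrative regression model: 14 input features (matching the shapes above),
# one output for weekly_sales
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(14,)),
    tf.keras.layers.Dense(1)
])
model.compile(optimizer='adam', loss='mse')

# fit() consumes the (x, y) batches produced by tf_record_dataset
model.fit(train_input_processed, epochs=1)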

Apply custom logic to tf_record_dataset:

td = fs.get_training_dataset("sales_model", 6)

train_input = td.tf_data(target_name=None, split='train', is_training=True)
train_input_not_processed = train_input.tf_record_dataset()
train_input_not_processed
<ParallelMapDataset shapes: {cpi: (), dept: (), fuel_price: (), is_holiday: (), sales_last_month_store: (), sales_last_month_store_dep: (), sales_last_quarter_store: (), sales_last_quarter_store_dep: (), sales_last_six_month_store: (), sales_last_six_month_store_dep: (), sales_last_year_store: (), sales_last_year_store_dep: (), store: (), unemployment: (), weekly_sales: ()}, types: {cpi: tf.float32, dept: tf.int64, fuel_price: tf.float32, is_holiday: tf.int64, sales_last_month_store: tf.float32, sales_last_month_store_dep: tf.float32, sales_last_quarter_store: tf.float32, sales_last_quarter_store_dep: tf.float32, sales_last_six_month_store: tf.float32, sales_last_six_month_store_dep: tf.float32, sales_last_year_store: tf.float32, sales_last_year_store_dep: tf.float32, store: tf.int64, unemployment: tf.float32, weekly_sales: tf.float32}>
import tensorflow as tf

batch_size = 32
num_epochs = 1 

def custom_impl(example):
    feature_names = [td_feature.name for td_feature in td.schema] 
    label_name = feature_names.pop(feature_names.index('weekly_sales'))
    x = [tf.cast(example[feature_name], tf.float32) for feature_name in feature_names]
    y = example[label_name]
    return x,y

train_input_custom_processed = train_input_not_processed.map(lambda value: custom_impl(value))\
    .shuffle(num_epochs * batch_size)\
    .repeat(num_epochs * batch_size)\
    .cache()\
    .batch(batch_size, drop_remainder=True)\
    .prefetch(tf.data.experimental.AUTOTUNE)
train_input_custom_processed
<PrefetchDataset shapes: ((32, 14), (32,)), types: (tf.float32, tf.float32)>