Built-In Online Transformation Functions

Create a Connection to hsfs

import hsfs
connection = hsfs.connection()
# get a reference to the feature store; you can also access shared feature stores by providing the feature store name
fs = connection.get_feature_store()
Starting Spark application
ID: 0 | Application ID: application_1645116533784_0002 | Kind: pyspark | State: idle
SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

Generate HSFS Built-In Online Transformations

HSFS comes with built-in transformation functions such as min_max_scaler, standard_scaler, robust_scaler and label_encoder. These functions are registered in the feature store of every project.

Get All Online Transformations Available in the Feature Store

fs.get_transformation_functions()
[<hsfs.transformation_function.TransformationFunction object at 0x7fb6adf82d30>, <hsfs.transformation_function.TransformationFunction object at 0x7fb6adf82fa0>, <hsfs.transformation_function.TransformationFunction object at 0x7fb6adf82f40>, <hsfs.transformation_function.TransformationFunction object at 0x7fb6adb080a0>]

Get Online Transformation Function by Name and Version

min_max_scaler = fs.get_transformation_function(name="min_max_scaler")
print(min_max_scaler.name)
print(min_max_scaler.version)
min_max_scaler
1
standard_scaler = fs.get_transformation_function(name="standard_scaler")
print(standard_scaler.name)
print(standard_scaler.version)
standard_scaler
1
robust_scaler = fs.get_transformation_function(name="robust_scaler")
print(robust_scaler.name)
print(robust_scaler.version)
robust_scaler
1
label_encoder = fs.get_transformation_function(name="label_encoder")
print(label_encoder.name)
print(label_encoder.version)
label_encoder
1

View Built-In Online Transformation Source Code

print(min_max_scaler.transformer_code)
# Min-Max scaling
def min_max_scaler(value, min_value,max_value):
    return (value - min_value) / (max_value - min_value)
print(standard_scaler.transformer_code)
# Standardization / z-score
def standard_scaler(value, mean, std_dev):
    return (value - mean) / std_dev
print(robust_scaler.transformer_code)
# Robust scaling
def robust_scaler(value, p25, p50, p75):
    return (value - p50) / (p75 - p25)
print(label_encoder.transformer_code)
# label encoder
def label_encoder(value, value_to_index):
    # define a mapping of values to integers
    return value_to_index[value]
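
The printed functions are plain Python, so they can be tried out directly. A quick sketch applying each one to sample values (the numbers here are illustrative, not taken from the dataset):

```python
# The four built-in transformation functions, exactly as printed above.

def min_max_scaler(value, min_value, max_value):
    # scales value into [0, 1] given the column's min and max
    return (value - min_value) / (max_value - min_value)

def standard_scaler(value, mean, std_dev):
    # z-score: distance from the mean in units of standard deviation
    return (value - mean) / std_dev

def robust_scaler(value, p25, p50, p75):
    # centers on the median, scales by the interquartile range
    return (value - p50) / (p75 - p25)

def label_encoder(value, value_to_index):
    # maps a categorical value to its integer index
    return value_to_index[value]

print(min_max_scaler(15.0, 10.0, 30.0))                                  # -> 0.25
print(standard_scaler(12.0, 10.0, 2.0))                                  # -> 1.0
print(robust_scaler(15.0, 10.0, 12.0, 16.0))                             # -> 0.5
print(label_encoder("level2", {"level1": 0, "level2": 1, "level3": 2}))  # -> 1
```

When HSFS applies these at training or serving time, the statistics (min, max, mean, percentiles, category index) are supplied automatically from the training dataset; here they are passed by hand.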

Create Training Dataset With Online Transformation

To use online transformation functions with a training dataset, the training dataset must be created from an HSFS Query object.

Here we assume that version 2 of the economy_fg and demography_fg feature groups has already been created. Otherwise, please run the time_travel/time_travel_python.ipynb notebook first.

economy_fg = fs.get_feature_group('economy_fg',2)
demography_fg = fs.get_feature_group('demography_fg',2)
economy_fg.read().show()
+----+---+---------+----------+-----+--------+------+---------+
|year| id|   salary|commission|  car|  hvalue|hyears|     loan|
+----+---+---------+----------+-----+--------+------+---------+
|2020|  1|120499.73|       0.0|car17|205000.0|    30| 564724.2|
|2020|  2|160893.77|       0.0|car10|179000.0|     2|455015.34|
|2020|  3|119159.65|       0.0| car1|145000.0|    22|122025.08|
|2020|  4|  20000.0|  52593.63| car9|185000.0|    30| 99629.62|
|2020|  5| 93956.32|       0.0|car15|135000.0|     1| 458679.8|
|2020|  6| 41365.43|  52809.15| car7|135000.0|    19| 216839.7|
|2020|  7| 94805.61|       0.0|car17|135000.0|    23|233216.06|
|2020|  8| 64410.62|  39884.39|car20|125000.0|     6|350707.38|
|2020|  9|128298.82|       0.0|car19|135000.0|    12| 20768.06|
|2020| 10|100806.92|       0.0| car8|135000.0|     6|293106.66|
+----+---+---------+----------+-----+--------+------+---------+
economy_fg.read().printSchema()
root
 |-- year: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- salary: float (nullable = true)
 |-- commission: float (nullable = true)
 |-- car: string (nullable = true)
 |-- hvalue: float (nullable = true)
 |-- hyears: integer (nullable = true)
 |-- loan: float (nullable = true)

Create the training dataset from an HSFS Query object

query = demography_fg.select(['age','elevel','zipcode']).join(economy_fg.select_all())
query.show(5)
+---+------+--------+----+---+---------+----------+-----+--------+------+---------+
|age|elevel| zipcode|year| id|   salary|commission|  car|  hvalue|hyears|     loan|
+---+------+--------+----+---+---------+----------+-----+--------+------+---------+
| 54|level3|zipcode5|2020|  1|120499.73|       0.0|car17|205000.0|    30| 564724.2|
| 71|level2|zipcode3|2020|  6| 41365.43|  52809.15| car7|135000.0|    19| 216839.7|
| 49|level2|zipcode4|2020|  3|119159.65|       0.0| car1|145000.0|    22|122025.08|
| 59|level1|zipcode2|2020|  5| 93956.32|       0.0|car15|135000.0|     1| 458679.8|
| 32|level1|zipcode3|2020|  9|128298.82|       0.0|car19|135000.0|    12| 20768.06|
+---+------+--------+----+---+---------+----------+-----+--------+------+---------+
only showing top 5 rows

Provide the transformation functions as a dict, where each key is a feature name and each value is the corresponding transformation function object

td = fs.create_training_dataset(name="economy_td",
                               description="Dataset to train some model",
                               data_format="csv",
                               transformation_functions={"hyears":min_max_scaler, 
                                                         "loan":standard_scaler,
                                                         "salary":robust_scaler,
                                                         "elevel":label_encoder},
                               version=1)
td.save(query)

Online transformation functions are now attached to the training dataset as metadata and contain information about which features they will be applied to

td = fs.get_training_dataset("economy_td")
VersionWarning: No version provided for getting training dataset `economy_td`, defaulting to `1`.
td.transformation_functions
{'salary': <hsfs.transformation_function.TransformationFunction object at 0x7fb6adb08790>, 'hyears': <hsfs.transformation_function.TransformationFunction object at 0x7fb6adb0ffd0>, 'elevel': <hsfs.transformation_function.TransformationFunction object at 0x7fb6adb08880>, 'loan': <hsfs.transformation_function.TransformationFunction object at 0x7fb6adb0f700>}
td.read().show()
+---+------+--------+----+---+-------------------+----------+-----+--------+--------------------+--------------------+
|age|elevel| zipcode|year| id|             salary|commission|  car|  hvalue|              hyears|                loan|
+---+------+--------+----+---+-------------------+----------+-----+--------+--------------------+--------------------+
| 71|     1|zipcode3|2020|  6|-1.0597689460499202|  52809.15| car7|135000.0|  0.6206896551724138|-0.38649341373235707|
| 33|     1|zipcode1|2020|  8|-0.6489014306268116|  39884.39|car20|125000.0|  0.1724137931034483| 0.41402930784560865|
| 32|     3|zipcode3|2020|  9|0.49014685129569574|       0.0|car19|135000.0|  0.3793103448275862| -1.5589931124457643|
| 32|     3|zipcode2|2020|  7|-0.1069960458425144|       0.0|car17|135000.0|  0.7586206896551724| -0.2885635133105803|
| 44|     3|zipcode8|2020|  2| 1.0712747733778236|       0.0|car10|179000.0|0.034482758620689655|  1.0377863199492405|
| 49|     1|zipcode4|2020|  3|0.32720661898815995|       0.0| car1|145000.0|  0.7241379310344828| -0.9534806526026023|
| 56|     0|zipcode2|2020|  4|-1.4406883689742835|  52593.63| car9|185000.0|                 1.0|  -1.087404512557536|
| 54|     2|zipcode5|2020|  1|0.35109856937318834|       0.0|car17|205000.0|                 1.0|  1.6938403242493119|
| 59|     3|zipcode2|2020|  5|-0.1221378288216204|       0.0|car15|135000.0|                 0.0|  1.0596996800293843|
| 58|     1|zipcode5|2020| 10|                0.0|       0.0| car8|135000.0|  0.1724137931034483| 0.06957957257529504|
+---+------+--------+----+---+-------------------+----------+-----+--------+--------------------+--------------------+

Transformation functions will also be applied to feature vectors retrieved by the get_serving_vector method

td_meta = fs.get_training_dataset("economy_td", 1)
# the `init_prepared_statement` method is needed to get serving_keys in case `get_serving_vector` has not been called yet; it is not necessary for `get_serving_vector` itself
td_meta.init_prepared_statement() 
td_meta.serving_keys
{'id'}
td_meta.get_serving_vector({'id': 1})
[54, 2, 'zipcode5', 2020, 1, 0.3511034444286508, 0.0, 'car17', 205000.0, 1.0, 1.6938392030076546]
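
The transformed hyears entry (1.0) can be checked by hand: id 1 has hyears=30, and the hyears column shown earlier ranges from 1 to 30, so the min-max scaled value is exactly 1.0. A quick sketch, reusing the min_max_scaler source printed above and the economy_fg rows shown earlier:

```python
def min_max_scaler(value, min_value, max_value):
    return (value - min_value) / (max_value - min_value)

# hyears column from the economy_fg rows displayed above
hyears = [30, 2, 22, 30, 1, 19, 23, 6, 12, 6]

# id 1 has hyears=30; with min=1 and max=30 this scales to 1.0,
# matching the hyears entry in the serving vector
print(min_max_scaler(30, min(hyears), max(hyears)))  # -> 1.0
```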

Training Dataset With Splits

When creating a training dataset with multiple random splits, the user needs to tell HSFS which split will be used for training by supplying the train_split argument. The statistics (min/max/mean/std) of this split are then used for the online transformations.
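A plain-Python sketch (not the HSFS API) of what this implies: statistics are computed on the train split only and then reused to transform every split, so test and validation rows are scaled with the same parameters the model saw during training. The split assignments below are hypothetical:

```python
def min_max_scaler(value, min_value, max_value):
    return (value - min_value) / (max_value - min_value)

# hypothetical random split of the hyears column
hyears_train = [30, 2, 22, 30, 1, 19, 23]   # train split
hyears_test = [6, 12, 6]                    # test split

# statistics come from the TRAIN split only ...
train_min, train_max = min(hyears_train), max(hyears_train)

# ... and are applied to every split, including test
scaled_test = [min_max_scaler(v, train_min, train_max) for v in hyears_test]
print(scaled_test)  # each value scaled against the train split's min/max
```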

td_with_splits = fs.create_training_dataset(name="economy_td",
                                            description="Dataset to train some model",
                                            data_format="csv",
                                            transformation_functions={"hyears":min_max_scaler, 
                                                                      "loan":standard_scaler,
                                                                      "salary":robust_scaler},
                                            splits={'train': 0.7, 'test': 0.2, 'validate': 0.1},
                                            train_split = 'train',            
                                            version=2)
td_with_splits.splits
{'train': 0.7, 'test': 0.2, 'validate': 0.1}
td_with_splits.save(query)
td_meta = fs.get_training_dataset("economy_td", 2)
td_meta.serving_keys
td_meta.get_serving_vector({'id': 2})
[44, 'level1', 'zipcode8', 2020, 2, 1.83460249027205, 0.0, 'car10', 179000.0, 0.034482758620689655, 1.4928188879390454]