Online transformation functions

Create connection to hsfs

import hsfs

# Open a connection to Hopsworks and grab a handle to the project's
# feature store. Shared feature stores can be accessed by passing their
# name to `get_feature_store`.
connection = hsfs.connection()
# get a reference to the feature store, you can access also shared feature stores by providing the feature store name
fs = connection.get_feature_store()
Starting Spark application
IDYARN Application IDKindStateSpark UIDriver log
9application_1621853749763_0010pysparkidleLinkLink
SparkSession available as 'spark'.
Connected. Call `.close()` to terminate connection gracefully.

Define online transformation

To be able to attach a transformation function to training datasets, it has to be part of a library installed in Hopsworks. Please refer to the documentation on how to install Python libraries in Hopsworks. When defining a transformation function, don’t decorate it with Spark’s @udf or @pandas_udf, and don’t use any Spark dependencies. HSFS will decorate the transformation function only if it is used inside a Spark application.

from hsfs_transformers import transformers
# Register the `plus_one` transformer under three (name, version) pairs that
# differ only in the declared output type. `output_type` accepts both Python
# types (float, int) and type-name strings ("double", "long") -- presumably
# mapped to Spark SQL types; verify against the hsfs documentation.
plus_one_float_meta = fs.create_transformation_function(transformation_function=transformers.plus_one, 
                                                        output_type=float, 
                                                        version=1)
# `save()` persists the transformation-function metadata in the feature store.
plus_one_float_meta.save()
plus_one_int_meta = fs.create_transformation_function(transformation_function=transformers.plus_one, 
                                                      output_type=int, 
                                                      version=2)
plus_one_int_meta.save()
plus_one_double_meta = fs.create_transformation_function(transformation_function=transformers.plus_one, 
                                                    output_type="double", version=3)
plus_one_double_meta.save()
# Same pattern for the date-string parser; "long" output holds the parsed
# timestamp in epoch milliseconds (see the generated source further below).
date_string_2_timestamp_meta = fs.create_transformation_function(
                                            transformation_function=transformers.date_string_to_timestamp,
                                            output_type="long", version=1)
date_string_2_timestamp_meta.save()
# The stored name comes from the wrapped function itself, not the variable
# name -- hence both plus_one variants print the same name.
print(plus_one_float_meta.name)
print(plus_one_int_meta.name)
print(date_string_2_timestamp_meta.name)
plus_one
plus_one
date_string_to_timestamp

Get all online transformations available in the feature store

# Returns a list of TransformationFunction metadata objects -- one per
# registered (name, version) pair.
fs.get_transformation_functions()
[<hsfs.transformation_function.TransformationFunction object at 0x7fa7d8563a10>, <hsfs.transformation_function.TransformationFunction object at 0x7fa7d8563110>, <hsfs.transformation_function.TransformationFunction object at 0x7fa7d8563750>, <hsfs.transformation_function.TransformationFunction object at 0x7fa7d8563990>]

Get online transformation by name and version

# When no version is given, version 1 is returned (see printed output below).
plus_one_meta = fs.get_transformation_function(name="plus_one")
print(plus_one_meta.name)
print(plus_one_meta.version)
plus_one
1
# Explicit version=1 fetches the float-typed variant.
plus_one_float_meta = fs.get_transformation_function(name="plus_one", version=1)
print(plus_one_float_meta.name)
print(plus_one_float_meta.version)
plus_one
1
# version=2 fetches the int-typed variant of the same function name.
plus_one_int_meta = fs.get_transformation_function(name="plus_one", version=2)
print(plus_one_int_meta.name)
print(plus_one_int_meta.version)
plus_one
2
date_string_2_timestamp_meta = fs.get_transformation_function(name="date_string_to_timestamp", version=1)
print(date_string_2_timestamp_meta.name)
print(date_string_2_timestamp_meta.version)
date_string_to_timestamp
1

View online transformation source code

Since we are using the PySpark kernel, HSFS will add the udf decorator
# The stored source is wrapped in a @udf(<SparkType>) decorator matching the
# registered output type: FloatType for version 1 ...
print(plus_one_float_meta.transformer_code)
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import *

@udf(FloatType())
def plus_one(value):
    return value + 1
# ... IntegerType for version 2 ...
print(plus_one_int_meta.transformer_code)
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import *

@udf(IntegerType())
def plus_one(value):
    return value + 1
# ... and LongType for the timestamp parser (returns epoch milliseconds).
print(date_string_2_timestamp_meta.transformer_code)
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import *

@udf(LongType())
def date_string_to_timestamp(input_date):
    date_format = "%Y%m%d%H%M%S"
    return int(float(datetime.strptime(input_date, date_format).timestamp()) * 1000)

Delete transformation function

# Fetch the "double" variant (version 3) and remove it from the feature store.
plus_one_double_meta = fs.get_transformation_function(name="plus_one", version=3)
plus_one_double_meta.delete()

Create training dataset with online transformation

To use an online transformation function for a training dataset, it must be created from an hsfs Query object.

# Load version 2 of the two feature groups used to build the training dataset.
economy_fg = fs.get_feature_group('economy_fg',2)
demography_fg = fs.get_feature_group('demography_fg',2)
# Raw (untransformed) contents of the economy feature group.
economy_fg.read().show()
+---+---------+----------+-----+--------+------+---------+----+
| id|   salary|commission|  car|  hvalue|hyears|     loan|year|
+---+---------+----------+-----+--------+------+---------+----+
|  1|110499.73|       0.0|car15|235000.0|    30| 354724.2|2020|
|  2|140893.77|       0.0|car20|135000.0|     2|395015.34|2020|
|  3|119159.65|       0.0| car1|145000.0|    22|122025.08|2020|
|  4|  20000.0|  52593.63| car9|185000.0|    30| 99629.62|2020|
+---+---------+----------+-----+--------+------+---------+----+
# Schema of the raw feature group -- note hyears is int and loan is float,
# matching the int/float transformer variants attached below.
economy_fg.read().printSchema()
root
 |-- id: integer (nullable = true)
 |-- salary: float (nullable = true)
 |-- commission: float (nullable = true)
 |-- car: string (nullable = true)
 |-- hvalue: float (nullable = true)
 |-- hyears: integer (nullable = true)
 |-- loan: float (nullable = true)
 |-- year: integer (nullable = true)

A training dataset needs to be created from an hsfs Query object.

# Join selected demography features onto all economy features; the result is
# a Query object, which is what create_training_dataset expects.
query = demography_fg.select(['age','elevel','zipcode']).join(economy_fg.select_all())

Provide transformation functions as a dict, where the key is the feature name and the value is the transformation function metadata object.

# Create the training dataset with per-feature transformations: `hyears`
# gets the int variant of plus_one, `loan` the float variant. Keys are
# feature names; values are the TransformationFunction metadata objects
# registered earlier.
td = fs.create_training_dataset(name="economy_td",
                                description="Dataset to train the model",
                                data_format="csv",
                                transformation_functions={"hyears": plus_one_int_meta,
                                                          "loan": plus_one_float_meta},
                                # statistics are not needed for this demo
                                statistics_config=None,
                                version=1)
# Materialize the training dataset from the query.
td.save(query)
<hsfs.training_dataset.TrainingDataset object at 0x7fa7d84f6050>

Online transformation functions are now attached to the training dataset as metadata and contain information about which features they will be applied to.

# Without an explicit version, hsfs warns and defaults to version 1.
td = fs.get_training_dataset("economy_td")
VersionWarning: No version provided for getting training dataset `economy_td`, defaulting to `1`.
# The attached transformation functions are stored as training-dataset
# metadata, keyed by feature name.
td.transformation_functions
{'hyears': <hsfs.transformation_function.TransformationFunction object at 0x7fa7d849b690>, 'loan': <hsfs.transformation_function.TransformationFunction object at 0x7fa7d849bc50>}
# Reading the dataset returns transformed values -- hyears and loan are each
# incremented by one relative to the raw feature group shown earlier.
td.read().show()
+---+------+--------+---+---------+----------+-----+--------+------+---------+----+
|age|elevel| zipcode| id|   salary|commission|  car|  hvalue|hyears|     loan|year|
+---+------+--------+---+---------+----------+-----+--------+------+---------+----+
| 56|level0|zipcode2|  4|  20000.0|  52593.63| car9|185000.0|    31| 99630.62|2020|
| 54|level3|zipcode5|  1|110499.73|       0.0|car15|235000.0|    31| 354725.2|2020|
| 49|level2|zipcode4|  3|119159.65|       0.0| car1|145000.0|    23|122026.08|2020|
| 44|level4|zipcode8|  2|140893.77|       0.0|car20|135000.0|     3|395016.34|2020|
+---+------+--------+---+---------+----------+-----+--------+------+---------+----+

Transformation functions will also be applied to feature vectors retrieved by the get_serving_vector method.

td_meta = fs.get_training_dataset("economy_td", 1)
# `init_prepared_statement` is needed to populate `serving_keys` in case
# `get_serving_vector` has not been called yet; it is not necessary for
# `get_serving_vector` itself.
td_meta.init_prepared_statement() 
# Primary-key column(s) used to look up a single serving vector.
td_meta.serving_keys
{'id'}
# The returned vector has the transformations applied (hyears and loan are
# incremented by one relative to the raw feature group values).
td_meta.get_serving_vector({'id': 1})
[54, 'level3', 'zipcode5', 1, 110500.0, 0.0, 'car15', 235000.0, 31, 354725.0, 2020]