Do you want to try out this notebook? Get a free account (no credit card required) at hopsworks.ai. You can also install open-source Hopsworks or view tutorial videos here.
1. Create empty feature groups in the Online Feature Store
In this demo we expect to receive data from a Kafka topic, read it with Spark Structured Streaming, perform streaming aggregations, and ingest the aggregated data into feature groups. We therefore first create the empty feature groups into which the streaming data will be ingested.
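The aggregates we will store per card (`num_trans`, `avg_amt`, `stdev_amt` over 10-minute, 1-hour, and 12-hour windows) are computed by the streaming job in later notebooks. As a minimal sketch of what those per-window values mean, the plain-Python snippet below computes them for a few hypothetical transactions that all fall inside one 10-minute window (in the real pipeline this is done by Spark Structured Streaming, not by hand):

```python
from statistics import mean, stdev

# Hypothetical (cc_num, amount) transactions inside one 10-minute window.
window = [
    (4444333322221111, 12.50),
    (4444333322221111, 99.99),
    (4444333322221111, 7.25),
]

amounts = [amt for _, amt in window]
num_trans_per_10m = len(amounts)          # transaction count in the window
avg_amt_per_10m = mean(amounts)           # mean transaction amount
stdev_amt_per_10m = stdev(amounts)        # sample standard deviation

print(num_trans_per_10m, round(avg_amt_per_10m, 2))
```

These three values per `cc_num` are exactly what the aggregate feature group schemas below are shaped to hold.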
import json
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType, LongType, IntegerType
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log |
---|---|---|---|---|---|
60 | application_1623853832952_0042 | pyspark | idle | Link | Link |
SparkSession available as 'spark'.
Define schema for feature groups
card_schema = StructType([StructField('tid', StringType(), True),
StructField('datetime', StringType(), True),
StructField('cc_num', LongType(), True),
StructField('amount', DoubleType(), True)])
schema_10m = StructType([StructField('cc_num', LongType(), True),
StructField('num_trans_per_10m', LongType(), True),
StructField('avg_amt_per_10m', DoubleType(), True),
StructField('stdev_amt_per_10m', DoubleType(), True)])
schema_1h = StructType([StructField('cc_num', LongType(), True),
StructField('num_trans_per_1h', LongType(), True),
StructField('avg_amt_per_1h', DoubleType(), True),
StructField('stdev_amt_per_1h', DoubleType(), True)])
schema_12h = StructType([StructField('cc_num', LongType(), True),
StructField('num_trans_per_12h', LongType(), True),
StructField('avg_amt_per_12h', DoubleType(), True),
StructField('stdev_amt_per_12h', DoubleType(), True)])
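The three aggregate schemas differ only in their window suffix (`10m`, `1h`, `12h`). As a side note, that repetition could be factored out with a small helper; the function below is a hypothetical sketch (not part of the notebook) that returns the (name, type) pairs for a given window:

```python
def agg_fields(window: str):
    """Return (column name, type name) pairs for a per-window aggregate schema."""
    return [
        ("cc_num", "long"),
        (f"num_trans_per_{window}", "long"),
        (f"avg_amt_per_{window}", "double"),
        (f"stdev_amt_per_{window}", "double"),
    ]

print([name for name, _ in agg_fields("10m")])
```

Mapping each pair to a `StructField` with the corresponding Spark type would reproduce the three `StructType` schemas above.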
Create empty Spark DataFrames
empty_card_df = sqlContext.createDataFrame(sc.emptyRDD(), card_schema)
empty_10m_agg_df = sqlContext.createDataFrame(sc.emptyRDD(), schema_10m)
empty_1h_agg_df = sqlContext.createDataFrame(sc.emptyRDD(), schema_1h)
empty_12h_agg_df = sqlContext.createDataFrame(sc.emptyRDD(), schema_12h)
Establish a connection with your Hopsworks feature store.
import hsfs
connection = hsfs.connection()
# get a reference to the feature store, you can access also shared feature stores by providing the feature store name
fs = connection.get_feature_store()
Connected. Call `.close()` to terminate connection gracefully.
Create feature group metadata objects and save the empty Spark DataFrames to materialise them in the Hopsworks feature store.
Now we will create each feature group, enabling the online feature store for the aggregate feature groups. Since we plan to use these feature groups during online model serving, primary key(s) are required to retrieve feature vectors from the online feature store.
card_transactions = fs.create_feature_group("card_transactions",
                                            version=1,
                                            online_enabled=False,
                                            statistics_config=False,
                                            primary_key=["tid"])
card_transactions.save(empty_card_df)
<hsfs.feature_group.FeatureGroup object at 0x7f0b0022c0d0>
card_transactions_10m_agg = fs.create_feature_group("card_transactions_10m_agg",
                                                    version=1,
                                                    online_enabled=True,
                                                    statistics_config=False,
                                                    primary_key=["cc_num"])
card_transactions_10m_agg.save(empty_10m_agg_df)
<hsfs.feature_group.FeatureGroup object at 0x7f0b00251dd0>
card_transactions_1h_agg = fs.create_feature_group("card_transactions_1h_agg",
                                                   version=1,
                                                   online_enabled=True,
                                                   statistics_config=False,
                                                   primary_key=["cc_num"])
card_transactions_1h_agg.save(empty_1h_agg_df)
<hsfs.feature_group.FeatureGroup object at 0x7f0affdc8e50>
card_transactions_12h_agg = fs.create_feature_group("card_transactions_12h_agg",
                                                    version=1,
                                                    online_enabled=True,
                                                    statistics_config=False,
                                                    primary_key=["cc_num"])
card_transactions_12h_agg.save(empty_12h_agg_df)
<hsfs.feature_group.FeatureGroup object at 0x7f0affdc2f10>