Databricks Azure Feature Store Quick Start

This notebook gives you a quick overview of how to integrate the Hopsworks Feature Store with Databricks and Azure ADLS (Azure Data Lake Storage). We’ll go over four steps:

  • Generate some sample data and store it on ADLS
  • Do some feature engineering with Databricks and the data from ADLS
  • Save the engineered features to the Feature Store
  • Select a group of the features from the Feature Store and create a training dataset

This requires configuring the Databricks cluster so that it can interact with the Hopsworks Feature Store; see the Databricks Quick Start.
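As part of that setup the hsfs client library must be available on the cluster; a minimal sketch is to install it from PyPI inside the notebook (the full configuration, including the API key and certificates, is covered in the Databricks Quick Start):

```python
# Minimal sketch: install the Hopsworks feature store client on the cluster.
# The complete cluster configuration is described in the Databricks Quick Start.
%pip install hsfs
```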

Imports

We’ll use numpy and pandas for data generation, pyspark for feature engineering, and the hsfs library to interact with the Hopsworks Feature Store.

import hsfs
import random
import numpy as np
import pandas as pd

from pyspark.sql import SQLContext
from pyspark.sql import Row
sqlContext = SQLContext(sc)

Connecting to the Feature Store

# Connect to the feature store, see https://docs.hopsworks.ai/latest/generated/api/connection_api/#connection_1 for more information
# The API key can also be saved as a secret in Databricks and retrieved using dbutils (see the example below)
connection = hsfs.connection(
  host="10.0.0.4",
  project="dataai",
  port="443",
  api_key_value="IbCoKz4aRChIeWpj.qNJjGItvVfUBeTxCCk6osXyswK9oDmcNVw1X9xrK8khVUcLqgtF8xM3v5H8dMWvP",
  hostname_verification=False
)

fs = connection.get_feature_store()

Connected. Call .close() to terminate connection gracefully.
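As noted in the comment above, instead of hard-coding the API key you can store it as a Databricks secret and read it with dbutils; a minimal sketch, assuming a secret scope named `hopsworks` and a key named `api_key` (both hypothetical names):

```python
# Hypothetical secret scope/key names; create them e.g. with the Databricks CLI or REST API.
api_key = dbutils.secrets.get(scope="hopsworks", key="api_key")

connection = hsfs.connection(
  host="10.0.0.4",
  project="dataai",
  port="443",
  api_key_value=api_key,
  hostname_verification=False
)
```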

Configure Databricks to write to ADLS Gen2

Follow the steps in the Databricks Documentation
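For reference, the typical pattern from the Databricks documentation is to set the ABFS OAuth options for the storage account via `spark.conf`; a minimal sketch, where the service principal details and the secret scope/key are placeholders you need to replace with your own values:

```python
# Hypothetical example: service principal (OAuth 2.0) access to an ADLS Gen2 account.
# <application-id>, <directory-id>, the secret scope "adls" and the storage account name
# are assumptions -- follow the Databricks documentation for the exact setup.
storage_account = "hopsworksdbexample"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net",
               "<application-id>")
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net",
               dbutils.secrets.get(scope="adls", key="service-credential"))
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
               "https://login.microsoftonline.com/<directory-id>/oauth2/token")
```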

Generate Sample Data

Let’s generate two sample datasets and store them on ADLS:

  1. houses_for_sale_data:
+-------+--------+------------------+------------------+------------------+
|area_id|house_id|       house_worth|         house_age|        house_size|
+-------+--------+------------------+------------------+------------------+
|      1|       0| 11678.15482418699|133.88670106643886|366.80067322738535|
|      1|       1| 2290.436167500643|15994.969706808222|195.84014889823976|
|      1|       2| 8380.774578431328|1994.8576926471007|1544.5164614303735|
|      1|       3|11641.224696102923|23104.501275562343|1673.7222604337876|
|      1|       4| 5382.089422436954| 13903.43637058141| 274.2912104765028|
+-------+--------+------------------+------------------+------------------+

 |-- area_id: long (nullable = true)
 |-- house_id: long (nullable = true)
 |-- house_worth: double (nullable = true)
 |-- house_age: double (nullable = true)
 |-- house_size: double (nullable = true)
  2. houses_sold_data:
+-------+-----------------+-----------------+------------------+
|area_id|house_purchase_id|number_of_bidders|   sold_for_amount|
+-------+-----------------+-----------------+------------------+
|      1|                0|                0| 70073.06059070028|
|      1|                1|               15| 146.9198329740602|
|      1|                2|                6|  594.802165433149|
|      1|                3|               10| 77187.84123130841|
|      1|                4|                1|110627.48922722359|
+-------+-----------------+-----------------+------------------+

 |-- area_id: long (nullable = true)
 |-- house_purchase_id: long (nullable = true)
 |-- number_of_bidders: long (nullable = true)
 |-- sold_for_amount: double (nullable = true)


We'll use this data for predicting what a house is sold for based on features about the **area** where the house is.

##### Generation of `houses_for_sale_data`


```pyspark
area_ids = list(range(1,51))
house_sizes = []
house_worths = []
house_ages = []
house_area_ids = []
for i in area_ids:
    for j in list(range(1,100)):
        house_sizes.append(abs(np.random.normal()*1000)/i)
        house_worths.append(abs(np.random.normal()*10000)/i)
        house_ages.append(abs(np.random.normal()*10000)/i)
        house_area_ids.append(i)
house_ids = list(range(len(house_area_ids)))
houses_for_sale_data  = pd.DataFrame({
        'area_id':house_area_ids,
        'house_id':house_ids,
        'house_worth': house_worths,
        'house_age': house_ages,
        'house_size': house_sizes
    })
houses_for_sale_data_spark_df = sqlContext.createDataFrame(houses_for_sale_data)

houses_for_sale_data_spark_df.show(5)
houses_for_sale_data_spark_df.printSchema()
```

+-------+--------+------------------+------------------+------------------+
|area_id|house_id|       house_worth|         house_age|        house_size|
+-------+--------+------------------+------------------+------------------+
|      1|       0|1991.9888943412495| 7167.871511762735|2403.4083622753215|
|      1|       1|14264.364158433278| 2050.858854537419|12.544630598354674|
|      1|       2|17842.405873372376|427.54596016089846| 346.6902449005049|
|      1|       3| 9505.131108244657|1881.7501969939058| 273.8686208277227|
|      1|       4|1252.5398136957444|2242.2149219552875| 367.5512280204664|
+-------+--------+------------------+------------------+------------------+
only showing top 5 rows

root
 |-- area_id: long (nullable = true)
 |-- house_id: long (nullable = true)
 |-- house_worth: double (nullable = true)
 |-- house_age: double (nullable = true)
 |-- house_size: double (nullable = true)

houses_for_sale_data_spark_df.write.parquet("abfss://dbexample@hopsworksdbexample.dfs.core.windows.net/house_sales_data")

##### Generation of `houses_sold_data`

house_purchased_amounts = []
house_purchases_bidders = []
house_purchases_area_ids = []
for i in area_ids:
    for j in list(range(1,1000)):
        house_purchased_amounts.append(abs(np.random.exponential()*100000)/i)
        house_purchases_bidders.append(int(abs(np.random.exponential()*10)/i))
        house_purchases_area_ids.append(i)
house_purchase_ids = list(range(len(house_purchases_bidders)))
houses_sold_data  = pd.DataFrame({
        'area_id':house_purchases_area_ids,
        'house_purchase_id':house_purchase_ids,
        'number_of_bidders': house_purchases_bidders,
        'sold_for_amount': house_purchased_amounts
    })
houses_sold_data_spark_df = sqlContext.createDataFrame(houses_sold_data)

houses_sold_data_spark_df.show(5)
houses_sold_data_spark_df.printSchema()

+-------+-----------------+-----------------+------------------+
|area_id|house_purchase_id|number_of_bidders|   sold_for_amount|
+-------+-----------------+-----------------+------------------+
|      1|                0|                5| 71267.36761467403|
|      1|                1|                3| 39689.03803464887|
|      1|                2|                0|33332.809984440915|
|      1|                3|               17|37183.624558190655|
|      1|                4|                1| 99465.23505460238|
+-------+-----------------+-----------------+------------------+
only showing top 5 rows

root
 |-- area_id: long (nullable = true)
 |-- house_purchase_id: long (nullable = true)
 |-- number_of_bidders: long (nullable = true)
 |-- sold_for_amount: double (nullable = true)

houses_sold_data_spark_df.write.parquet("abfss://dbexample@hopsworksdbexample.dfs.core.windows.net/house_sold_data")

Generate Features From houses_for_sale_data

houses_for_sale_data_spark_df = spark.read.parquet("abfss://dbexample@hopsworksdbexample.dfs.core.windows.net/house_sales_data")
sum_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy("area_id").sum()
count_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy("area_id").count()
sum_count_houses_for_sale_df = sum_houses_for_sale_df.join(count_houses_for_sale_df, "area_id")
sum_count_houses_for_sale_df = sum_count_houses_for_sale_df \
    .withColumnRenamed("sum(house_age)", "sum_house_age") \
    .withColumnRenamed("sum(house_worth)", "sum_house_worth") \
    .withColumnRenamed("sum(house_size)", "sum_house_size") \
    .withColumnRenamed("count", "num_rows")
def compute_average_features_house_for_sale(row):
    avg_house_worth = row.sum_house_worth/float(row.num_rows)
    avg_house_size = row.sum_house_size/float(row.num_rows)
    avg_house_age = row.sum_house_age/float(row.num_rows)
    return Row(
        sum_house_worth=row.sum_house_worth, 
        sum_house_age=row.sum_house_age,
        sum_house_size=row.sum_house_size,
        area_id = row.area_id,
        avg_house_worth = avg_house_worth,
        avg_house_size = avg_house_size,
        avg_house_age = avg_house_age
       )
houses_for_sale_features_df = sum_count_houses_for_sale_df.rdd.map(
    lambda row: compute_average_features_house_for_sale(row)
).toDF()

houses_for_sale_features_df.printSchema()

root
 |-- area_id: long (nullable = true)
 |-- avg_house_age: double (nullable = true)
 |-- avg_house_size: double (nullable = true)
 |-- avg_house_worth: double (nullable = true)
 |-- sum_house_age: double (nullable = true)
 |-- sum_house_size: double (nullable = true)
 |-- sum_house_worth: double (nullable = true)
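The same per-area aggregates can also be computed directly with the Spark DataFrame API instead of the RDD map above; a minimal, equivalent sketch (the resulting column order may differ):

```python
from pyspark.sql import functions as F

# Equivalent aggregation with the DataFrame API: sums and averages per area_id.
houses_for_sale_features_df = houses_for_sale_data_spark_df.groupBy("area_id").agg(
    F.sum("house_worth").alias("sum_house_worth"),
    F.sum("house_age").alias("sum_house_age"),
    F.sum("house_size").alias("sum_house_size"),
    F.avg("house_worth").alias("avg_house_worth"),
    F.avg("house_age").alias("avg_house_age"),
    F.avg("house_size").alias("avg_house_size"),
)
```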

Generate Features from houses_sold_data

houses_sold_data_spark_df = spark.read.parquet("abfss://dbexample@hopsworksdbexample.dfs.core.windows.net/house_sold_data")
sum_houses_sold_df = houses_sold_data_spark_df.groupBy("area_id").sum()
count_houses_sold_df = houses_sold_data_spark_df.groupBy("area_id").count()
sum_count_houses_sold_df = sum_houses_sold_df.join(count_houses_sold_df, "area_id")
sum_count_houses_sold_df = sum_count_houses_sold_df \
    .withColumnRenamed("sum(number_of_bidders)", "sum_number_of_bidders") \
    .withColumnRenamed("sum(sold_for_amount)", "sum_sold_for_amount") \
    .withColumnRenamed("count", "num_rows")
def compute_average_features_houses_sold(row):
    avg_num_bidders = row.sum_number_of_bidders/float(row.num_rows)
    avg_sold_for = row.sum_sold_for_amount/float(row.num_rows)
    return Row(
        sum_number_of_bidders=row.sum_number_of_bidders, 
        sum_sold_for_amount=row.sum_sold_for_amount,
        area_id = row.area_id,
        avg_num_bidders = avg_num_bidders,
        avg_sold_for = avg_sold_for
       )
houses_sold_features_df = sum_count_houses_sold_df.rdd.map(
    lambda row: compute_average_features_houses_sold(row)
).toDF()

houses_sold_features_df.printSchema()

root
 |-- area_id: long (nullable = true)
 |-- avg_num_bidders: double (nullable = true)
 |-- avg_sold_for: double (nullable = true)
 |-- sum_number_of_bidders: long (nullable = true)
 |-- sum_sold_for_amount: double (nullable = true)

Save Features to the Feature Store

The Feature Store has an abstraction of a feature group, which is a set of features that naturally belong together and are computed by the same feature engineering job.

Let’s create two feature groups:

  1. houses_for_sale_featuregroup

  2. houses_sold_featuregroup

# Refer to https://docs.hopsworks.ai/latest/generated/api/feature_store_api/#create_feature_group for the different parameters for creating a feature group
house_sale_fg = fs.create_feature_group("houses_for_sale_featuregroup",
                       version=1,
                       description="aggregate features of houses for sale per area",
                       primary_key=['area_id'],
                       online_enabled=False,
                       time_travel_format=None,
                       statistics_config={"histograms": True, "correlations": True})

house_sale_fg.save(houses_for_sale_features_df)

Out[20]: <hsfs.feature_group.FeatureGroup at 0x7f0e346870f0>

house_sold_fg = fs.create_feature_group("houses_sold_featuregroup",
                       version=1,
                       description="aggregate features of sold houses per area",
                       primary_key=['area_id'],
                       online_enabled=False,
                       time_travel_format=None,
                       statistics_config={"histograms": True, "correlations": True})

house_sold_fg.save(houses_sold_features_df)

Out[21]: <hsfs.feature_group.FeatureGroup at 0x7f0e34685240>
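Once saved, a feature group can be fetched back from the feature store by name and version; a minimal sketch using the hsfs API:

```python
# Fetch the feature group metadata and read its contents into a Spark dataframe.
house_sale_fg = fs.get_feature_group("houses_for_sale_featuregroup", version=1)
house_sale_fg.read().show(5)
```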

Create a Training Dataset

The feature store has an abstraction of a training dataset, which is a dataset with a set of features (potentially from many different feature groups) and labels (in the case of supervised learning) stored in an ML-framework-friendly format (CSV, TFRecords, …).

Let’s create a training dataset called predict_house_sold_for_dataset using the following features:

  • avg_house_age
  • avg_house_size
  • avg_house_worth
  • avg_num_bidders

and the target variable is:

  • avg_sold_for

# Join features and feature groups to create the training dataset
feature_query = house_sale_fg.select(["avg_house_age", "avg_house_size", "avg_house_worth"])\
                            .join(house_sold_fg.select(["avg_num_bidders", "avg_sold_for"]))
  
# Create the training dataset metadata
td = fs.create_training_dataset(name="predict_house_sold_for_dataset",
                           version=1,
                           data_format="csv",
                           label=['avg_sold_for'],
                           statistics_config={"histograms": True, "correlations": True})

# Save the training dataset
td.save(feature_query)

Out[22]: <hsfs.training_dataset.TrainingDataset at 0x7f0e37442d30>
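The training dataset can later be fetched by name and version and read back for model training; a minimal sketch:

```python
# Fetch the training dataset metadata and read it back as a Spark dataframe.
td = fs.get_training_dataset("predict_house_sold_for_dataset", version=1)
train_df = td.read()
train_df.show(5)
```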