Databricks AWS Feature Store Quick Start

This notebook gives you a quick overview of how you can integrate the Hopsworks Feature Store with Databricks and S3. We’ll go over four steps:

  1. Generate some sample data and store it on S3
  2. Do some feature engineering with Databricks and the data from S3
  3. Save the engineered features to the Feature Store
  4. Select a group of features from the Feature Store and create a training dataset stored on S3

This requires configuring the Databricks cluster to interact with the Hopsworks Feature Store; see the Databricks Quick Start.
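
In practice this means the cluster needs the hsfs client installed and network access to the Hopsworks instance. A minimal sketch, assuming you install the client as a notebook-scoped library (a cluster-level PyPI library works as well):

# Install the hsfs client for this notebook session
%pip install hsfs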

Imports

We’ll use numpy and pandas for data generation, pyspark for feature engineering, and the hsfs library to interact with the Hopsworks Feature Store.

import hsfs
import random
import numpy as np
import pandas as pd

from pyspark.sql import SQLContext
from pyspark.sql import Row
sqlContext = SQLContext(sc)

Connecting to the Feature Store

# Connect to the feature store, see https://docs.hopsworks.ai/latest/generated/api/connection_api/#connection_1 for more information
# On AWS, secrets such as the API key can also be stored in the Secrets Manager or Parameter Store.
connection = hsfs.connection(
  host="10.0.0.247",
  project="dataai",
  port="443",
  api_key_value="pduNksE2VMuSYdCY.K4AsDoxMBX5luisgb3pB7FimpEO7iyOuYvLPqWQcnXUs51RlBrLyAYXpKDoWH9Cm",
  hostname_verification=False
)

fs = connection.get_feature_store()
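
As the comment above notes, on AWS the API key does not have to be hard-coded; it can be read from Secrets Manager (or Parameter Store) at runtime. A minimal sketch using boto3, assuming a secret with the hypothetical name hopsworks/api_key that stores the key as plain text:

import boto3

# Fetch the Hopsworks API key from AWS Secrets Manager instead of hard-coding it.
# "hopsworks/api_key" is a hypothetical secret name and "eu-west-1" an example region.
secrets_client = boto3.client("secretsmanager", region_name="eu-west-1")
api_key = secrets_client.get_secret_value(SecretId="hopsworks/api_key")["SecretString"]

# Pass the retrieved key as api_key_value to hsfs.connection() above.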

Mounting an S3 bucket to Databricks

# Mount a bucket so that we can simulate a data lake backed by S3.
# This requires IAM roles to be set up for Databricks, see https://docs.databricks.com/data/data-sources/aws/amazon-s3.html#access-s3-buckets-directly
AWS_BUCKET_NAME = "steffendatabricks" # Replace with your own bucket name
MOUNT_NAME = "/mnt/demo_db_hsfs"
dbutils.fs.mount("s3a://%s" % AWS_BUCKET_NAME, MOUNT_NAME)
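
Note that dbutils.fs.mount fails if the mount point already exists, so when re-running the notebook you may want to guard the call; a small sketch:

# Mount the bucket only if it is not already mounted (re-running the cell would otherwise fail)
if not any(m.mountPoint == MOUNT_NAME for m in dbutils.fs.mounts()):
    dbutils.fs.mount("s3a://%s" % AWS_BUCKET_NAME, MOUNT_NAME)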

Generate Sample Data

Let's generate two sample datasets and store them on S3:

  1. houses_for_sale_data:
+-------+--------+------------------+------------------+------------------+
|area_id|house_id|       house_worth|         house_age|        house_size|
+-------+--------+------------------+------------------+------------------+
|      1|       0| 11678.15482418699|133.88670106643886|366.80067322738535|
|      1|       1| 2290.436167500643|15994.969706808222|195.84014889823976|
|      1|       2| 8380.774578431328|1994.8576926471007|1544.5164614303735|
|      1|       3|11641.224696102923|23104.501275562343|1673.7222604337876|
|      1|       4| 5382.089422436954| 13903.43637058141| 274.2912104765028|
+-------+--------+------------------+------------------+------------------+

 |-- area_id: long (nullable = true)
 |-- house_id: long (nullable = true)
 |-- house_worth: double (nullable = true)
 |-- house_age: double (nullable = true)
 |-- house_size: double (nullable = true)
  2. houses_sold_data:
+-------+-----------------+-----------------+------------------+
|area_id|house_purchase_id|number_of_bidders|   sold_for_amount|
+-------+-----------------+-----------------+------------------+
|      1|                0|                0| 70073.06059070028|
|      1|                1|               15| 146.9198329740602|
|      1|                2|                6|  594.802165433149|
|      1|                3|               10| 77187.84123130841|
|      1|                4|                1|110627.48922722359|
+-------+-----------------+-----------------+------------------+

 |-- area_id: long (nullable = true)
 |-- house_purchase_id: long (nullable = true)
 |-- number_of_bidders: long (nullable = true)
 |-- sold_for_amount: double (nullable = true)


We'll use this data to predict what a house is sold for, based on features of the area where the house is located.

Generation of houses_for_sale_data

# Generate 99 synthetic houses per area; feature magnitudes shrink as area_id grows
area_ids = list(range(1,51))
house_sizes = []
house_worths = []
house_ages = []
house_area_ids = []
for i in area_ids:
    for j in list(range(1,100)):
        house_sizes.append(abs(np.random.normal()*1000)/i)
        house_worths.append(abs(np.random.normal()*10000)/i)
        house_ages.append(abs(np.random.normal()*10000)/i)
        house_area_ids.append(i)
house_ids = list(range(len(house_area_ids)))
houses_for_sale_data  = pd.DataFrame({
        'area_id':house_area_ids,
        'house_id':house_ids,
        'house_worth': house_worths,
        'house_age': house_ages,
        'house_size': house_sizes
    })
houses_for_sale_data_spark_df = sqlContext.createDataFrame(houses_for_sale_data)
houses_for_sale_data_spark_df.show(5)
houses_for_sale_data_spark_df.printSchema()
houses_for_sale_data_spark_df.write.format("parquet").save("%s/houses_for_sale.parquet" % MOUNT_NAME)

Generation of houses_sold_data

# Generate 999 synthetic house sales per area, reusing the area_ids from above
house_purchased_amounts = []
house_purchases_bidders = []
house_purchases_area_ids = []
for i in area_ids:
    for j in list(range(1,1000)):
        house_purchased_amounts.append(abs(np.random.exponential()*100000)/i)
        house_purchases_bidders.append(int(abs(np.random.exponential()*10)/i))
        house_purchases_area_ids.append(i)
house_purchase_ids = list(range(len(house_purchases_bidders)))
houses_sold_data  = pd.DataFrame({
        'area_id':house_purchases_area_ids,
        'house_purchase_id':house_purchase_ids,
        'number_of_bidders': house_purchases_bidders,
        'sold_for_amount': house_purchased_amounts
    })
houses_sold_data_spark_df = sqlContext.createDataFrame(houses_sold_data)
houses_sold_data_spark_df.show(5)
houses_sold_data_spark_df.printSchema()
houses_sold_data_spark_df.write.format("parquet").save("%s/houses_sold.parquet" % MOUNT_NAME)

Feature Engineering

Let's generate some aggregate features, such as sums and averages, from our datasets on S3.

  1. houses_for_sale_features:
 |-- area_id: long (nullable = true)
 |-- avg_house_age: double (nullable = true)
 |-- avg_house_size: double (nullable = true)
 |-- avg_house_worth: double (nullable = true)
 |-- sum_house_age: double (nullable = true)
 |-- sum_house_size: double (nullable = true)
 |-- sum_house_worth: double (nullable = true)
  2. houses_sold_features:
 |-- area_id: long (nullable = true)
 |-- avg_num_bidders: double (nullable = true)
 |-- avg_sold_for: double (nullable = true)
 |-- sum_number_of_bidders: long (nullable = true)
 |-- sum_sold_for_amount: double (nullable = true)

# List the mounted bucket to verify that both parquet datasets were written
display(dbutils.fs.ls(MOUNT_NAME))

Generate Features From houses_for_sale_data

# Read the for-sale data back from S3 and compute per-area sums and row counts
houses_for_sale_data_spark_df = spark.read.parquet("%s/houses_for_sale.parquet" % MOUNT_NAME)
sum_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy("area_id").sum()
count_houses_for_sale_df = houses_for_sale_data_spark_df.groupBy("area_id").count()
sum_count_houses_for_sale_df = sum_houses_for_sale_df.join(count_houses_for_sale_df, "area_id")
sum_count_houses_for_sale_df = sum_count_houses_for_sale_df \
    .withColumnRenamed("sum(house_age)", "sum_house_age") \
    .withColumnRenamed("sum(house_worth)", "sum_house_worth") \
    .withColumnRenamed("sum(house_size)", "sum_house_size") \
    .withColumnRenamed("count", "num_rows")
# Derive per-area averages from the sums and the row count
def compute_average_features_house_for_sale(row):
    avg_house_worth = row.sum_house_worth/float(row.num_rows)
    avg_house_size = row.sum_house_size/float(row.num_rows)
    avg_house_age = row.sum_house_age/float(row.num_rows)
    return Row(
        sum_house_worth=row.sum_house_worth, 
        sum_house_age=row.sum_house_age,
        sum_house_size=row.sum_house_size,
        area_id = row.area_id,
        avg_house_worth = avg_house_worth,
        avg_house_size = avg_house_size,
        avg_house_age = avg_house_age
       )
houses_for_sale_features_df = sum_count_houses_for_sale_df.rdd.map(
    lambda row: compute_average_features_house_for_sale(row)
).toDF()
houses_for_sale_features_df.printSchema()
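
The RDD-based mapping above works, but the same aggregates can also be computed directly with Spark's built-in DataFrame functions, which avoids the extra join and the Python row function. A sketch of the equivalent computation:

from pyspark.sql import functions as F

# Per-area sums and averages via DataFrame aggregations instead of an RDD map
houses_for_sale_features_df = houses_for_sale_data_spark_df.groupBy("area_id").agg(
    F.avg("house_age").alias("avg_house_age"),
    F.avg("house_size").alias("avg_house_size"),
    F.avg("house_worth").alias("avg_house_worth"),
    F.sum("house_age").alias("sum_house_age"),
    F.sum("house_size").alias("sum_house_size"),
    F.sum("house_worth").alias("sum_house_worth")
)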

Generate Features from houses_sold_data

# Read the sold-houses data back from S3 and compute per-area sums and row counts
houses_sold_data_spark_df = spark.read.parquet("%s/houses_sold.parquet" % MOUNT_NAME)
sum_houses_sold_df = houses_sold_data_spark_df.groupBy("area_id").sum()
count_houses_sold_df = houses_sold_data_spark_df.groupBy("area_id").count()
sum_count_houses_sold_df = sum_houses_sold_df.join(count_houses_sold_df, "area_id")
sum_count_houses_sold_df = sum_count_houses_sold_df \
    .withColumnRenamed("sum(number_of_bidders)", "sum_number_of_bidders") \
    .withColumnRenamed("sum(sold_for_amount)", "sum_sold_for_amount") \
    .withColumnRenamed("count", "num_rows")
# Derive per-area averages from the sums and the row count
def compute_average_features_houses_sold(row):
    avg_num_bidders = row.sum_number_of_bidders/float(row.num_rows)
    avg_sold_for = row.sum_sold_for_amount/float(row.num_rows)
    return Row(
        sum_number_of_bidders=row.sum_number_of_bidders, 
        sum_sold_for_amount=row.sum_sold_for_amount,
        area_id = row.area_id,
        avg_num_bidders = avg_num_bidders,
        avg_sold_for = avg_sold_for
       )
houses_sold_features_df = sum_count_houses_sold_df.rdd.map(
    lambda row: compute_average_features_houses_sold(row)
).toDF()
houses_sold_features_df.printSchema()

Save Features to the Feature Store

The feature store has an abstraction of a feature group: a set of features that naturally belong together and are computed by the same feature engineering job.

Let's create two feature groups:

  1. houses_for_sale_featuregroup

  2. houses_sold_featuregroup

# Refer to https://docs.hopsworks.ai/latest/generated/api/feature_store_api/#create_feature_group for the different parameters for creating a feature group
house_sale_fg = fs.create_feature_group("houses_for_sale_featuregroup",
                       version=1,
                       description="aggregate features of houses for sale per area",
                       primary_key=['area_id'],
                       online_enabled=False,
                       time_travel_format=None,
                       statistics_config={"histograms": True, "correlations": True})

house_sale_fg.save(houses_for_sale_features_df)
house_sold_fg = fs.create_feature_group("houses_sold_featuregroup",
                       version=1,
                       description="aggregate features of sold houses per area",
                       primary_key=['area_id'],
                       online_enabled=False,
                       time_travel_format=None,
                       statistics_config={"histograms": True, "correlations": True})

house_sold_fg.save(houses_sold_features_df)
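
Once saved, a feature group can be read back from the feature store to verify its contents; a minimal sketch using the hsfs API:

# Read a saved feature group back and inspect a few rows
fs.get_feature_group("houses_for_sale_featuregroup", version=1).read().show(5)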

Create a Training Dataset

The feature store also has an abstraction of a training dataset: a dataset with a set of features (potentially from many different feature groups) and labels (in the case of supervised learning), stored in an ML-framework-friendly format (CSV, TFRecords, …).

Let’s create a training dataset called predict_house_sold_for_dataset_two using the following features:

  • avg_house_age
  • avg_house_size
  • avg_house_worth
  • avg_num_bidders

and the target variable is:

  • avg_sold_for

# Get the metadata for the storage connector from the feature store
s3_bucket = fs.get_storage_connector("house_model_data", "S3")

# Join features and feature groups to create the training dataset
feature_query = house_sale_fg.select(["avg_house_age", "avg_house_size", "avg_house_worth"])\
                            .join(house_sold_fg.select(["avg_num_bidders", "avg_sold_for"]))
  
# Create the training dataset metadata
td = fs.create_training_dataset(name="predict_house_sold_for_dataset_two",
                           version=2,
                           data_format="csv",
                           label=['avg_sold_for'],
                           storage_connector=s3_bucket,
                           statistics_config={"histograms": True, "correlations": True})

# Save the training dataset
td.save(feature_query)
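
Once the training dataset has been materialized, it can be read back from the feature store, for example to feed a training job; a minimal sketch:

# Read the materialized training dataset back as a Spark DataFrame
td = fs.get_training_dataset("predict_house_sold_for_dataset_two", version=2)
td.read().show(5)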