Feature Ingestion from S3

Ingest Telecom Churn data from an S3 bucket into the Feature Store

First, download the sample data from here and upload it to an S3 bucket.

You first need an IAM Role

You will need an IAM role to be able to read data from an S3 bucket. In Hopsworks, there are two ways for the notebooks and jobs you run to assume an IAM role:
1. You can assign an instance profile to the Hopsworks cluster when you create it. All users then share its IAM role.
2. You can assign multiple IAM roles to a Hopsworks cluster, and then decide which projects, and which users within those projects, can assume which IAM role.
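Whichever option you use, the role needs permission to read the bucket. The sketch below builds a minimal read-only S3 permission policy for a single bucket as a Python dict and prints it as JSON. The bucket name is hypothetical. Depending on your setup, additional actions may be required (for example s3:GetBucketLocation).

```python
import json


def s3_read_policy(bucket: str) -> dict:
    """Build an IAM policy document that allows listing one bucket and reading its objects."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # s3:ListBucket is granted on the bucket ARN itself.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket}"],
            },
            {
                # s3:GetObject is granted on the objects inside the bucket.
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{bucket}/*"],
            },
        ],
    }


if __name__ == "__main__":
    # "my-telco-bucket" is a hypothetical bucket name.
    print(json.dumps(s3_read_policy("my-telco-bucket"), indent=2))
```

The two statements are separate because ListBucket applies to the bucket resource, while GetObject applies to object resources.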

Cluster-wide IAM Role

On hopsworks.ai, when you configure your Hopsworks cluster, you can select an instance profile for it (see the screenshot below). All jobs that run on Hopsworks can then use the IAM role in this instance profile. An instance profile is a container for an IAM role that is attached to the cluster's EC2 instances. As a result, all Hopsworks users share the instance profile's role and the resource access policies attached to that role.

(Screenshot: Cluster-wide IAM Profile)
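A role that is used through an instance profile must also trust the EC2 service, because the cluster's EC2 instances are what assume it. The following is a minimal sketch of that standard AWS trust policy, built as a Python dict. Permission policies, which define what the role may access, are attached to the role separately.

```python
import json


def ec2_trust_policy() -> dict:
    """Trust policy that lets EC2 instances assume a role via an instance profile."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # The EC2 service is the principal allowed to assume this role.
                "Principal": {"Service": "ec2.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


if __name__ == "__main__":
    print(json.dumps(ec2_trust_policy(), indent=2))
```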

Federated IAM Roles (Role Chaining)

You can restrict an IAM role so that it can only be used within a specified project. Within that project, you can further restrict which project role a user must have to use the IAM role. For example, you can allow only Data Owners in the project called Noc-list to assume this IAM role. See our documentation for details on how to set up multiple IAM roles (role chaining).
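The project- and role-scoped access rule described above can be sketched as a lookup table. This is a hypothetical toy model, not how Hopsworks stores or enforces the mapping. The project role names ("Data owner", "Data scientist"), the "ALL" wildcard, and the role ARN and account id are made up for illustration.

```python
from typing import Dict, Set, Tuple

# Hypothetical mapping: (project, project role) -> IAM role ARNs that members with that role may assume.
# "ALL" is an illustrative wildcard meaning "any project role".
RoleMapping = Dict[Tuple[str, str], Set[str]]

NOC_ROLE = "arn:aws:iam::123456789012:role/noc-list-s3-read"  # made-up ARN

MAPPING: RoleMapping = {
    ("Noc-list", "Data owner"): {NOC_ROLE},
}


def can_assume(mapping: RoleMapping, project: str, project_role: str, iam_role: str) -> bool:
    """Return True if a member with `project_role` in `project` may assume `iam_role`."""
    allowed = mapping.get((project, project_role), set()) | mapping.get((project, "ALL"), set())
    return iam_role in allowed


if __name__ == "__main__":
    print(can_assume(MAPPING, "Noc-list", "Data owner", NOC_ROLE))      # True
    print(can_assume(MAPPING, "Noc-list", "Data scientist", NOC_ROLE))  # False
```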

Create an S3 Storage Connector for your Bucket

You should also create an S3 storage connector that points to the bucket where you uploaded the data. The Storage Connectors documentation explains how to create a storage connector from the feature store UI.

If you have assigned a cluster-wide IAM role, you do not need to specify an IAM role. If you are using federated IAM roles and you have permission to assume one of the IAM roles in the current project, select the IAM role to use for the S3 bucket from the drop-down list (“No IAM role defined”), as shown in the screenshot below.

You can also create an S3 storage connector with an access key and secret key, although IAM roles are the preferred authentication method.

(Screenshot: S3 connector)
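The three authentication options for the connector can be summarized as a small decision function. This is a toy model for illustration only, not Hopsworks code. The function name, its parameters and the returned strings are hypothetical, and the order of preference is a simplifying assumption.

```python
from typing import Optional, Sequence


def s3_auth_method(
    cluster_role_attached: bool,
    assumable_roles: Sequence[str] = (),
    selected_role: Optional[str] = None,
    has_access_keys: bool = False,
) -> str:
    """Toy model of how an S3 connector would authenticate, in order of preference."""
    if cluster_role_attached:
        # Cluster-wide instance profile: no role needs to be selected on the connector.
        return "instance-profile"
    if selected_role is not None:
        # Federated roles: the selected role must be one the user is allowed to assume.
        if selected_role not in assumable_roles:
            raise PermissionError(f"Cannot assume {selected_role}")
        return f"iam-role:{selected_role}"
    if has_access_keys:
        # Static access key / secret key pair: works, but IAM roles are preferred.
        return "access-key"
    raise ValueError("No S3 credentials configured")
```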

import hsfs

# Connect to Hopsworks and get a handle to the project's feature store
connection = hsfs.connection()
fs = connection.get_feature_store()

To have Spark read from S3, we build the path to the file in the bucket. Note the s3a:// file system scheme.
By default, PySpark reads every CSV column as a string (StringType). With inferSchema=True, Spark instead tries to infer the column types, at the cost of an extra pass over the data.
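The effect of inferSchema=True can be illustrated with a simplified, pure-Python sketch of column type inference. This is not Spark's implementation. Spark considers more types (such as long, decimal and timestamp) and more reader options. The order int, then double, then boolean, then string used here is a simplifying assumption.

```python
from typing import Callable, List


def infer_column_type(values: List[str]) -> str:
    """Pick the narrowest of int/double/boolean/string that parses every non-empty value."""
    present = [v for v in values if v != ""]  # treat empty strings as missing values

    def parses_all(parse: Callable[[str], object]) -> bool:
        try:
            for v in present:
                parse(v)
            return True
        except ValueError:
            return False

    if not present:
        return "string"  # nothing to infer from: fall back to string
    if parses_all(int):
        return "int"
    if parses_all(float):
        return "double"
    if all(v.lower() in ("true", "false") for v in present):
        return "boolean"
    return "string"


if __name__ == "__main__":
    print(infer_column_type(["1", "34", "2"]))        # int
    print(infer_column_type(["29.85", "56.95", ""]))  # double
    print(infer_column_type(["Yes", "No"]))           # string
```

Note that a column of "Yes"/"No" values stays a string. Only literal true/false values would be read as booleans.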

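# Look up the S3 storage connector created earlier (named "telco_delta" here).
# Named `connector` rather than `sc`, which PySpark notebooks conventionally use for the SparkContext.
connector = fs.get_storage_connector("telco_delta")

# Read the CSV data under the telco-delta prefix of the connector's bucket (`spark` is the notebook's SparkSession)
df = spark.read.csv("s3a://" + connector.bucket + "/telco-delta", header=True, inferSchema=True)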
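Concatenating "s3a://" + bucket + "/telco-delta" by hand produces a double slash if the key already starts with "/". The small helper below is a hypothetical convenience, not part of hsfs. It normalizes the separators before building the URI.

```python
def s3a_path(bucket: str, key: str) -> str:
    """Join a bucket name and an object key or prefix into an s3a:// URI."""
    bucket = bucket.strip("/")  # drop stray slashes around the bucket name
    key = key.lstrip("/")       # avoid a double slash between bucket and key
    return f"s3a://{bucket}/{key}"


if __name__ == "__main__":
    print(s3a_path("my-bucket", "/telco-delta"))  # s3a://my-bucket/telco-delta
```

In the notebook, you would pass the storage connector's bucket attribute as the first argument. The result then goes to spark.read.csv as before.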

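# Define the feature group metadata; hsfs creates the feature group lazily, on the first save()
telco_fg = fs.create_feature_group(name="telco_fg",
                                   version=1,
                                   description="Telecom churn data ingested from S3",
                                   statistics_config={"enabled": True, "histograms": True, "correlations": True})

# Create the feature group and write the dataframe to the offline feature store
telco_fg.save(df)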

In the feature store UI you should now be able to see that the feature group has been created, browse its schema and statistics. You can now use it to build training datasets.

# Instead of relying on inferSchema=True, you could also define an explicit schema for the dataframe.
# from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType
# schema = (
#     StructType()
#     .add("customer_id", StringType(), True)
#     .add("gender", StringType(), True)  # could also be modeled as BooleanType
#     .add("senior_citizen", StringType(), True)
#     .add("partner", StringType(), True)
#     .add("dependents", StringType(), True)
#     .add("tenure", IntegerType(), True)
#     .add("phone_service", StringType(), True)
#     .add("multiple_lines", StringType(), True)
#     .add("internet_service", StringType(), True)
#     .add("online_security", StringType(), True)
#     .add("online_backup", StringType(), True)
#     .add("device_protection", StringType(), True)
#     .add("tech_support", StringType(), True)
#     .add("streaming_tv", StringType(), True)
#     .add("streaming_movies", StringType(), True)
#     .add("contract", StringType(), True)
#     .add("paperless_billing", StringType(), True)
#     .add("payment_method", StringType(), True)
#     .add("monthly_charges", DoubleType(), True)
#     .add("total_charges", DoubleType(), True)
#     .add("churn", DoubleType(), True)
# )
# Then pass it to the reader: spark.read.csv(...., schema=schema)
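PySpark's DataFrameReader.csv also accepts the schema as a DDL-formatted string, so you do not have to chain StructType().add(...) calls. The helper below, `ddl_schema`, is hypothetical and builds such a string from (name, type) pairs. The column names follow the commented-out schema above, and the types are assumptions about the data that you should adjust. By default, Spark applies a user-supplied CSV schema by column position (the enforceSchema option). The list must therefore cover every column in the file, in the file's order.

```python
from typing import Iterable, List, Tuple

# Column names from the commented-out schema; the Spark SQL types are assumptions.
TELCO_COLUMNS: List[Tuple[str, str]] = [
    ("customer_id", "STRING"),
    ("gender", "STRING"),
    ("senior_citizen", "STRING"),
    ("partner", "STRING"),
    ("dependents", "STRING"),
    ("tenure", "INT"),
    ("phone_service", "STRING"),
    ("multiple_lines", "STRING"),
    ("internet_service", "STRING"),
    ("online_security", "STRING"),
    ("online_backup", "STRING"),
    ("device_protection", "STRING"),
    ("tech_support", "STRING"),
    ("streaming_tv", "STRING"),
    ("streaming_movies", "STRING"),
    ("contract", "STRING"),
    ("paperless_billing", "STRING"),
    ("payment_method", "STRING"),
    ("monthly_charges", "DOUBLE"),
    ("total_charges", "DOUBLE"),
    ("churn", "DOUBLE"),
]


def ddl_schema(columns: Iterable[Tuple[str, str]]) -> str:
    """Render (name, Spark SQL type) pairs as a DDL schema string, quoting names with backticks."""
    return ", ".join(f"`{name}` {sql_type}" for name, sql_type in columns)


if __name__ == "__main__":
    print(ddl_schema(TELCO_COLUMNS))
```

The resulting string can be passed as `schema=` to spark.read.csv, together with header=True.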