Feature Ingestion from Redshift with Spark

Redshift Integration

This notebook guides you through ingesting Redshift data into the Hopsworks Feature Store. To follow along, you need an existing Redshift cluster; if you do not have one, you can create it by following the AWS documentation.

The data for this tutorial is available in CSV format [here](). Users should create the following table in Redshift:

CREATE TABLE telco(
    customer_id varchar(200),
    gender varchar(200),
    senior_citizen integer,
    partner varchar(200),
    dependents varchar(200),
    tenure integer,
    phone_service varchar(200),
    multiple_lines varchar(200),
    internet_service varchar(200),
    online_security varchar(200),
    online_backup varchar(200),
    device_protection varchar(200),
    tech_support varchar(200),
    streaming_tv varchar(200),
    streaming_movies varchar(200),
    contract varchar(200),
    paperless_billing varchar(200),
    payment_method varchar(200),
    monthly_charges double precision,
    total_charges varchar(200),
    churn varchar(200)
)

and populate the table using the COPY command:

COPY telco
FROM 's3://bucket/telco_customer_churn.csv'
IAM_ROLE 'arn:aws:iam::xxxxxxxxx:role/role_name'
FORMAT as CSV
FILLRECORD

Once the data has been imported into Redshift, we can start ingesting it into the Hopsworks Feature Store.
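
As an optional sanity check that the COPY succeeded, you can read the table back with a plain Spark JDBC read. This is a sketch outside the feature store flow: the endpoint, database name, and credentials below are placeholders, and the Redshift JDBC driver discussed in the next section must be attached to the Spark session.

// Optional sanity check of the Redshift load, outside of the feature store flow.
// Replace the placeholder endpoint, database, user and password with your own values.
val checkDf = spark.read
  .format("jdbc")
  .option("driver", "com.amazon.redshift.jdbc42.Driver")
  .option("url", "jdbc:redshift://<cluster-endpoint>:5439/<database>")
  .option("dbtable", "telco")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

checkDf.count()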

Storage Connector

The first step in ingesting Redshift data into the feature store is to configure a storage connector. The Redshift connector requires you to specify the following properties, most of which are available in the properties area of your cluster in the Redshift UI.

Redshift Connector UI

  • Cluster identifier: The name of the cluster

  • Database driver: You can use the default Redshift JDBC driver com.amazon.redshift.jdbc42.Driver (more on this below)

  • Database endpoint: The endpoint for the database. Should be in the format of [UUID].eu-west-1.redshift.amazonaws.com

  • Database name: The name of the database to query

  • Database port: The port of the cluster. Defaults to 5439

There are two options available for authenticating with the Redshift cluster. The first option is to configure a username and a password. The password is stored in the secret store and made available to all the members of the project.

The second option is to configure an IAM role. With IAM roles, jobs or notebooks launched on Hopsworks do not need to explicitly authenticate with Redshift, as the HSFS library transparently uses the IAM role to acquire a temporary credential to authenticate the specified user. In Hopsworks, there are two different ways to configure an IAM role: a per-cluster IAM role or a federated IAM role (role chaining). For the per-cluster IAM role, you select an instance profile for your Hopsworks cluster when launching it in hopsworks.ai, and all jobs or notebooks will run with the selected IAM role. For the federated IAM role, you create a head IAM role for the cluster that enables Hopsworks to assume a potentially different IAM role in each project. You can even restrict it so that only certain roles within a project (such as a data owner) can assume a given role.

Regarding the database driver: the library needed to interact with Redshift is not included in Hopsworks, so you need to upload the driver yourself. First, download the library (see the link below). Then upload the driver files to the “Resources” dataset in your project, and finally attach the file to your notebook or job before launching it, as shown in the screenshots below.

The library can be downloaded here: https://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html#download-jdbc-driver
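
Once the driver JAR has been attached to the notebook or job, a quick way to verify that it is on the Spark session's classpath is to try loading the driver class (a generic JDBC check, not an HSFS API):

// Throws ClassNotFoundException if the Redshift JDBC driver JAR is not attached
Class.forName("com.amazon.redshift.jdbc42.Driver")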

import com.logicalclocks.hsfs._
import scala.collection.JavaConversions._
import collection.JavaConverters._

import org.apache.spark.sql.{ DataFrame, Row }
import org.apache.spark.sql.types._

val connection = HopsworksConnection.builder().build();
val fs = connection.getFeatureStore();
Starting Spark application
ID   YARN Application ID              Kind    State   Spark UI   Driver log
36   application_1608750159023_0006   spark   idle    Link       Link
SparkSession available as 'spark'.
import com.logicalclocks.hsfs._
import scala.collection.JavaConversions._
import collection.JavaConverters._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
connection: com.logicalclocks.hsfs.HopsworksConnection = com.logicalclocks.hsfs.HopsworksConnection@5b16ea87
fs: com.logicalclocks.hsfs.FeatureStore = FeatureStore{id=67, name='demo_fs_meb10000_featurestore', projectId=119, featureGroupApi=com.logicalclocks.hsfs.metadata.FeatureGroupApi@3b06db70}

External (On-Demand) Feature Group

Hopsworks supports the creation of (a) cached feature groups and (b) external (on-demand) feature groups. For cached feature groups, the features are stored in the Hopsworks Feature Store. For external feature groups, only the feature metadata is stored in the feature store, not the actual feature data, which is read from the external database/object store. When an external feature group is accessed from a Spark or Python job, the feature data is read on demand using a connector to the external store. On AWS, Hopsworks supports the creation of external feature groups from a large number of data stores, including Redshift, RDS, Snowflake, S3, and any JDBC-enabled source.

In this example, we will define an external feature group for a table in Redshift. External feature groups in Hopsworks support “provenance”: in the Hopsworks Web UI, you can track which features are stored on which external systems and how they are computed. Additionally, HSFS (the Python/Scala library used to interact with the feature store) provides the same APIs for external feature groups as for cached feature groups.

An external (on-demand) feature group can be defined as follows:

// Retrieve the storage connector defined before
val redshiftConn = fs.getStorageConnector("telco_redshift_cluster")
redshiftConn: com.logicalclocks.hsfs.StorageConnector = StorageConnector(id=1025, name=telco_redshift_cluster, accessKey=null, secretKey=null, serverEncryptionAlgorithm=null, serverEncryptionKey=null, bucket=null, clusterIdentifier=telco-redshift-cluster, databaseDriver=com.amazon.redshift.jdbc42.Driver, databaseEndpoint=cxwh6weoo4ae.eu-west-1.redshift.amazonaws.com, databaseName=dev, databasePort=5439, tableName=null, databaseUserName=awsuser, autoCreate=null, databaseGroup=null, expiration=null, databasePassword=Fabio123, sessionToken=null, connectionString=null, arguments=, storageConnectorType=REDSHIFT)
val telcoOnDmd = (fs.createOnDemandFeatureGroup()
                    .name("telco_redshift_scala")
                    .version(2)
                    .query("select * from telco")
                    .description("On-demand feature group for telecom customer data")
                    .storageConnector(redshiftConn)
                    .statisticsEnabled(true)
                    .build())
telcoOnDmd: com.logicalclocks.hsfs.OnDemandFeatureGroup = com.logicalclocks.hsfs.OnDemandFeatureGroup@4d876d28
telcoOnDmd.save()

Engineer features and save to the Feature Store

On-demand feature groups can be used directly as a source for creating training datasets. This is often the case if a company is migrating to Hopsworks and there are already feature engineering pipelines in production writing data to Redshift.

This flexibility provided by Hopsworks allows users to hit the ground running from day 1, without having to rewrite their pipelines to take advantage of the benefits the Hopsworks feature store provides.
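
As an illustration, a training dataset could be created directly from the on-demand feature group. The sketch below assumes the training dataset builder API of the HSFS version used in this notebook; the dataset name, version, and format are arbitrary choices.

// Sketch: materialize a training dataset straight from the Redshift-backed
// feature group (name, version and data format are illustrative).
val td = (fs.createTrainingDataset()
            .name("telco_churn_td")
            .version(1)
            .dataFormat(DataFormat.CSV)
            .description("Training dataset created directly from Redshift-backed features")
            .build())

// Materialize the training dataset from a query over the on-demand feature group
td.save(telcoOnDmd.selectAll())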

telcoOnDmd.select(Seq("customer_id", "internet_service", "phone_service", "total_charges", "churn")).show(5)
+-----------+----------------+-------------+-------------+-----+
|customer_id|internet_service|phone_service|total_charges|churn|
+-----------+----------------+-------------+-------------+-----+
| 7590-VHVEG|             DSL|           No|        29.85|   No|
| 5575-GNVDE|             DSL|          Yes|       1889.5|   No|
| 3668-QPYBK|             DSL|          Yes|       108.15|  Yes|
| 7795-CFOCW|             DSL|           No|      1840.75|   No|
| 9237-HQITU|     Fiber optic|          Yes|       151.65|  Yes|
+-----------+----------------+-------------+-------------+-----+
only showing top 5 rows

On-demand feature groups can also be joined with cached feature groups in Hopsworks to create training datasets. This helper guide explains in detail how the HSFS joining APIs work and how they can be used to create training datasets.
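
As a small illustration, assuming a cached feature group with additional customer features already exists in the project (the feature group name below is hypothetical), a joined query could look like this:

// Sketch: join the Redshift-backed feature group with a hypothetical cached
// feature group on the shared primary key; the resulting query can be used
// to create a training dataset.
val demographicsFg = fs.getFeatureGroup("customer_demographics", 1)

val joined = (telcoOnDmd.select(Seq("customer_id", "contract", "churn"))
                        .join(demographicsFg.selectAll(), Seq("customer_id")))

joined.show(5)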

If, however, Redshift contains raw data that needs to be feature engineered, you can retrieve a Spark DataFrame backed by the Redshift table using the HSFS API.

var sparkDf = telcoOnDmd.read()
sparkDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [customer_id: string, gender: string ... 19 more fields]
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.{Pipeline, PipelineModel}
val categoricalColumns = Seq("gender", "senior_citizen","partner","dependents","phone_service","multiple_lines",
                      "internet_service", "online_security", "online_backup", "device_protection", "tech_support",
                      "streaming_tv", "streaming_movies", "contract", "paperless_billing", "payment_method", "churn")

sparkDf = sparkDf.withColumn("total_charges", $"total_charges".cast(DoubleType)).na.fill(0)

var stages = List[StringIndexer]() // stages in our Pipeline
var outputCols = List(("customer_id", "customer_id"))

for (categoricalCol <- categoricalColumns) {
    // Category Indexing with StringIndexer
    val outputCol = categoricalCol + "_index"
    val stringIndexer = new StringIndexer()
    stringIndexer.setInputCol(categoricalCol)
    stringIndexer.setOutputCol(outputCol)
    
    stages = stringIndexer :: stages
    outputCols = (categoricalCol, outputCol) :: outputCols
}

val pipeline = new Pipeline().setStages(stages.toArray)
val dataset = pipeline.fit(sparkDf).transform(sparkDf)
val telcoFgDf = dataset.selectExpr(outputCols.map(oc => oc._1  + " as " + oc._2):_*)
categoricalColumns: Seq[String] = List(gender, senior_citizen, partner, dependents, phone_service, multiple_lines, internet_service, online_security, online_backup, device_protection, tech_support, streaming_tv, streaming_movies, contract, paperless_billing, payment_method, churn)
sparkDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [customer_id: string, gender: string ... 19 more fields]
stages: List[org.apache.spark.ml.feature.StringIndexer] = List()
outputCols: List[(String, String)] = List((customer_id,customer_id))
pipeline: org.apache.spark.ml.Pipeline = pipeline_3f1d5b001461
dataset: org.apache.spark.sql.DataFrame = [customer_id: string, gender: string ... 36 more fields]
telcoFgDf: org.apache.spark.sql.DataFrame = [churn_index: string, payment_method_index: string ... 16 more fields]
telcoFgDf.show(5)
+-----------+--------------------+-----------------------+--------------+----------------------+------------------+------------------+-----------------------+-------------------+---------------------+----------------------+--------------------+-------------------+----------------+-------------+--------------------+------------+-----------+
|churn_index|payment_method_index|paperless_billing_index|contract_index|streaming_movies_index|streaming_tv_index|tech_support_index|device_protection_index|online_backup_index|online_security_index|internet_service_index|multiple_lines_index|phone_service_index|dependents_index|partner_index|senior_citizen_index|gender_index|customer_id|
+-----------+--------------------+-----------------------+--------------+----------------------+------------------+------------------+-----------------------+-------------------+---------------------+----------------------+--------------------+-------------------+----------------+-------------+--------------------+------------+-----------+
|         No|    Electronic check|                    Yes|Month-to-month|                    No|                No|                No|                     No|                Yes|                   No|                   DSL|    No phone service|                 No|              No|          Yes|                   0|      Female| 7590-VHVEG|
|         No|        Mailed check|                     No|      One year|                    No|                No|                No|                    Yes|                 No|                  Yes|                   DSL|                  No|                Yes|              No|           No|                   0|        Male| 5575-GNVDE|
|        Yes|        Mailed check|                    Yes|Month-to-month|                    No|                No|                No|                     No|                Yes|                  Yes|                   DSL|                  No|                Yes|              No|           No|                   0|        Male| 3668-QPYBK|
|         No|Bank transfer (au...|                     No|      One year|                    No|                No|               Yes|                    Yes|                 No|                  Yes|                   DSL|    No phone service|                 No|              No|           No|                   0|        Male| 7795-CFOCW|
|        Yes|    Electronic check|                    Yes|Month-to-month|                    No|                No|                No|                     No|                 No|                   No|           Fiber optic|                  No|                Yes|              No|           No|                   0|      Female| 9237-HQITU|
+-----------+--------------------+-----------------------+--------------+----------------------+------------------+------------------+-----------------------+-------------------+---------------------+----------------------+--------------------+-------------------+----------------+-------------+--------------------+------------+-----------+
only showing top 5 rows

Storing features in cached feature groups within Hopsworks provides several benefits over on-demand feature groups. First, it allows users to leverage Hudi for incremental ingestion (with ACID properties, ensuring the integrity of the feature group) and for time travel. As new data is ingested, new commits are tracked by Hopsworks, allowing users to see what has changed over time. On each commit, statistics are computed and tracked, allowing users to understand how the data has changed over time (a short sketch of these capabilities follows the feature group creation at the end of this notebook).

Cached feature groups can also be stored in the online feature store (onlineEnabled(true)), enabling low-latency access to the features using the online feature store API.

val telcoFg = (fs.createFeatureGroup()
                 .name("telco_customer_features")
                 .version(2)
                 .description("Telecom customer features")
                 .onlineEnabled(true)
                 .timeTravelFormat(TimeTravelFormat.HUDI)
                 .primaryKeys(Seq("customer_id"))
                 .statisticsEnabled(true)
                 .build())
telcoFg: com.logicalclocks.hsfs.FeatureGroup = com.logicalclocks.hsfs.FeatureGroup@6927f5b8
telcoFg.save(telcoFgDf)
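
To make the time travel and online access points above concrete, the sketch below shows how the saved feature group could be inspected afterwards. It assumes the commit-inspection and online read methods of the HSFS version used in this notebook; the commit time in the comment is a placeholder.

// Sketch: inspect the Hudi commits tracked for the cached feature group
telcoFg.commitDetails()

// Sketch: point-in-time read, using one of the commit times returned above
// val asOfDf = telcoFg.read("<commit-time>")

// Sketch: low-latency read from the online feature store, assuming the Scala
// client mirrors the Python client's read(online=True) option
val onlineDf = telcoFg.read(true)
onlineDf.show(5)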