End-To-End Example with Scikit-Learn, Hops and the Feature Store - Iris Flower Classification


In this notebook we will:

  1. Load the Iris Flower dataset from HopsFS
  2. Do feature engineering on the dataset
  3. Save the features to the feature store
  4. Read the feature data from the feature store
  5. Train a KNN Model using SkLearn
  6. Save the trained model to HopsFS
  7. Launch a serving instance to serve the trained model
  8. Send some prediction requests to the served model
  9. Monitor the predictions through Kafka

Model Serving on Hopsworks


The hops python library

hops is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on github.

Using the experiment module

To run your machine learning code in Hopsworks, the whole program must be placed inside a wrapper function: everything from importing libraries to reading data, defining the model and running the training.

The experiment module provides an API for running Python programs that use frameworks such as TensorFlow, Keras and PyTorch on Hopsworks, on any number of machines and GPUs.

An experiment can be run in one of three modes:

  • A single Python program, which we refer to as an Experiment.
  • Grid search or genetic hyperparameter optimization, such as differential evolution, which runs several Experiments in parallel. We refer to this as a Parallel Experiment.
  • Distributed training with ParameterServerStrategy, CollectiveAllReduceStrategy or MultiWorkerMirroredStrategy, which makes multi-machine/multi-GPU training as simple as invoking a function for orchestration. We refer to this as Distributed Training.
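
The wrapper-function requirement can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: train_fn and its toy data are hypothetical, and the commented-out experiment.launch call only indicates where the function would be handed to Hopsworks if the hops library is available.

```python
def train_fn():
    """Hypothetical wrapper: imports, data, model and evaluation all live inside."""
    # Imports go inside the wrapper so they run wherever the function is executed.
    import random

    random.seed(0)  # fixed seed so the toy run is repeatable
    # Toy data: one feature; the label is 1 when the feature is above 0.5.
    data = [(x, int(x > 0.5)) for x in (random.random() for _ in range(100))]

    # "Model": a single threshold, chosen as the best of a few candidates.
    def accuracy(threshold):
        return sum(int(x > threshold) == y for x, y in data) / len(data)

    best = max((0.25, 0.5, 0.75), key=accuracy)
    # Returning the metrics lets the caller record the result of the run.
    return {"threshold": best, "accuracy": accuracy(best)}


# Run locally for a quick check.
metrics = train_fn()
print(metrics)

# On Hopsworks the same function would be passed to the experiment module,
# for example (assumes the hops library is installed; not executed here):
# from hops import experiment
# experiment.launch(train_fn, name="toy_experiment")
```

Keeping every import and data access inside the function means the function is the only thing that has to be shipped to the machine that runs it.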

Using the hdfs module

The hdfs module provides a method to get the path in HopsFS where your data is stored, namely hdfs.project_path(). The path resolves to the root path of your project, which is the view you see when you click Data Sets in Hopsworks. To point to where your data actually resides in the project, append the path of your Dataset to this root. For example, if you create a mnist folder in your Resources Dataset, the path to the mnist data is hdfs.project_path() + 'Resources/mnist'.

# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project
from hops import hdfs
project_path = hdfs.project_path()
# Uploading the trained model to HDFS
hdfs.copy_to_hdfs("iris_knn.pkl", "Resources/iris_flower", overwrite=True)

# Downloading the iris flower model to the current working directory
iris_flower_model_hdfs_path = hdfs.project_path() + "Resources/iris_flower/iris_knn.pkl"
local_iris_flower_model_path = hdfs.copy_to_local(iris_flower_model_hdfs_path)
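
Building dataset paths by string concatenation is sensitive to leading and trailing slashes. The helper below is a small sketch of a more forgiving join; dataset_path and the example project root are hypothetical, and on Hopsworks the root would come from hdfs.project_path().

```python
import posixpath


def dataset_path(project_root, *parts):
    """Join a project root and dataset-relative parts into a single path."""
    # Strip surrounding slashes first: posixpath.join discards everything
    # that precedes an absolute component such as "/Resources".
    cleaned = [p.strip("/") for p in parts if p.strip("/")]
    return posixpath.join(project_root, *cleaned)


# Hypothetical project root, used only for illustration.
root = "/Projects/demo_project/"
print(dataset_path(root, "Resources", "mnist"))
print(dataset_path(root, "/Resources/iris_flower", "iris_knn.pkl"))
```

The second call shows why the stripping matters: a component written with a leading slash would otherwise replace the project root entirely.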


See the following links to learn more about running experiments in Hopsworks

Managing experiments

Experiments service provides a unified view of all the experiments run using the experiment module.
As demonstrated in the GIF, it provides general information about each experiment and the resulting metric. Experiments can be visualized in TensorBoard during or after training.


Import libraries

from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import joblib
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType
import numpy as np
import time
import json
from hops import kafka, hdfs, serving, tls
from confluent_kafka import Producer, Consumer, KafkaError
import random
Starting Spark application
SparkSession available as 'spark'.

Prepare Training Dataset

Load Iris Dataset (csv)

project_path = hdfs.project_path()
iris_df = spark.read.format("csv").option("header", "true").option("inferSchema", True).load(
    project_path + "TourData/iris/iris.csv")
iris_df.printSchema()
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)

Feature Engineering

The dataset is already well prepared. The only feature engineering we need to do is to convert the variety column to a numeric label and save a lookup table, so that we can later convert the numeric representation back to the categorical one.

encoder = StringIndexer(inputCol="variety", outputCol="label")
fit_model = encoder.fit(iris_df)
iris_df1 = fit_model.transform(iris_df)
lookup_df = iris_df1.select(["variety", "label"]).distinct()
iris_df2 = iris_df1.drop("variety")
iris_df3 = iris_df2.withColumn("label", iris_df2["label"].cast(IntegerType()))
iris_df3.printSchema()
iris_df3.show(5)
lookup_df.show()
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- label: integer (nullable = true)
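+------------+-----------+------------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|label|
+------------+-----------+------------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2|    0|
|         4.9|        3.0|         1.4|        0.2|    0|
|         4.7|        3.2|         1.3|        0.2|    0|
|         4.6|        3.1|         1.5|        0.2|    0|
|         5.0|        3.6|         1.4|        0.2|    0|
+------------+-----------+------------+-----------+-----+
only showing top 5 rows

+----------+-----+
|   variety|label|
+----------+-----+
| Virginica|  2.0|
|Versicolor|  1.0|
|    Setosa|  0.0|
+----------+-----+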
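
To make the encoding step concrete, here is a plain-Python sketch of how a string column can be mapped to numeric labels and back. It assumes the ordering that Spark's StringIndexer is understood to use by default (most frequent value first, ties resolved alphabetically); fit_label_index is a hypothetical helper, not part of Spark.

```python
from collections import Counter


def fit_label_index(values):
    """Map each distinct string to an integer index."""
    counts = Counter(values)
    # Most frequent first; equal counts fall back to alphabetical order.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: idx for idx, label in enumerate(ordered)}


# The Iris dataset has 50 samples of each of its three varieties.
varieties = ["Setosa"] * 50 + ["Versicolor"] * 50 + ["Virginica"] * 50

to_index = fit_label_index(varieties)
# The inverse mapping plays the role of the lookup table.
to_label = {idx: label for label, idx in to_index.items()}

print(to_index)
print(to_label[2])
```

The inverse dictionary is the in-memory equivalent of the lookup table: it is what allows a numeric prediction to be reported as a variety name.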

Save Features to the Feature Store

We can save two feature groups (hive tables), one called iris_features that contains the iris features and the corresponding numeric label, and another feature group called iris_labels_lookup for converting the numeric iris label back to categorical.

Note: To be able to run the feature store code, you first have to enable the Feature Store Service in your project. To do this, go to the “Settings” tab in your project, select the feature store service and click “Save”.

import hsfs
# Create a connection
connection = hsfs.connection()
# Get the feature store handle for the project's feature store
fs = connection.get_feature_store()
Connected. Call `.close()` to terminate connection gracefully.
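iris_features = fs.create_feature_group(name="iris_features", version=1, time_travel_format=None)
iris_features.save(iris_df3)  # write the features and the numeric label
iris_labels_lookup = fs.create_feature_group(name="iris_labels_lookup", version=1, time_travel_format=None)
iris_labels_lookup.save(lookup_df)  # write the label lookup table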

Read the Iris Training Dataset from the Feature Store

iris_features = fs.get_feature_group("iris_features", 1)
df = iris_features.read().toPandas()
   sepal_length  sepal_width  petal_length  petal_width  label
0           5.1          3.5           1.4          0.2      0
1           4.9          3.0           1.4          0.2      0
2           4.7          3.2           1.3          0.2      0
3           4.6          3.1           1.5          0.2      0
4           5.0          3.6           1.4          0.2      0
5           5.4          3.9           1.7          0.4      0
6           4.6          3.4           1.4          0.3      0
7           5.0          3.4           1.5          0.2      0
8           4.4          2.9           1.4          0.2      0
9           4.9          3.1           1.5          0.1      0
       sepal_length  sepal_width  petal_length  petal_width       label
count    150.000000   150.000000    150.000000   150.000000  150.000000
mean       5.843333     3.057333      3.758000     1.199333    1.000000
std        0.828066     0.435866      1.765298     0.762238    0.819232
min        4.300000     2.000000      1.000000     0.100000    0.000000
25%        5.100000     2.800000      1.600000     0.300000    0.000000
50%        5.800000     3.000000      4.350000     1.300000    1.000000
75%        6.400000     3.300000      5.100000     1.800000    2.000000
max        7.900000     4.400000      6.900000     2.500000    2.000000
x_df = df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']]
y_df = df[["label"]]
X = x_df.values
y = y_df.values.ravel()

Train an Iris Flower Classifier

IRIS_FLOWER_RESOURCES_PATH = "/Resources/iris_flower"
IRIS_FLOWER_MODEL_PKL = "iris_knn.pkl"

Generate a KNN Model using the Feature Data

neighbors = random.randint(3, 30)
iris_knn = KNeighborsClassifier(n_neighbors=neighbors)
iris_knn.fit(X, y)
y_pred = iris_knn.predict(X)
acc = accuracy_score(y, y_pred)
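
For readers unfamiliar with k-nearest neighbours, the idea behind the classifier can be sketched in a few lines of plain Python. This is an illustration of the technique only, not scikit-learn's implementation; knn_predict and the two-dimensional toy points are made up for the example.

```python
import math
from collections import Counter


def knn_predict(train_X, train_y, query, k=3):
    """Predict the label of `query` by majority vote among its k nearest points."""
    # Rank the training points by Euclidean distance to the query.
    order = sorted(range(len(train_X)), key=lambda i: math.dist(train_X[i], query))
    # Count the labels of the k closest points and return the most common one.
    votes = Counter(train_y[i] for i in order[:k])
    return votes.most_common(1)[0][0]


# Made-up toy data: two well-separated clusters with labels 0 and 1.
train_X = [(1.0, 1.0), (1.2, 0.8), (0.9, 1.1), (5.0, 5.0), (5.2, 4.9), (4.8, 5.1)]
train_y = [0, 0, 0, 1, 1, 1]

print(knn_predict(train_X, train_y, (1.1, 1.0)))
print(knn_predict(train_X, train_y, (5.1, 5.0)))
```

Because every training point is its own nearest neighbour, accuracy measured on the training data tends to be optimistic; a held-out split gives a more realistic estimate.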

Save the Trained Model to HopsFS

joblib.dump(iris_knn, IRIS_FLOWER_MODEL_PKL)
hdfs.copy_to_hdfs(IRIS_FLOWER_MODEL_PKL, IRIS_FLOWER_RESOURCES_PATH, overwrite=True)

Export the Trained Model to the Models Repository

It is a best practice to store the Python script that loads and runs the model together with the trained model files. The code below copies the script into the iris flower resources folder, which is used when exporting the model.

IRIS_FLOWER_CLASSIFIER_SCRIPT = "iris_flower_classifier.py"

script_path = "Jupyter/end_to_end_pipeline/sklearn/" + IRIS_FLOWER_CLASSIFIER_SCRIPT
hdfs.cp(script_path, IRIS_FLOWER_RESOURCES_PATH + "/" + IRIS_FLOWER_CLASSIFIER_SCRIPT, overwrite=True)
MODEL_NAME = "irisflowerclassifier"

import hsml
from hsml.schema import Schema
from hsml.model_schema import ModelSchema

input_schema = Schema(x_df)
output_schema = Schema(y_df)

conn = hsml.connection()
mr = conn.get_model_registry()

tf_model = mr.sklearn.create_model(MODEL_NAME, 
                                   metrics={'accuracy': acc},
                                   model_schema=ModelSchema(input_schema=input_schema, output_schema=output_schema))

Connected. Call `.close()` to terminate connection gracefully.
Exported model irisflowerclassifier with version 1
<hsml.sklearn.model.Model object at 0x7f3aa6ab5fd0>
Model export complete: 100%|##########| 6/6 [00:09<00:00,  1.60s/it]                   
for p in hdfs.ls("Models/" + MODEL_NAME, recursive=True):
    print(p)

Serve the Trained Model

To serve a SkLearn model, write a Python script that defines a Predict class. Its constructor downloads the model from HDFS and stores it as an instance attribute, and the class implements the methods predict, classify and regress, like this:

import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23
from hops import hdfs
import os

class Predict(object):

    def __init__(self):
        """ Initializes the serving state, reads a trained model from HDFS"""
        self.model_path = "Models/irisflowerclassifier/1/iris_knn.pkl"
        print("Copying SKLearn model from HDFS to local directory")
        hdfs.copy_to_local(self.model_path)
        print("Reading local SkLearn model for serving")
        self.model = joblib.load("./iris_knn.pkl")
        print("Initialization Complete")

    def predict(self, inputs):
        """ Serves a prediction request using a trained model"""
        return self.model.predict(inputs).tolist() # Numpy arrays are not JSON serializable

    def classify(self, inputs):
        """ Serves a classification request using a trained model"""
        return "not implemented"

    def regress(self, inputs):
        """ Serves a regression request using a trained model"""
        return "not implemented"
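
Before deploying a predictor script it can be useful to check its request/response contract locally. The sketch below does this with stand-ins: StubArray and StubModel are hypothetical replacements for a NumPy array and a trained model, and this Predict takes the model as an argument instead of loading it from HDFS.

```python
import json


class StubArray:
    """Stand-in for a NumPy array: not JSON serializable, but offers tolist()."""

    def __init__(self, values):
        self._values = list(values)

    def tolist(self):
        return list(self._values)


class StubModel:
    """Stand-in for a trained classifier that always predicts class 0."""

    def predict(self, inputs):
        return StubArray(0 for _ in inputs)


class Predict:
    """Same predict contract as the serving script, with an injected model."""

    def __init__(self, model):
        self.model = model

    def predict(self, inputs):
        # Convert to a plain list so the result can be serialized to JSON.
        return self.model.predict(inputs).tolist()


predictor = Predict(StubModel())
body = json.dumps({"predictions": predictor.predict([[5.1, 3.5, 1.4, 0.2]])})
print(body)
```

Passing the array-like object to json.dumps directly would raise a TypeError, which is the reason for the tolist() call in the predictor.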

Then upload this python script to some folder in your project and go to the “Model Serving” service in Hopsworks:


Then click on “create serving” and configure your serving:


Once the serving is created, you can start it and view information such as the server logs and the Kafka topic to which it logs inference requests.


Query Model Repository for best IrisFlowerClassifier Model


EVALUATION_METRIC = "accuracy"  # the metric key recorded when the model was exported
best_model = mr.get_best_model(MODEL_NAME, EVALUATION_METRIC, "max")

print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
Model name: irisflowerclassifier
Model version: 1
{'accuracy': '0.98'}
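
The selection logic behind a "best model" query can be sketched in plain Python. This is an illustration, not the model registry's implementation: best_version is a hypothetical helper, and the second model version in the example is invented. Metrics are kept as strings, as in the output above, so they are converted to numbers before comparison.

```python
def best_version(models, metric, direction="max"):
    """Return the version whose metric is highest ("max") or lowest ("min")."""
    # Convert metric values to float: comparing the raw strings would be wrong
    # for values such as "10" and "9".
    scored = [
        (float(m["metrics"][metric]), m["version"])
        for m in models
        if metric in m["metrics"]
    ]
    if not scored:
        raise ValueError("no model reports the metric " + repr(metric))
    pick = max if direction == "max" else min
    return pick(scored)[1]


# Version 1 mirrors the output above; version 2 is made up for illustration.
models = [
    {"version": 1, "metrics": {"accuracy": "0.98"}},
    {"version": 2, "metrics": {"accuracy": "0.95"}},
]

print(best_version(models, "accuracy", "max"))
```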

Once all the files have been exported to the model directory, we can create a serving instance that points to the model files using serving.create_or_update()

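# Create serving
SERVING_NAME = MODEL_NAME  # assumption: the serving is named after the model, as the prediction logs suggest
SCRIPT_PATH = best_model.version_path + "/" + IRIS_FLOWER_CLASSIFIER_SCRIPT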

serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                         model_path=best_model.model_path, # set the path of the model to be deployed
                         model_server="PYTHON", # set the model server to run the model
                         predictor=SCRIPT_PATH, # set the predictor to load the model and make predictions
                         # optional arguments
                         model_version=best_model.version, # set the version of the model to be deployed
                         kfserving=False, # the model will be served either with Docker or Kubernetes depending on the Hopsworks version
                         topic_name="CREATE", # topic name or CREATE to create a new topic for inference logging, otherwise NONE
                         instances=1 # number of replicas
                         )

# List the servings that exist in the project
for s in serving.get_all():
    print(s.name)

After the serving has been created, you can find it in the Hopsworks UI by going to the “Model Serving” tab. You can also use the Python module to query the Hopsworks REST API for information about the existing servings, using methods such as:

  • get_servings()
  • get_serving_id(serving_name)
  • get_serving_model_path(serving_name)
  • get_serving_type(serving_name)
  • get_serving_version(serving_name)
  • get_serving_kafka_topic(serving_name)
  • get_serving_status(serving_name)
  • exist(serving_name)
print("Info: \tid: {},\n \
       model_path: {},\n \
       model_version: {},\n \
       artifact_version: {},\n \
       model_server: {},\n \
       serving_tool: {}".format(
Info:   id: 1,
        model_path: /Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py,
        model_version: 1,
        artifact_version: 0,
        model_server: FLASK,
        serving_tool: DEFAULT

Or describe a serving using describe(serving_name).


Classify Iris Flowers using the Trained Model

You can start/stop the serving instance either from the Hopsworks UI or from the python/REST API as demonstrated below

Shut down currently running serving

if serving.get_status(SERVING_NAME) == "Running":
    serving.stop(SERVING_NAME)

Start new serving

serving.start(SERVING_NAME)


Wait until serving is up and running

while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # Let the serving startup correctly
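
A polling loop like the one above never returns if the serving fails to start. The sketch below adds a timeout; wait_for_status is a hypothetical helper, and the status names other than "Running" are placeholders used only to drive the example.

```python
import time


def wait_for_status(get_status, target="Running", timeout=120.0, interval=5.0):
    """Poll get_status() until it returns `target` or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == target:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "status is still {!r} after {} seconds".format(status, timeout)
            )
        time.sleep(interval)


# Simulated status source so the example runs without a Hopsworks cluster.
fake_statuses = iter(["Starting", "Starting", "Running"])
result = wait_for_status(lambda: next(fake_statuses), interval=0.01)
print(result)

# On Hopsworks the status function would wrap the real call, for example:
# wait_for_status(lambda: serving.get_status(SERVING_NAME))
```

Passing the status function as an argument keeps the helper independent of the serving API and makes it easy to test.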

Send Prediction Requests to the Served Model using Hopsworks REST API

For making inference requests you can use the utility method serving.make_inference_request


NUM_FEATURES = 4  # sepal_length, sepal_width, petal_length, petal_width

for i in range(20):
    data = {"inputs" : [[random.uniform(1, 8) for _ in range(NUM_FEATURES)]]}
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [0]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
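
The request and response shapes used above can be made explicit with two small helpers. This is a sketch: build_request and decode_response are hypothetical names, and the label mapping is copied from the lookup table printed earlier in the notebook.

```python
import json

FEATURES = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
# Numeric label -> variety, as shown in the lookup table.
LABELS = {0: "Setosa", 1: "Versicolor", 2: "Virginica"}


def build_request(rows):
    """Wrap feature rows in the {"inputs": [...]} payload used above."""
    for row in rows:
        if len(row) != len(FEATURES):
            raise ValueError(
                "expected {} features, got {}".format(len(FEATURES), len(row))
            )
    return {"inputs": [[float(v) for v in row] for row in rows]}


def decode_response(response):
    """Translate numeric predictions into variety names."""
    return [LABELS[int(p)] for p in response["predictions"]]


request = build_request([[5.1, 3.5, 1.4, 0.2]])
print(json.dumps(request))
print(decode_response({"predictions": [2]}))
```

Validating the number of features before sending a request catches malformed rows on the client side instead of in the model server.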

Monitor Prediction Logs

Consume Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means that you can keep track of your model’s performance and its predictions in a scalable manner.

Setup a Kafka consumer and subscribe to the topic containing the prediction logs

TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the Kafka Avro schema from Hopsworks and setup an Avro reader

json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read the lookup table from the Feature Store for converting numerical labels to categorical

iris_labels_lookup = fs.get_feature_group("iris_labels_lookup", 1)
iris_labels_lookup_df = iris_labels_lookup.read().toPandas()

Read messages from the Kafka topic, parse them with the Avro schema and print the results


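PRINT_INSTANCES = False   # set to True to also print the logged request payloads
PRINT_PREDICTIONS = True  # print the prediction and its categorical label

for i in range(0, 5):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)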
            print("serving: {}, version: {}, timestamp: {},\n"\
                  "         http_response_code: {}, model_server: {}, serving_tool: {}".format(
            if PRINT_INSTANCES:
                print("instances: {}\n".format(event_dict["inferenceRequest"]))
            if PRINT_PREDICTIONS:
                prediction = json.loads(event_dict["inferenceResponse"])["predictions"][0]
                prediction_label = iris_labels_lookup_df.loc[iris_labels_lookup_df['label'] == prediction, 'variety'].iloc[0]
                print("predictions: {}, prediction_label:{}\n".format(prediction, prediction_label))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
timeout.. no more messages to read from topic
serving: irisflowerclassifier, version: 1, timestamp: 1640099468723,
         http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2, prediction_label:Virginica

serving: irisflowerclassifier, version: 1, timestamp: 1640099468803,
         http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2, prediction_label:Virginica

serving: irisflowerclassifier, version: 1, timestamp: 1640099468901,
         http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2, prediction_label:Virginica

serving: irisflowerclassifier, version: 1, timestamp: 1640099468994,
         http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2, prediction_label:Virginica
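
Once the log events have been parsed, a simple way to monitor the model is to tally the predicted classes. The sketch below assumes each event is a dictionary whose inferenceResponse field holds a JSON string with a predictions list, as in the loop above; count_predictions and the sample events are hypothetical.

```python
import json
from collections import Counter


def count_predictions(events, labels):
    """Count how often each class was predicted across logged events."""
    counts = Counter()
    for event in events:
        # The response is stored as a JSON string and has to be decoded first.
        predictions = json.loads(event["inferenceResponse"])["predictions"]
        for p in predictions:
            counts[labels.get(int(p), "unknown")] += 1
    return counts


labels = {0: "Setosa", 1: "Versicolor", 2: "Virginica"}
# Hand-written events that mimic the shape of the parsed Kafka messages.
events = [
    {"inferenceResponse": '{"predictions": [2]}'},
    {"inferenceResponse": '{"predictions": [2]}'},
    {"inferenceResponse": '{"predictions": [1]}'},
]

summary = count_predictions(events, labels)
print(dict(summary))
```

A class distribution that drifts far from what is expected can be an early signal that the incoming requests no longer resemble the training data.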