End-To-End Example with Scikit-Learn, Hops and the Feature Store - Iris Flower Classification


FEATURE ENGINEERING –> MODEL TRAINING –> MODEL SERVING –> MODEL MONITORING

In this notebook we will:

  1. Load the Iris Flower dataset from HopsFS
  2. Do feature engineering on the dataset
  3. Save the features to the feature store
  4. Read the feature data from the feature store
  5. Train a KNN model using scikit-learn
  6. Save the trained model to HopsFS
  7. Launch a serving instance to serve the trained model
  8. Send some prediction requests to the served model
  9. Monitor the predictions through Kafka

Model Serving on Hopsworks

The hops python library

hops is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on GitHub.

Using the experiment module

To run your machine learning code with the experiment module in Hopsworks, the whole program must be placed inside a wrapper function: everything from importing libraries and reading data to defining the model and running the training.
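The wrapper-function rule can be illustrated with a minimal, self-contained sketch. The function name train_fn, its threshold parameter and the toy data are hypothetical; the point is only that imports, data, "model" and evaluation all live inside one function that returns a dict of metrics.

```python
def train_fn(threshold=2.5):
    """Hypothetical training function: imports, data loading, model
    definition and evaluation all live inside the function body."""
    # Imports go inside the wrapper so the function is self-contained
    # when it is shipped to and executed on a remote worker.
    import statistics

    # Toy (petal_length, label) pairs, for illustration only.
    data = [(1.4, 0), (1.3, 0), (1.5, 0), (4.7, 1), (4.5, 1), (4.9, 1)]

    # "Model": predict class 1 when petal length exceeds the threshold.
    predictions = [int(length > threshold) for length, _ in data]
    hits = [int(p == label) for p, (_, label) in zip(predictions, data)]

    # Return a dict of metrics so whoever launched the function can record them.
    return {"accuracy": statistics.fmean(hits)}


print(train_fn())               # {'accuracy': 1.0}
print(train_fn(threshold=5.0))  # {'accuracy': 0.5}
```

In Hopsworks the function object itself, not its result, is handed to the experiment module (as far as we know via experiment.launch; check the hops documentation for the exact signature), which runs it on the cluster. Locally you can simply call it.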

The experiment module provides an API for running Python programs, such as TensorFlow, Keras and PyTorch code, on Hopsworks on any number of machines and GPUs.

An experiment can take one of three forms:

  • A single Python program, which we refer to as an Experiment.
  • Grid search or genetic hyperparameter optimization, such as differential evolution, which runs several Experiments in parallel; we refer to this as a Parallel Experiment.
  • Multi-machine/multi-GPU training with ParameterServerStrategy, CollectiveAllReduceStrategy or MultiWorkerMirroredStrategy, made as simple as invoking a function for orchestration. This mode is referred to as Distributed Training.
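A grid search, the simplest kind of Parallel Experiment, evaluates every combination of hyperparameter values independently. The sketch below is plain Python, not the hops API: grid expands a hypothetical parameter grid into one dict per combination, and objective is a made-up stand-in for a training run. The trials run sequentially here, whereas the experiment module would distribute them.

```python
import itertools


def grid(param_grid):
    """Expand {"name": [values, ...]} into one parameter dict per
    combination (the Cartesian product of all value lists)."""
    names = sorted(param_grid)
    value_lists = [param_grid[name] for name in names]
    return [dict(zip(names, combo)) for combo in itertools.product(*value_lists)]


def objective(n_neighbors, weights):
    # Hypothetical stand-in for a training run that returns a score.
    return 1.0 / n_neighbors + (0.1 if weights == "distance" else 0.0)


trials = grid({"n_neighbors": [3, 5, 7], "weights": ["uniform", "distance"]})
# Each trial is independent, which is what makes parallel execution possible.
results = [(objective(**params), params) for params in trials]
best_score, best_params = max(results, key=lambda r: r[0])
print(len(trials), best_params)  # 6 {'n_neighbors': 3, 'weights': 'distance'}
```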

Using the hdfs module

The hdfs module provides a way to get the path in HopsFS where your data is stored, namely by calling hdfs.project_path(). The path resolves to the root of your project, which is the view you see when you click Data Sets in Hopsworks. To point to where your data actually resides in the project, you need to append the full path from there to your Dataset. For example, if you create an mnist folder in your Resources Dataset, the path to the mnist data would be hdfs.project_path() + 'Resources/mnist'.

# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project
from hops import hdfs
project_path = hdfs.project_path()

# Upload the trained model to HDFS
hdfs.copy_to_hdfs("iris_knn.pkl", "Resources/iris_flower", overwrite=True)

# Download the iris flower model to the current working directory
iris_flower_model_hdfs_path = project_path + "Resources/iris_flower/iris_knn.pkl"
local_iris_flower_model_path = hdfs.copy_to_local(iris_flower_model_hdfs_path)
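Plain string concatenation works because the project root appears to end with a slash, but a relative path that starts with "/" then yields a double slash (visible in the copy log later in this notebook). A small, hypothetical helper built on the standard library's posixpath avoids that. The root URL below is made up; in the notebook it would come from hdfs.project_path().

```python
import posixpath


def dataset_path(project_root, relative_path):
    """Join a HopsFS project root with a path inside the project.

    A leading "/" is stripped from relative_path first, because
    posixpath.join treats an absolute second argument as a new root and
    would silently discard project_root.
    """
    return posixpath.join(project_root, relative_path.lstrip("/"))


# Hypothetical project root, standing in for hdfs.project_path()
root = "hdfs://namenode:8020/Projects/demo/"
print(dataset_path(root, "Resources/mnist"))
# hdfs://namenode:8020/Projects/demo/Resources/mnist
print(dataset_path(root, "/Resources/iris_flower"))
# hdfs://namenode:8020/Projects/demo/Resources/iris_flower
```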

Managing experiments

The Experiments service provides a unified view of all the experiments run using the experiment module.
As the image below shows, it provides general information about each experiment and the resulting metric. Experiments can be visualized in TensorBoard during or after training.

Image7-Monitor.png

Import libraries

from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import joblib
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType
import numpy as np
import time
import json
from hops import kafka, hdfs, serving, model, tls
from confluent_kafka import Producer, Consumer, KafkaError
import random
Starting Spark application
ID  YARN Application ID             Kind     State  Spark UI  Driver log
2   application_1623896729676_0003  pyspark  idle   Link      Link
SparkSession available as 'spark'.

Prepare Training Dataset

Load Iris Dataset (csv)

project_path = hdfs.project_path()
iris_df = spark.read.format("csv").option("header", "true").option("inferSchema", True).load(
    project_path + "TourData/iris/iris.csv")
iris_df.printSchema()
root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)

Feature Engineering

The dataset is already quite well prepared. The only feature engineering we need is to convert the variety column to a numeric label and to save a lookup table, so that we can later convert the numeric representation back to the categorical one.

encoder = StringIndexer(inputCol="variety", outputCol="label")
fit_model = encoder.fit(iris_df)
iris_df1 = fit_model.transform(iris_df)
lookup_df = iris_df1.select(["variety", "label"]).distinct()
iris_df2 = iris_df1.drop("variety")
iris_df3 = iris_df2.withColumn("label", iris_df2["label"].cast(IntegerType()))
iris_df3.printSchema()
root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- label: integer (nullable = true)
iris_df3.show(5)
+------------+-----------+------------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|label|
+------------+-----------+------------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2|    0|
|         4.9|        3.0|         1.4|        0.2|    0|
|         4.7|        3.2|         1.3|        0.2|    0|
|         4.6|        3.1|         1.5|        0.2|    0|
|         5.0|        3.6|         1.4|        0.2|    0|
+------------+-----------+------------+-----------+-----+
only showing top 5 rows
lookup_df.show(3)
+----------+-----+
|   variety|label|
+----------+-----+
| Virginica|  2.0|
|Versicolor|  1.0|
|    Setosa|  0.0|
+----------+-----+
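The mapping in the lookup table follows from how StringIndexer orders labels. To our understanding, its default (stringOrderType="frequencyDesc") gives index 0 to the most frequent string and breaks frequency ties alphabetically; with 50 flowers per variety, that yields Setosa=0, Versicolor=1, Virginica=2, as shown above. The pure-Python sketch below reproduces that rule; string_index is a hypothetical helper, not part of Spark.

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to an integer, most frequent first,
    breaking ties alphabetically; return the codes and the lookup table."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    lookup = {label: idx for idx, label in enumerate(ordered)}
    return [lookup[v] for v in values], lookup


varieties = ["Setosa"] * 2 + ["Versicolor"] * 2 + ["Virginica"] * 2
encoded, lookup = string_index(varieties)
inverse = {idx: label for label, idx in lookup.items()}  # numeric -> categorical
print(lookup)      # {'Setosa': 0, 'Versicolor': 1, 'Virginica': 2}
print(inverse[2])  # Virginica
```

Keeping the inverse mapping next to the features is exactly what the iris_labels_lookup feature group does at scale.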

Save Features to the Feature Store

We save two feature groups (Hive tables): iris_features, which contains the iris features and the corresponding numeric label, and iris_labels_lookup, which is used to convert the numeric iris label back to its categorical value.

Note: To be able to run the feature store code, you first have to enable the Feature Store Service in your project. To do this, go to the “Settings” tab in your project, select the feature store service and click “Save”.

import hsfs
# Create a connection
connection = hsfs.connection()
# Get the feature store handle for the project's feature store
fs = connection.get_feature_store()
Connected. Call `.close()` to terminate connection gracefully.
iris_features = fs.create_feature_group(name="iris_features", version=1, time_travel_format=None)
iris_features.save(iris_df3)
<hsfs.feature_group.FeatureGroup object at 0x7f09bc740ad0>
iris_labels_lookup = fs.create_feature_group(name="iris_labels_lookup", version=1, time_travel_format=None)
iris_labels_lookup.save(lookup_df)
<hsfs.feature_group.FeatureGroup object at 0x7f09bc67b9d0>

Read the Iris Training Dataset from the Feature Store

iris_features = fs.get_feature_group("iris_features", 1)
df = iris_features.read().toPandas()
df.head(10)
   sepal_length  sepal_width  petal_width  petal_length  label
0           5.1          3.5          0.2           1.4      0
1           4.9          3.0          0.2           1.4      0
2           4.7          3.2          0.2           1.3      0
3           4.6          3.1          0.2           1.5      0
4           5.0          3.6          0.2           1.4      0
5           5.4          3.9          0.4           1.7      0
6           4.6          3.4          0.3           1.4      0
7           5.0          3.4          0.2           1.5      0
8           4.4          2.9          0.2           1.4      0
9           4.9          3.1          0.1           1.5      0
df.describe()
       sepal_length  sepal_width  petal_width  petal_length       label
count    150.000000   150.000000   150.000000    150.000000  150.000000
mean       5.843333     3.057333     1.199333      3.758000    1.000000
std        0.828066     0.435866     0.762238      1.765298    0.819232
min        4.300000     2.000000     0.100000      1.000000    0.000000
25%        5.100000     2.800000     0.300000      1.600000    0.000000
50%        5.800000     3.000000     1.300000      4.350000    1.000000
75%        6.400000     3.300000     1.800000      5.100000    2.000000
max        7.900000     4.400000     2.500000      6.900000    2.000000
x_df = df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']]
y_df = df[["label"]]
X = x_df.values
y = y_df.values.ravel()
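Note that the feature store returned petal_width before petal_length, which is why x_df selects the columns by name: the model, and later every inference request, must use the same feature order. The idea in plain Python, using the first row from df.head() above; to_matrix and FEATURE_ORDER are hypothetical names.

```python
# Training feature order, as selected into x_df above
FEATURE_ORDER = ["sepal_length", "sepal_width", "petal_length", "petal_width"]


def to_matrix(records, feature_order=FEATURE_ORDER):
    """Build a row-major feature matrix by selecting columns by name, so the
    result does not depend on the column order the store happens to return."""
    return [[rec[name] for name in feature_order] for rec in records]


# First row of df.head(): petal_width comes before petal_length here
record = {"sepal_length": 5.1, "sepal_width": 3.5, "petal_width": 0.2,
          "petal_length": 1.4, "label": 0}
print(to_matrix([record]))  # [[5.1, 3.5, 1.4, 0.2]]
```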

Train an Iris Flower Classifier

IRIS_FLOWER_RESOURCES_PATH = "/Resources/iris_flower"
IRIS_FLOWER_MODEL_PKL = "iris_knn.pkl"

Generate a KNN Model using the Feature Data

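# Pick a random number of neighbours (no seed is set, so this varies between runs)
neighbors = random.randint(3, 30)
iris_knn = KNeighborsClassifier(n_neighbors=neighbors)
iris_knn.fit(X, y)
# Note: accuracy is computed on the same data the model was trained on,
# so it is an optimistic estimate of performance on unseen flowers
y_pred = iris_knn.predict(X)
acc = accuracy_score(y, y_pred)
print(acc)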
0.98
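To show what a KNN classifier computes, and why accuracy on the training data is optimistic, here is a minimal pure-Python k-nearest-neighbours sketch. It is not scikit-learn's implementation (which uses optimized neighbour search and may break ties differently), and the 2-D points are toy data.

```python
import math
from collections import Counter


def knn_predict(train_X, train_y, query, k):
    """Majority vote among the k training points closest to `query`
    (Euclidean distance)."""
    neighbours = sorted(
        (math.dist(x, query), label) for x, label in zip(train_X, train_y)
    )[:k]
    votes = Counter(label for _, label in neighbours)
    # most_common orders equal counts by first occurrence, so a tie goes to
    # the label whose nearest member is closest to the query
    return votes.most_common(1)[0][0]


# Toy training data: two well-separated clusters
train_X = [(1.0, 1.0), (1.2, 0.8), (0.9, 1.1), (5.0, 5.0), (5.2, 4.9), (4.8, 5.1)]
train_y = [0, 0, 0, 1, 1, 1]
print(knn_predict(train_X, train_y, (1.1, 1.0), k=3))  # 0
print(knn_predict(train_X, train_y, (4.9, 5.0), k=3))  # 1

# With k=1 each training point is its own nearest neighbour (distance 0),
# so for data without duplicate points the training accuracy is perfect.
train_acc = sum(
    knn_predict(train_X, train_y, x, k=1) == y for x, y in zip(train_X, train_y)
) / len(train_y)
print(train_acc)  # 1.0
```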

Save the Trained Model to HopsFS

joblib.dump(iris_knn, IRIS_FLOWER_MODEL_PKL)
hdfs.mkdir(IRIS_FLOWER_RESOURCES_PATH)
hdfs.copy_to_hdfs(IRIS_FLOWER_MODEL_PKL, IRIS_FLOWER_RESOURCES_PATH, overwrite=True)
Started copying local path iris_knn.pkl to hdfs path hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000//Resources/iris_flower/iris_knn.pkl

Finished copying

Export the Trained Model to the Models Repository

It is best practice to keep the Python script that loads and runs the model together with the trained model files. The code below copies the script into the iris flower resources folder, which is then exported as the model.

IRIS_FLOWER_CLASSIFIER_SCRIPT = "iris_flower_classifier.py"

script_path = "Jupyter/end_to_end_pipeline/sklearn/" + IRIS_FLOWER_CLASSIFIER_SCRIPT
hdfs.cp(script_path, IRIS_FLOWER_RESOURCES_PATH + "/" + IRIS_FLOWER_CLASSIFIER_SCRIPT, overwrite=True)
MODEL_NAME = "irisflowerclassifier"

model.export(IRIS_FLOWER_RESOURCES_PATH, MODEL_NAME, metrics={'accuracy': acc})
Exported model irisflowerclassifier as version 1 successfully.
Polling irisflowerclassifier version 1 for model availability.
get model:/hopsworks-api/api/project/120/models/irisflowerclassifier_1?filter_by=endpoint_id:120
Model now available.
for p in hdfs.ls("Models/" + MODEL_NAME, recursive=True):
    print(p)
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_knn.pkl
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_knn.pkl
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/program.ipynb

Serve the Trained Model

To serve a scikit-learn model, write a Python script that implements a Predict class: its constructor downloads the trained model from HDFS and stores it as an instance attribute, and the class implements the methods predict, classify and regress, like this:

import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23
from hops import hdfs

class Predict(object):

    def __init__(self):
        """ Initializes the serving state, reads a trained model from HDFS"""
        self.model_path = "Models/irisflowerclassifier/1/iris_knn.pkl"
        print("Copying scikit-learn model from HDFS to local directory")
        hdfs.copy_to_local(self.model_path)
        print("Reading local scikit-learn model for serving")
        self.model = joblib.load("./iris_knn.pkl")
        print("Initialization Complete")


    def predict(self, inputs):
        """ Serves a prediction request using a trained model"""
        return self.model.predict(inputs).tolist() # NumPy arrays are not JSON serializable

    def classify(self, inputs):
        """ Serves a classification request using a trained model"""
        return "not implemented"

    def regress(self, inputs):
        """ Serves a regression request using a trained model"""
        return "not implemented"
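Before uploading the script, the Predict interface can be exercised locally. The sketch below is an assumption-laden test harness, not Hopsworks code: StubModel is a hypothetical stand-in for the unpickled KNN model, the model is injected instead of copied from HopsFS, and handle only mimics the {"predictions": [...]} response format shown later in this notebook.

```python
import json


class StubModel:
    """Hypothetical stand-in for the unpickled KNN model."""

    def predict(self, inputs):
        # Toy rule on petal length (index 2): short petals -> 0, else 1
        return [0 if row[2] < 2.5 else 1 for row in inputs]


class Predict(object):
    """Same interface as the serving script, with the model injected."""

    def __init__(self, model=None):
        self.model = model if model is not None else StubModel()

    def predict(self, inputs):
        # list() plays the role of .tolist(): the result must be JSON serializable
        return list(self.model.predict(inputs))

    def classify(self, inputs):
        return "not implemented"

    def regress(self, inputs):
        return "not implemented"


def handle(predictor, body):
    """Hypothetical request handler: decode JSON, predict, encode JSON."""
    request = json.loads(body)
    return json.dumps({"predictions": predictor.predict(request["inputs"])})


print(handle(Predict(), '{"inputs": [[5.1, 3.5, 1.4, 0.2]]}'))  # {"predictions": [0]}
```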

Then upload this python script to some folder in your project and go to the “Model Serving” service in Hopsworks:

sklearn_serving1.png

Then click on “create serving” and configure your serving:

sklearn_serving2.png

Once the serving is created, you can start it and view information such as the server logs and the Kafka topic it logs inference requests to.

sklearn_serving3.png

Query Model Repository for best IrisFlowerClassifier Model

from hops.model import Metric
EVALUATION_METRIC="accuracy"

best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)

print('Model name: ' + best_model['name'])
print('Model version: ' + str(best_model['version']))
print(best_model['metrics'])
Model name: irisflowerclassifier
Model version: 1
{'accuracy': '0.98'}
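Note that the metric comes back as the string '0.98'. The plain-Python sketch below makes the MAX/MIN selection concrete and shows why values should be converted to numbers before comparing; it is not the hops implementation of get_best_model, and the model entries and loss values are hypothetical.

```python
def best_version(models, metric, maximize=True):
    """Pick the model entry with the best value for `metric`.

    Values are converted with float() first: strings compare character
    by character, not numerically.
    """
    candidates = [m for m in models if metric in m["metrics"]]

    def score(m):
        return float(m["metrics"][metric])

    return max(candidates, key=score) if maximize else min(candidates, key=score)


# Hypothetical model versions with a loss metric (lower is better)
models = [
    {"version": 1, "metrics": {"loss": "9.5"}},
    {"version": 2, "metrics": {"loss": "10.2"}},
]
print(best_version(models, "loss", maximize=False)["version"])  # 1
# Comparing the raw strings picks the wrong version, since "10.2" < "9.5"
print(min(models, key=lambda m: m["metrics"]["loss"])["version"])  # 2
```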

Once all the files have been exported to the model directory, we can create a serving instance that points to the model files using serving.create_or_update():

# Create serving
SERVING_NAME = MODEL_NAME
SCRIPT_PATH = "Models/" + MODEL_NAME + "/" + str(best_model['version']) + "/" + IRIS_FLOWER_CLASSIFIER_SCRIPT

serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                         SCRIPT_PATH, model_version=best_model['version'], # set the path and version of the model to be deployed
                         topic_name="CREATE", # topic name or CREATE to create a new topic for inference logging, otherwise NONE
                         instances=1 # number of replicas
                         )
Inferring model server from artifact files: FLASK
Creating serving irisflowerclassifier for artifact /Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py ...
Serving irisflowerclassifier successfully created
for s in serving.get_all():
    print(s.name)
irisflowerclassifier

After the serving has been created, you can find it in the Hopsworks UI under the “Model Serving” tab. You can also use the Python module to query the Hopsworks REST API for information about existing servings, using methods such as:

  • get_all()
  • get_id(serving_name)
  • get_model_path(serving_name)
  • get_model_version(serving_name)
  • get_artifact_version(serving_name)
  • get_model_server(serving_name)
  • get_serving_tool(serving_name)
  • get_kafka_topic(serving_name)
  • get_status(serving_name)
  • exist(serving_name)
print("Info: \tid: {},\n \
       model_path: {},\n \
       model_version: {},\n \
       artifact_version: {},\n \
       model_server: {},\n \
       serving_tool: {}".format(
    serving.get_id(SERVING_NAME),
    serving.get_model_path(SERVING_NAME),
    serving.get_model_version(SERVING_NAME),
    serving.get_artifact_version(SERVING_NAME),
    serving.get_model_server(SERVING_NAME),
    serving.get_serving_tool(SERVING_NAME)))
Info:   id: 788,
        model_path: /Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py,
        model_version: 1,
        artifact_version: 0,
        model_server: FLASK,
        serving_tool: DEFAULT

Or describe a serving using describe(serving_name):

serving.describe(SERVING_NAME)
id: 788, name: irisflowerclassifier, model_path: /Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py, model_version: 1, artifact_version: 0, transformer: None, model_server: FLASK, serving_tool: DEFAULT, requested_instances: 1, available_instances: 0, available_transformer_instances: 0, predictor_resource_config: {'cores': 1, 'memory': 1024, 'gpus': 0}, creator: Admin Admin, created: 2021-07-07T09:38:40Z, status: Stopped, kafka_topic_dto: <hops.kafka.KafkaTopicDTO object at 0x7f09bc740710>

Classify Iris Flowers using the Trained Model

You can start and stop the serving instance either from the Hopsworks UI or from the Python/REST API, as demonstrated below.

Shut down currently running serving

if serving.get_status(SERVING_NAME) == "Running":
    serving.stop(SERVING_NAME)
time.sleep(10)

Start new serving

serving.start(SERVING_NAME)
Starting serving with name: irisflowerclassifier...
Serving with name: irisflowerclassifier successfully started

Wait until serving is up and running

while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # Let the serving startup correctly
time.sleep(15)
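The loop above polls forever if the serving never reaches "Running". A bounded alternative is sketched below; wait_for_status is a hypothetical helper that takes any zero-argument status function, so in the notebook it could be called as wait_for_status(lambda: serving.get_status(SERVING_NAME)).

```python
import time


def wait_for_status(get_status, target="Running", timeout=300.0, interval=5.0):
    """Poll get_status() until it returns `target`, or raise TimeoutError
    once `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == target:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"status is {status!r}, expected {target!r} after {timeout}s"
            )
        time.sleep(interval)


# Simulated status sequence standing in for serving.get_status(...)
statuses = iter(["Starting", "Starting", "Running"])
print(wait_for_status(lambda: next(statuses), interval=0))  # Running
```

Using time.monotonic for the deadline keeps the timeout correct even if the wall clock is adjusted while waiting.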

Send Prediction Requests to the Served Model using Hopsworks REST API

To make inference requests you can use the utility method serving.make_inference_request:

NUM_FEATURES = 4

for _ in range(20):
    # Each request carries one row of NUM_FEATURES random values in [1, 8]
    data = {"inputs" : [[random.uniform(1, 8) for _ in range(NUM_FEATURES)]]}
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [1]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [1]}
{'predictions': [2]}
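Drawing every feature from [1, 8] produces many flowers that look nothing like the training data, which is why most predictions above are class 2. A sketch of a more realistic payload generator: it samples each feature within the min/max reported by df.describe() earlier, in the training column order. FEATURE_RANGES and make_request are hypothetical names; the resulting dict has the same {"inputs": [[...]]} shape passed to serving.make_inference_request.

```python
import json
import random

# (name, min, max) per feature, from df.describe(), in training column order
FEATURE_RANGES = [
    ("sepal_length", 4.3, 7.9),
    ("sepal_width", 2.0, 4.4),
    ("petal_length", 1.0, 6.9),
    ("petal_width", 0.1, 2.5),
]


def make_request(rng):
    """Build one inference request with each feature drawn from its observed range."""
    row = [round(rng.uniform(lo, hi), 1) for _, lo, hi in FEATURE_RANGES]
    return {"inputs": [row]}


rng = random.Random(42)  # seeded so the payloads are reproducible
payload = make_request(rng)
print(json.dumps(payload))
```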

Monitor Prediction Logs

Consume Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means that you can keep track of your model’s performance and its predictions in a scalable manner.

Set up a Kafka consumer and subscribe to the topic containing the prediction logs

TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the Kafka schema from Hopsworks and convert it to an Avro schema

json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read the lookup table from the Feature Store for converting numerical labels to categorical

iris_labels_lookup = fs.get_feature_group("iris_labels_lookup", 1)
iris_labels_lookup_df = iris_labels_lookup.read().toPandas()

Read messages from the Kafka topic, parse them with the Avro schema and print the results

PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 5):
    msg = consumer.poll(timeout=1.0)
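    if msg is not None:
        if msg.error():
            # Error events (e.g. broker problems) carry no payload: report and skip
            print("Kafka error: {}".format(msg.error()))
            continue
        value = msg.value()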
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)
            
            print("serving: {}, version: {}, timestamp: {},\n"\
                  "         http_response_code: {}, model_server: {}, serving_tool: {}".format(
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["responseHttpCode"],
                       event_dict["modelServer"],
                       event_dict["servingTool"]))
            
            if PRINT_INSTANCES:
                print("instances: {}\n".format(event_dict["inferenceRequest"]))
            if PRINT_PREDICTIONS:
                prediction = json.loads(event_dict["inferenceResponse"])["predictions"][0]
                prediction_label = iris_labels_lookup_df.loc[iris_labels_lookup_df['label'] == prediction, 'variety'].iloc[0]
                print("predictions: {}, prediction_label:{}\n".format(prediction, prediction_label))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
serving: irisflowerclassifier, version: 1, timestamp: 1625650784109,
         http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 1, prediction_label:Versicolor

serving: irisflowerclassifier, version: 1, timestamp: 1625650784277,
         http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2, prediction_label:Virginica

serving: irisflowerclassifier, version: 1, timestamp: 1625650784416,
         http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 1, prediction_label:Versicolor

serving: irisflowerclassifier, version: 1, timestamp: 1625650784596,
         http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 1, prediction_label:Versicolor

serving: irisflowerclassifier, version: 1, timestamp: 1625650784732,
         http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2, prediction_label:Virginica
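The label lookup above filters a DataFrame once per message. A plain dict built once from the lookup table does the same in constant time. It also works even though the lookup stores labels as doubles (2.0) while predictions are ints (2), because equal int and float keys hash and compare equal in Python. The event below is hand-built for illustration, using the field names from the loop above; describe_event is a hypothetical helper.

```python
import json

# Rows as shown in lookup_df: labels are stored as doubles
LOOKUP_ROWS = [("Virginica", 2.0), ("Versicolor", 1.0), ("Setosa", 0.0)]
label_to_variety = {label: variety for variety, label in LOOKUP_ROWS}


def describe_event(event):
    """Turn a parsed inference-log event into a readable one-line summary."""
    prediction = json.loads(event["inferenceResponse"])["predictions"][0]
    # int 2 finds the float key 2.0, since 2 == 2.0 and hash(2) == hash(2.0)
    return "{} v{}: {} ({})".format(
        event["modelName"], event["modelVersion"], prediction,
        label_to_variety[prediction])


# Hand-built example event, not a real log record
event = {
    "modelName": "irisflowerclassifier",
    "modelVersion": 1,
    "inferenceResponse": json.dumps({"predictions": [2]}),
}
print(describe_event(event))  # irisflowerclassifier v1: 2 (Virginica)
```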