Scikit-Learn End-to-End Example - Iris

Iris Flower Classification and Serving Using SkLearn, HopsML, and the Hopsworks Feature Store

In this notebook we will:

  1. Load the Iris Flower dataset from HopsFS
  2. Do feature engineering on the dataset
  3. Save the features to the feature store
  4. Read the feature data from the feature store
  5. Train a KNN Model using SkLearn
  6. Save the trained model to HopsFS
  7. Launch a serving instance to serve the trained model
  8. Send some prediction requests to the served model
  9. Monitor the predictions through Kafka

Imports

from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import joblib
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType
import numpy as np
import time
import json
from hops import kafka, hdfs, serving, model, tls
from confluent_kafka import Producer, Consumer, KafkaError
import random
Starting Spark application
SparkSession available as 'spark'.

Load Dataset

project_path = hdfs.project_path()
iris_df = spark.read.format("csv").option("header", "true").option("inferSchema", True).load(
    project_path + "TourData/iris/iris.csv")
iris_df.printSchema()
root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- variety: string (nullable = true)

Feature Engineering

The dataset is already quite well prepared; the only feature engineering needed is to convert the variety column to a numeric label and to save a lookup table, so that we can later convert the numeric representation back to the categorical one.

encoder = StringIndexer(inputCol="variety", outputCol="label")
fit_model = encoder.fit(iris_df)
iris_df1 = fit_model.transform(iris_df)
lookup_df = iris_df1.select(["variety", "label"]).distinct()
iris_df2 = iris_df1.drop("variety")
iris_df3 = iris_df2.withColumn("label", iris_df2["label"].cast(IntegerType()))
iris_df3.printSchema()
root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- label: integer (nullable = true)
iris_df3.show(5)
+------------+-----------+------------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|label|
+------------+-----------+------------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2|    2|
|         4.9|        3.0|         1.4|        0.2|    2|
|         4.7|        3.2|         1.3|        0.2|    2|
|         4.6|        3.1|         1.5|        0.2|    2|
|         5.0|        3.6|         1.4|        0.2|    2|
+------------+-----------+------------+-----------+-----+
only showing top 5 rows
lookup_df.show(3)
+----------+-----+
|   variety|label|
+----------+-----+
| Virginica|  0.0|
|Versicolor|  1.0|
|    Setosa|  2.0|
+----------+-----+
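The lookup table above is what lets us translate numeric predictions back into variety names later on. As a minimal standalone sketch of that mapping in pandas (with the table hard-coded here for illustration; in the notebook it is read back from the feature store):

```python
import pandas as pd

# Hard-coded copy of the lookup table shown above. In practice, read it
# from the feature store with iris_labels_lookup.read().toPandas().
lookup = pd.DataFrame({"variety": ["Virginica", "Versicolor", "Setosa"],
                       "label": [0.0, 1.0, 2.0]})

def label_to_variety(label):
    """Map a numeric label back to its categorical variety name."""
    return lookup.loc[lookup["label"] == label, "variety"].iloc[0]

print(label_to_variety(2))  # Setosa
```

The same `loc`-based lookup is used further down when parsing inference logs from Kafka.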

Save Features to the Feature Store

We save two feature groups (Hive tables): one called iris_features, containing the iris features and the corresponding numeric label, and another called iris_labels_lookup, for converting the numeric iris label back to its categorical representation.

Note: To be able to run the feature store code, you first have to enable the Feature Store Service in your project. To do this, go to the “Settings” tab in your project, select the feature store service and click “Save”.

import hsfs
# Create a connection
connection = hsfs.connection()
# Get the feature store handle for the project's feature store
fs = connection.get_feature_store()
Connected. Call `.close()` to terminate connection gracefully.
iris_features = fs.create_feature_group(name="iris_features", version=1, time_travel_format=None)
iris_features.save(iris_df3)
<hsfs.feature_group.FeatureGroup object at 0x7f120010ba90>
iris_labels_lookup = fs.create_feature_group(name="iris_labels_lookup", version=1, time_travel_format=None)
iris_labels_lookup.save(lookup_df)
<hsfs.feature_group.FeatureGroup object at 0x7f120012e1d0>

Read the Iris Training Dataset from the Feature Store

iris_features = fs.get_feature_group("iris_features", 1)
df = iris_features.read().toPandas()
df.head(10)
   sepal_length  sepal_width  petal_width  petal_length  label
0           5.1          3.5          0.2           1.4      2
1           4.9          3.0          0.2           1.4      2
2           4.7          3.2          0.2           1.3      2
3           4.6          3.1          0.2           1.5      2
4           5.0          3.6          0.2           1.4      2
5           5.4          3.9          0.4           1.7      2
6           4.6          3.4          0.3           1.4      2
7           5.0          3.4          0.2           1.5      2
8           4.4          2.9          0.2           1.4      2
9           4.9          3.1          0.1           1.5      2
df.describe()
       sepal_length  sepal_width  petal_width  petal_length       label
count    150.000000   150.000000   150.000000    150.000000  150.000000
mean       5.843333     3.057333     1.199333      3.758000    1.000000
std        0.828066     0.435866     0.762238      1.765298    0.819232
min        4.300000     2.000000     0.100000      1.000000    0.000000
25%        5.100000     2.800000     0.300000      1.600000    0.000000
50%        5.800000     3.000000     1.300000      4.350000    1.000000
75%        6.400000     3.300000     1.800000      5.100000    2.000000
max        7.900000     4.400000     2.500000      6.900000    2.000000
x_df = df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']]
y_df = df[["label"]]
X = x_df.values
y = y_df.values.ravel()

Train a KNN Model using the Feature Data

neighbors = random.randint(3, 30)
iris_knn = KNeighborsClassifier(n_neighbors=neighbors)
iris_knn.fit(X, y)
y_pred = iris_knn.predict(X)
acc = accuracy_score(y, y_pred)
print(acc)
0.98
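Note that the accuracy above is computed on the same data the model was fitted on, so it is an optimistic estimate. A held-out estimate can be obtained with cross-validation; a minimal sketch, using scikit-learn's bundled copy of the iris dataset as a stand-in for the feature store data:

```python
from sklearn.datasets import load_iris
from sklearn.model_selection import cross_val_score
from sklearn.neighbors import KNeighborsClassifier

X, y = load_iris(return_X_y=True)

# 5-fold cross-validation: each fold is scored on data not used for fitting.
scores = cross_val_score(KNeighborsClassifier(n_neighbors=5), X, y, cv=5)
print("mean accuracy: %.3f (+/- %.3f)" % (scores.mean(), scores.std()))
```

The cross-validated mean could also be passed as the `accuracy` metric when exporting the model below.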

Save the Trained Model to HopsFS

joblib.dump(iris_knn, "iris_knn.pkl")
hdfs.mkdir("Resources/sklearn_model")
hdfs.copy_to_hdfs("iris_knn.pkl", "Resources/sklearn_model", overwrite=True)
Started copying local path iris_knn.pkl to hdfs path hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Resources/sklearn_model/iris_knn.pkl

Finished copying


Serve the Trained Model

To serve a SkLearn model, write a Python script that implements a Predict class with the methods predict, classify, and regress. The constructor should download the model from HDFS and store it as a class variable, like this:

import joblib
from hops import hdfs
import os

class Predict(object):

    def __init__(self):
        """ Initializes the serving state, reads a trained model from HDFS"""
        self.model_path = "Models/iris_knn.pkl"
        print("Copying SKLearn model from HDFS to local directory")
        hdfs.copy_to_local(self.model_path)
        print("Reading local SkLearn model for serving")
        self.model = joblib.load("./iris_knn.pkl")
        print("Initialization Complete")


    def predict(self, inputs):
        """ Serves a prediction request usign a trained model"""
        return self.model.predict(inputs).tolist() # Numpy Arrays are not JSON serializable

    def classify(self, inputs):
        """ Serves a classification request using a trained model"""
        return "not implemented"

    def regress(self, inputs):
        """ Serves a regression request using a trained model"""
        return "not implemented"

Then upload this python script to some folder in your project and go to the “Model Serving” service in Hopsworks:

sklearn_serving1.png

Then click on “create serving” and configure your serving:

sklearn_serving2.png

Once the serving is created, you can start it and view information like server-logs and which kafka topic it is logging inference requests to.

sklearn_serving3.png

It is a best practice to keep the script together with the trained model. The code below copies the script from Jupyter/End_To_End_Pipeline/sklearn/iris_flower_classifier.py into Resources/sklearn_model, so that it is exported along with the model in the next step.

hdfs.cp("Jupyter/End_To_End_Pipeline/sklearn/iris_flower_classifier.py", "Resources/sklearn_model/iris_flower_classifier.py", overwrite=True)

Export the Trained Model to the Models Repository

MODEL_NAME = "IrisFlowerClassifier"
model.export("Resources/sklearn_model", MODEL_NAME, metrics={'accuracy': acc})
Exported model IrisFlowerClassifier as version 1 successfully.
Polling IrisFlowerClassifier version 1 for model availability.
get model:/hopsworks-api/api/project/125/models/IrisFlowerClassifier_1?filter_by=endpoint_id:125
Model now available.
for p in hdfs.ls("Models/" + MODEL_NAME, recursive=True):
    print(p)
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1/iris_flower_classifier.py
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1/iris_knn.pkl
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1/program.ipynb

Query Model Repository for best IrisFlowerClassifier Model

EVALUATION_METRIC="accuracy"
from hops.model import Metric

best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)

print('Model name: ' + best_model['name'])
print('Model version: ' + str(best_model['version']))
print(best_model['metrics'])
Model name: IrisFlowerClassifier
Model version: 1
{'accuracy': '0.98'}

Once all the files have been exported to the model directory, we can create a serving instance that points to the model files using serving.create_or_update()

# Create serving
serving_name = MODEL_NAME
script_path = "Models/" + MODEL_NAME + "/" + str(best_model['version']) + "/iris_flower_classifier.py"
serving.create_or_update(serving_name, script_path, model_version=best_model['version'], model_server="FLASK")
Creating serving IrisFlowerClassifier for artifact /Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1/iris_flower_classifier.py ...
Serving IrisFlowerClassifier successfully created

After the serving has been created, you can find it in the Hopsworks UI under the “Model Serving” tab. You can also use the Python module to query the Hopsworks REST API for information about existing servings, using methods such as:

  • get_all()
  • get_id(serving_name)
  • get_artifact_path(serving_name)
  • get_model_server(serving_name)
  • get_serving_tool(serving_name)
  • get_version(serving_name)
  • get_kafka_topic(serving_name)
  • get_status(serving_name)
  • exists(serving_name)
for s in serving.get_all():
    print(s.name)
IrisFlowerClassifier
serving.get_id(MODEL_NAME)
9
serving.get_artifact_path(MODEL_NAME)
'/Projects/demo_ml_meb10000/Models/IrisFlowerClassifier/1/iris_flower_classifier.py'
serving.get_model_server(MODEL_NAME)
'FLASK'
serving.get_serving_tool(MODEL_NAME)
'DEFAULT'
serving.get_version(MODEL_NAME)
1
serving.get_kafka_topic(MODEL_NAME)
'IrisFlowerClassifier-inf2074'
serving.get_status(MODEL_NAME)
'Stopped'

You can start and stop the serving instance either from the Hopsworks UI or from the Python/REST API, as demonstrated below.

Shut down currently running serving

if serving.get_status(MODEL_NAME) == "Running":
    serving.stop(MODEL_NAME)
time.sleep(10)

Start new serving

serving.start(MODEL_NAME)
Starting serving with name: IrisFlowerClassifier...
Serving with name: IrisFlowerClassifier successfully started

Wait until serving is up and running

while serving.get_status(MODEL_NAME) != "Running":
    time.sleep(5) # Let the serving startup correctly
time.sleep(15)
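The polling loop above never gives up if the serving fails to start. A small hypothetical helper adds a timeout so the notebook cannot block forever:

```python
import time

def wait_until(predicate, timeout=300, interval=5):
    """Poll predicate() until it returns True, or raise after timeout seconds."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    raise TimeoutError("condition not met within %s seconds" % timeout)
```

In the notebook this would be called as wait_until(lambda: serving.get_status(MODEL_NAME) == "Running") before sending any prediction requests.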

Send Prediction Requests to the Served Model using Hopsworks REST API

Constants

TOPIC_NAME = serving.get_kafka_topic(MODEL_NAME)
NUM_FEATURES = 4

For making inference requests you can use the utility method serving.make_inference_request

for i in range(20):
    data = {"inputs" : [[random.uniform(1, 8) for i in range(NUM_FEATURES)]]}
    response = serving.make_inference_request(MODEL_NAME, data)
    print(response)
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [2]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [0]}
{'predictions': [1]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [0]}
{'predictions': [1]}
{'predictions': [1]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [0]}

Monitor Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means that you can keep track of your model’s performance and its predictions in a scalable manner.

Setup Kafka Consumer and Subscribe to the Topic containing the Inference Logs
config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)
Read Kafka Avro Schema From Hopsworks and setup an Avro Reader
json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)
Read Lookup Table from the Feature Store for Converting Numerical Labels to Categorical
iris_labels_lookup = fs.get_feature_group("iris_labels_lookup", 1)
iris_labels_lookup_df = iris_labels_lookup.read().toPandas()
Read 10 Messages from the Kafka Topic, parse them with the Avro Schema and print the results
for i in range(0, 10):
    msg = consumer.poll(timeout=1.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)
            prediction = json.loads(event_dict["inferenceResponse"])["predictions"][0]
            prediction_label = iris_labels_lookup_df.loc[iris_labels_lookup_df['label'] == prediction, 
                                                         'variety'].iloc[0]
            print("serving: {}, version: {}, timestamp: {},"\
                  "\nrequest: {},\nprediction:{}, prediction_label:{}, http_response_code: {},"\
                  " model_server: {}, serving_tool: {}\n".format(
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["inferenceRequest"],
                       prediction,
                       prediction_label,
                       event_dict["responseHttpCode"],
                       event_dict["modelServer"],
                       event_dict["servingTool"]
            ))
        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
serving: IrisFlowerClassifier, version: 1, timestamp: 1617320082599,
request: {"inputs": [[2.6821998564049085, 3.1347181390893635, 4.1322824869673305, 7.664782802694536]]},
prediction:0, prediction_label:Virginica, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT

serving: IrisFlowerClassifier, version: 1, timestamp: 1617320082999,
request: {"inputs": [[4.385168444555928, 4.6666622629029275, 4.221137886915623, 7.136881856956582]]},
prediction:0, prediction_label:Virginica, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT

serving: IrisFlowerClassifier, version: 1, timestamp: 1617320083197,
request: {"inputs": [[1.4671917800589505, 3.292041049413281, 2.6843134988434856, 1.7075522843514763]]},
prediction:2, prediction_label:Setosa, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT

serving: IrisFlowerClassifier, version: 1, timestamp: 1617320083368,
request: {"inputs": [[2.132378619200442, 2.7227978968761213, 3.36357273541271, 6.953955052737287]]},
prediction:0, prediction_label:Virginica, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT

serving: IrisFlowerClassifier, version: 1, timestamp: 1617320083482,
request: {"inputs": [[6.422035759974221, 1.3941244796178403, 5.795176814006259, 7.644451300594337]]},
prediction:0, prediction_label:Virginica, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT

serving: IrisFlowerClassifier, version: 1, timestamp: 1617320083600,
request: {"inputs": [[2.1564223865426646, 7.031899327566185, 2.1543742854961163, 5.763795334071594]]},
prediction:2, prediction_label:Setosa, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT

serving: IrisFlowerClassifier, version: 1, timestamp: 1617320083741,
request: {"inputs": [[1.0107102970232713, 3.5829579964245886, 1.6719853403773581, 1.8988368274073077]]},
prediction:2, prediction_label:Setosa, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT

serving: IrisFlowerClassifier, version: 1, timestamp: 1617320084025,
request: {"inputs": [[3.1602715794043754, 2.765328539315829, 4.768382860726245, 4.27978944541886]]},
prediction:0, prediction_label:Virginica, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT

serving: IrisFlowerClassifier, version: 1, timestamp: 1617320084148,
request: {"inputs": [[3.7311398289041318, 1.4845084981610497, 3.2241708173955157, 1.9971560636775902]]},
prediction:1, prediction_label:Versicolor, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT

serving: IrisFlowerClassifier, version: 1, timestamp: 1617320084240,
request: {"inputs": [[5.5664113317734625, 1.1243826526542673, 2.2816725254317394, 4.297925101811321]]},
prediction:1, prediction_label:Versicolor, http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT