Model Serving with KFServing, Scikit-learn and Predictors - Iris Flower Classification


INPUT --> PREDICTOR (MODEL) --> PREDICTION

This notebook requires KFServing to be installed

NOTE: It is assumed that a model called irisflowerclassifier is already available in Hopsworks. An example of training a model for the Iris flower classification problem is available in Jupyter/end_to_end_pipelines/sklearn/end_to_end_sklearn.ipynb

Model Serving on Hopsworks


The hops python library

hops is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on github.

Using the hdfs module

The hdfs module provides a method to get the path in HopsFS where your data is stored, namely by calling hdfs.project_path(). The path resolves to the root path of your project, which is the view that you see when you click Data Sets in Hopsworks. To point to where your actual data resides in the project, you need to append the full path from there to your dataset. For example, if you create a mnist folder in your Resources dataset, the path to the mnist data would be hdfs.project_path() + 'Resources/mnist'.
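
For instance, resolving the full path to such a folder could look like this (a minimal sketch; the mnist folder is hypothetical and only used for illustration):

from hops import hdfs

# Resolve the HopsFS path to a hypothetical 'mnist' folder inside the Resources dataset
mnist_path = hdfs.project_path() + "Resources/mnist"
print(mnist_path)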

# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project
from hops import hdfs
project_path = hdfs.project_path()
from hops import hdfs
# Uploading the trained model to HDFS
hdfs.copy_to_hdfs("iris_flower_knn.pkl", "Resources/iris_flower", overwrite=True)

# Downloading the iris flower model to the current working directory
iris_flower_model_hdfs_path = hdfs.project_path() + "Resources/iris_flower/iris_flower_knn.pkl"
local_iris_flower_model_path = hdfs.copy_to_local(iris_flower_model_hdfs_path)

Serve the Iris Flower classifier

Predictor script

To serve a Scikit-learn model, write a predictor script in Python that implements the Predict class: in the constructor, download the model from HDFS and save it as a class variable; then implement the predict, classify and regress methods, like this:

from sklearn.externals import joblib
from hops import hdfs
import os

class Predict(object):

    def __init__(self):
        """ Initializes the serving state, reads a trained model from HDFS"""
        self.model_path = "Models/irisflowerclassifier/1/iris_flower_knn.pkl"
        print("Copying SKLearn model from HDFS to local directory")
        hdfs.copy_to_local(self.model_path)
        print("Reading local SkLearn model for serving")
        self.model = joblib.load("./iris_flower_knn.pkl")
        print("Initialization Complete")


    def predict(self, inputs):
        """ Serves a prediction request usign a trained model"""
        return self.model.predict(inputs).tolist() # Numpy Arrays are note JSON serializable

    def classify(self, inputs):
        """ Serves a classification request using a trained model"""
        return "not implemented"

    def regress(self, inputs):
        """ Serves a regression request using a trained model"""
        return "not implemented"

Then, if you are using Jupyter, keep track of the path where you created the predictor script. Otherwise, upload the script to some folder in your project using the UI or the hops Python library, as in the sketch below.
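
For example, uploading a local predictor.py with the hops library could look like this (a minimal sketch; the target folder is only an illustration):

from hops import hdfs

# Upload the local predictor script to a folder inside the project (illustrative target path)
hdfs.copy_to_hdfs("predictor.py", "Jupyter/serving/kfserving/python", overwrite=True)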

Query Model Registry for best Iris Flower Classifier Model

import hsml

conn = hsml.connection()
mr = conn.get_model_registry()
Connected. Call `.close()` to terminate connection gracefully.
MODEL_NAME = "irisflowerclassifier"
EVALUATION_METRIC="accuracy"

best_model = mr.get_best_model(MODEL_NAME, EVALUATION_METRIC, "max")

print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
print(best_model.training_metrics)
Model name: irisflowerclassifier
Model version: 1
{'accuracy': '0.98'}

Create Model Serving of Exported Model

Once all the files are in the file system, we can create a serving instance that points to the model files using serving.create_or_update()

from hops import serving
from hops import hdfs
# Create serving
SERVING_NAME = MODEL_NAME

SCRIPT_PATH = hdfs.project_path() + "Jupyter/serving/kfserving/python/predictor.py" # or .ipynb

serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                         model_path=best_model.model_path, # set the path of the model to be deployed
                         model_server="PYTHON", # set the model server to run the model
                         kfserving=True, # enable KFServing
                         predictor=SCRIPT_PATH, # set the predictor to load the model and make predictions
                         # optional arguments
                         model_version=best_model.version, # set the version of the model to be deployed
                         topic_name="CREATE", # topic name or CREATE to create a new topic for inference logging, otherwise NONE
                         instances=1 # number of replicas
                         )
2022-01-17 15:53:39,107 INFO: Serving irisflowerclassifier successfully created

Once the serving instance is created, it will be shown in the “Model Serving” tab in the Hopsworks UI, where you can view detailed information such as the server logs and which Kafka topic it is logging inference requests to.

[Screenshot: kfserving_python_predictor_details]

You can also use the Python module to query the Hopsworks REST API about information on the existing servings using methods like:

  • get_all()
  • get_id(serving_name)
  • get_model_path(serving_name)
  • get_model_version(serving_name)
  • get_artifact_version(serving_name)
  • get_kafka_topic(serving_name)
  • ...
print("Info: \tid: {},\n \
       model_path: {},\n \
       model_version: {},\n \
       artifact_version: {},\n \
       predictor: {},\n \
       model_server: {},\n \
       serving_tool: {}".format(
    serving.get_id(SERVING_NAME),
    serving.get_model_path(SERVING_NAME),
    serving.get_model_version(SERVING_NAME),
    serving.get_artifact_version(SERVING_NAME),
    serving.get_predictor(SERVING_NAME),
    serving.get_model_server(SERVING_NAME),
    serving.get_serving_tool(SERVING_NAME)))
Info:   id: 1056,
        model_path: /Projects/demo_ml_meb10000/Models/irisflowerclassifier,
        model_version: 1,
        artifact_version: 3,
        predictor: predictor.py,
        model_server: PYTHON,
        serving_tool: KFSERVING
for s in serving.get_all():
    print(s.name)
irisflowerclassifier

Classify flowers with the Iris Flower classifier

Start Model Serving Server

You can start and stop the serving instance either from the Hopsworks UI or from the Python/REST API, as demonstrated below.

Shut down currently running serving

import time
if serving.get_status(SERVING_NAME) == "Running":
    serving.stop(SERVING_NAME)
time.sleep(10)

Start new serving

serving.start(SERVING_NAME)
2022-01-12 13:33:42,454 INFO: Serving with name: irisflowerclassifier successfully started

Wait until serving is up and running

while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # Let the serving startup correctly
time.sleep(15)

Check the Server Logs

You can access the server logs using Kibana by clicking on the ‘Show logs’ button in the action bar, and filter them using fields such as serving component (i.e., predictor or transformer) or container name among other things.

[Screenshot: kfserving_python_predictor_logs]

Send Prediction Requests to the Served Model using Hopsworks REST API

To make inference requests, you can use the utility method serving.make_inference_request.

import json
import random

NUM_FEATURES = 4

for i in range(20):
    data = {"inputs" : [[random.uniform(1, 8) for i in range(NUM_FEATURES)]]}
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [2]}
{'predictions': [0]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [0]}
{'predictions': [0]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [0]}
{'predictions': [1]}
{'predictions': [2]}

Monitor Prediction Logs

Consume Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means that you can keep track of your model’s performance and its predictions in a scalable manner.

from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError

Setup a Kafka consumer and subscribe to the topic containing the prediction logs

TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the Kafka Avro schema from Hopsworks and setup an Avro reader

json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read messages from the Kafka topic, parse them with the Avro schema and print the results

PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 10):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)  
            payload = json.loads(event_dict["payload"])
            
            if (event_dict['messageType'] == "request" and not PRINT_INSTANCES) or \
                (event_dict['messageType'] == "response" and not PRINT_PREDICTIONS):
                continue
            
            print("INFO -> servingId: {}, modelName: {}, modelVersion: {},"\
                  "requestTimestamp: {}, inferenceId:{}, messageType:{}".format(
                       event_dict["servingId"],
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["inferenceId"],
                       event_dict["messageType"]))

            if event_dict['messageType'] == "request":
                print("Instances -> {}\n".format(payload['instances']))
                
            if event_dict['messageType'] == "response":
                print("Predictions -> {}\n".format(payload['predictions']))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
INFO -> servingId: 2, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1641994517, inferenceId:5ebd3ced-8b6a-44ea-aafc-c5d6858e2c28, messageType:response
Predictions -> [2]

INFO -> servingId: 2, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1641994517, inferenceId:7aaba95c-77fd-418d-9963-0d2592f3cd7e, messageType:response
Predictions -> [0]

INFO -> servingId: 2, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1641994517, inferenceId:948f0f67-a5b5-4775-8b8a-ad8f265c294b, messageType:response
Predictions -> [1]

INFO -> servingId: 2, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1641994517, inferenceId:d8c1ab25-68a3-4d63-9721-7abb44432ab5, messageType:response
Predictions -> [2]

INFO -> servingId: 2, modelName: irisflowerclassifier, modelVersion: 1,requestTimestamp: 1641994518, inferenceId:baef3bae-aa11-4dfe-a752-997a94d9d07f, messageType:response
Predictions -> [2]