Model Serving with KFServing, Transformers and Tensorflow - MNIST Classification


INPUT -> TRANSFORMER -> ENRICHED INPUT -> MODEL -> PREDICTION

This notebook requires KFServing

NOTE: It is assumed that a model called mnist is already available in Hopsworks. An example of training a model for the MNIST handwritten digit classification problem is available in Jupyter/experiment/Tensorflow/mnist.ipynb

Model Serving on Hopsworks


The hops python library

hops is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on GitHub.

Serve the MNIST classifier

Check Model Repository for best model based on accuracy


Query Model Repository for best mnist Model

import hsml

conn = hsml.connection()
mr = conn.get_model_registry()

MODEL_NAME="mnist"
EVALUATION_METRIC="accuracy"
Connected. Call `.close()` to terminate connection gracefully.
best_model = mr.get_best_model(MODEL_NAME, EVALUATION_METRIC, "max")
print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
print(best_model.training_metrics)
Model name: mnist
Model version: 1
{'accuracy': '0.71875'}

Serve the Trained Model with a Transformer

To serve a model with a transformer, write a Python script that implements the Transformer class and its preprocess and postprocess methods, like this:

class Transformer(object):
    def __init__(self):
        print("[Transformer] Initializing...")
        # Initialization code goes here

    def preprocess(self, inputs):
        # Transform the request inputs here. The object returned by this method will be used as model input.
        return inputs

    def postprocess(self, outputs):
        # Transform the predictions computed by the model before returning a response.
        return outputs
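
For illustration only, a filled-in Transformer for this MNIST model could scale raw pixel values into the [0, 1] range before they reach the model. This is a sketch, assuming preprocess receives the parsed request body with the same "instances" field used in the inference requests sent later in this notebook:

class Transformer(object):
    def __init__(self):
        print("[Transformer] Initializing...")

    def preprocess(self, inputs):
        # Assumption: inputs is the parsed request dict containing an "instances" list
        # Scale raw pixel values from [0, 255] into [0, 1]
        inputs["instances"] = [[float(pixel) / 255.0 for pixel in instance]
                               for instance in inputs["instances"]]
        return inputs

    def postprocess(self, outputs):
        # Return the model's class probabilities unchanged
        return outputs

Save the class to transformer.py so it can be referenced when creating the serving instance below.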
from hops import serving
from hops import hdfs
# Create serving instance
SERVING_NAME = MODEL_NAME
TRANSFORMER_PATH=hdfs.project_path() + "/Jupyter/serving/kfserving/tensorflow/transformer.py" # or .ipynb

response = serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                                    best_model.model_path, model_version=best_model.version, # set the path and version of the model to be deployed
                                    kfserving=True, # whether to serve the model using KFServing or the default tool in the current Hopsworks version
                                    topic_name="CREATE", # (optional) set the topic name or CREATE to create a new topic for inference logging
                                    inference_logging="ALL", # with KFServing, select the type of inference data to log into Kafka, e.g MODEL_INPUTS, PREDICTIONS or ALL
                                    transformer=TRANSFORMER_PATH, 
                                    instances=1, # with KFServing, set 0 instances to leverage scale-to-zero capabilities
                                    transformer_instances=1, # with KFServing, set 0 instances to leverage scale-to-zero capabilities
                                    )
Inferring model server from artifact files: TENSORFLOW_SERVING
Creating serving mnist for artifact /Projects/demo_ml_meb10000//Models/mnist ...
Serving mnist successfully created
# List all available servings in the project
for s in serving.get_all():
    print(s.name)
mnist
# Get serving status
serving.get_status(SERVING_NAME)
'Stopped'

Classify digits with the MNIST classifier

Start Model Serving Server

if serving.get_status(SERVING_NAME) == 'Stopped':
    serving.start(SERVING_NAME)
Starting serving with name: mnist...
Serving with name: mnist successfully started
import time
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # Let the serving startup correctly
time.sleep(10)

Check Model Serving for active servings


Send Prediction Requests to the Served Model using Hopsworks REST API

import json
import numpy as np

NUM_FEATURES=784

for i in range(10):
    data = {
                "signature_name": "serving_default", "instances": [np.random.rand(NUM_FEATURES).tolist()]
            }
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [[0.0137021886, 0.0677420273, 0.0292149764, 0.191990316, 0.127408266, 0.00461552059, 0.367597848, 0.113464594, 0.0338007659, 0.0504634716]]}
{'predictions': [[0.0189599581, 0.0472099446, 0.0424997695, 0.187853515, 0.150768891, 0.0110897673, 0.412878215, 0.0501961894, 0.0413965173, 0.0371471234]]}
{'predictions': [[0.0112110134, 0.0727639943, 0.0558238067, 0.135703281, 0.135276303, 0.00986007415, 0.41217351, 0.0851974338, 0.0288695, 0.0531211831]]}
{'predictions': [[0.0356102847, 0.0747722313, 0.0763402805, 0.135825962, 0.1375902, 0.0124450168, 0.373686343, 0.0581079796, 0.0473188423, 0.0483028106]]}
{'predictions': [[0.0185755845, 0.0853011534, 0.0644860417, 0.195167631, 0.154377118, 0.016472118, 0.288549781, 0.106394552, 0.0351197869, 0.035556335]]}
{'predictions': [[0.0233460106, 0.0540076382, 0.040135853, 0.195226252, 0.0804001316, 0.0169349462, 0.452840418, 0.0811912417, 0.0321122631, 0.0238052]]}
{'predictions': [[0.0114567261, 0.10346631, 0.0741356909, 0.206764832, 0.100969762, 0.0105429776, 0.33957383, 0.0910645, 0.039491456, 0.022533888]]}
{'predictions': [[0.0128288604, 0.0484299064, 0.0665263087, 0.144729093, 0.100146264, 0.0108449422, 0.471181333, 0.0990689173, 0.0180417895, 0.0282026958]]}
{'predictions': [[0.0161445029, 0.0610838085, 0.0999077857, 0.105673127, 0.140954942, 0.00983355, 0.444789171, 0.0534957275, 0.0368245468, 0.0312928334]]}
{'predictions': [[0.0139131062, 0.0619532727, 0.0521616861, 0.162161961, 0.118194342, 0.0180470552, 0.411713928, 0.0898847878, 0.032136444, 0.0398333706]]}
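
Each response contains one list of 10 class probabilities per instance. To turn a response into a predicted digit, take the argmax over those probabilities; the snippet below is a small example that assumes the response variable from the loop above is still in scope:

import numpy as np

# The predicted digit is the index of the highest class probability
for probabilities in response["predictions"]:
    print("Predicted digit: {}".format(int(np.argmax(probabilities))))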

Monitor Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means you can keep track of your model's performance and its predictions in a scalable manner.

from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError

Set up a Kafka consumer and subscribe to the topic containing the prediction logs

TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the Kafka Avro schema from Hopsworks and set up an Avro reader

json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read messages from the Kafka topic, parse them with the Avro schema and print the results

PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 10):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)  
            payload = json.loads(event_dict["payload"])
            
            if (event_dict['messageType'] == "request" and not PRINT_INSTANCES) or \
                (event_dict['messageType'] == "response" and not PRINT_PREDICTIONS):
                continue
            
            print("INFO -> servingId: {}, modelName: {}, modelVersion: {},"\
                  "requestTimestamp: {}, inferenceId:{}, messageType:{}".format(
                       event_dict["servingId"],
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["inferenceId"],
                       event_dict["messageType"]))

            if event_dict['messageType'] == "request":
                print("Instances -> {}\n".format(payload['instances']))
                
            if event_dict['messageType'] == "response":
                print("Predictions -> {}\n".format(payload['predictions']))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
timeout.. no more messages to read from topic
INFO -> servingId: 2049, modelName: mnist, modelVersion: 1,requestTimestamp: 1634309496, inferenceId:baf90231-c76f-456b-a31d-4a1735dbb63e, messageType:response
Predictions -> [[0.0137021886, 0.0677420273, 0.0292149764, 0.191990316, 0.127408266, 0.00461552059, 0.367597848, 0.113464594, 0.0338007659, 0.0504634716]]

INFO -> servingId: 2049, modelName: mnist, modelVersion: 1,requestTimestamp: 1634309497, inferenceId:912e660e-f8f9-4437-8e4e-296795a5d447, messageType:response
Predictions -> [[0.0189599581, 0.0472099446, 0.0424997695, 0.187853515, 0.150768891, 0.0110897673, 0.412878215, 0.0501961894, 0.0413965173, 0.0371471234]]

INFO -> servingId: 2049, modelName: mnist, modelVersion: 1,requestTimestamp: 1634309497, inferenceId:b9ecfaa8-65b9-4b03-85c0-daa4a2887811, messageType:response
Predictions -> [[0.0112110134, 0.0727639943, 0.0558238067, 0.135703281, 0.135276303, 0.00986007415, 0.41217351, 0.0851974338, 0.0288695, 0.0531211831]]

INFO -> servingId: 2049, modelName: mnist, modelVersion: 1,requestTimestamp: 1634309497, inferenceId:e0657b0f-c38d-490e-b60f-8f6054797609, messageType:response
Predictions -> [[0.0356102847, 0.0747722313, 0.0763402805, 0.135825962, 0.1375902, 0.0124450168, 0.373686343, 0.0581079796, 0.0473188423, 0.0483028106]]
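
Finally, once you have finished reading prediction logs, it is good practice to close the consumer so it leaves its consumer group and releases broker resources:

# Close the Kafka consumer
consumer.close()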