Model Serving with KFServing and Tensorflow - MNIST Classification

INPUT --> MODEL --> PREDICTION

This notebook requires KFServing

NOTE: It is assumed that a model called mnist is already available in Hopsworks. An example of training a model for the MNIST handwritten digit classification problem is available in Jupyter/experiment/Tensorflow/mnist.ipynb.

Model Serving on Hopsworks

hops.png

The hops python library

hops is a helper library for Hopsworks that simplifies development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on GitHub.

Serve the MNIST classifier

Check Model Repository for best model based on accuracy

Image7-Monitor.png

Query Model Repository for best mnist Model

from hops import model
from hops.model import Metric
MODEL_NAME="mnist"
EVALUATION_METRIC="accuracy"
best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)
print('Model name: ' + best_model['name'])
print('Model version: ' + str(best_model['version']))
print(best_model['metrics'])
Model name: mnist
Model version: 1
{'accuracy': '0.75'}

Create Model Serving of Exported Model

from hops import serving
# Create serving instance
SERVING_NAME = MODEL_NAME
MODEL_PATH="/Models/" + best_model['name']

response = serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                                    MODEL_PATH, model_version=best_model['version'], # set the path and version of the model to be deployed
                                    kfserving=True, # whether to serve the model using KFServing or the default serving tool in the current Hopsworks version
                                    topic_name="CREATE", # (optional) set the topic name, or CREATE to create a new topic for inference logging
                                    inference_logging="ALL", # with KFServing, select the type of inference data to log into Kafka, e.g. MODEL_INPUTS, PREDICTIONS or ALL
                                    instances=1, # with KFServing, set 0 instances to leverage scale-to-zero capabilities
                                    )
Inferring model server from artifact files: TENSORFLOW_SERVING
Creating serving mnist for artifact /Projects/demo_ml_meb10000//Models/mnist ...
Serving mnist successfully created
# List all available servings in the project
for s in serving.get_all():
    print(s.name)
mnist
# Get serving status
serving.get_status(SERVING_NAME)
'Stopped'

Classify digits with the MNIST classifier

Start Model Serving Server

if serving.get_status(SERVING_NAME) == 'Stopped':
    serving.start(SERVING_NAME)
Starting serving with name: mnist...
Serving with name: mnist successfully started
import time
# Wait for the serving to reach the 'Running' state
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5)
time.sleep(5) # Wait a few extra seconds for the model server to be ready for requests
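Note that a polling loop like the one above can hang indefinitely if the serving never transitions to Running. A minimal sketch of a bounded wait, where the get_status callable is a stand-in for serving.get_status (names and defaults here are illustrative, not part of the hops API):

```python
import time

def wait_until_running(get_status, timeout=300, interval=5):
    """Poll get_status() until it returns 'Running' or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_status() == "Running":
            return True
        time.sleep(interval)
    return False

# Example with a fake status source that becomes 'Running' on the third poll
states = iter(["Stopped", "Starting", "Running"])
assert wait_until_running(lambda: next(states), timeout=30, interval=0) is True
```

In the notebook, `wait_until_running(lambda: serving.get_status(SERVING_NAME))` would replace the unbounded loop, letting you fail fast instead of blocking the kernel.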

Check Model Serving for active servings

Image7-Monitor.png

Send Prediction Requests to the Served Model using Hopsworks REST API

import json
import numpy as np

NUM_FEATURES=784

for i in range(10):
    # Random 784-dimensional vector as a stand-in for a flattened 28x28 MNIST image
    data = {
                "signature_name": "serving_default", "instances": [np.random.rand(NUM_FEATURES).tolist()]
            }
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [[0.0397434309, 0.0627049282, 0.0516082384, 0.0257312227, 0.360617578, 0.12559171, 0.102420703, 0.0631940439, 0.123033032, 0.0453550518]]}
{'predictions': [[0.0603190921, 0.0537713878, 0.0616007112, 0.0458610356, 0.321380526, 0.200304031, 0.0421636887, 0.0574198663, 0.0928280652, 0.0643516]]}
{'predictions': [[0.0548233502, 0.047754433, 0.0511284024, 0.0422567539, 0.324842304, 0.185612082, 0.060034167, 0.068407923, 0.103621393, 0.0615192093]]}
{'predictions': [[0.0422211885, 0.0751092881, 0.0469165482, 0.038731005, 0.385032415, 0.176647171, 0.0407903641, 0.0397685617, 0.0769609958, 0.0778225]]}
{'predictions': [[0.0613800436, 0.0483597815, 0.073576726, 0.0258731693, 0.3094998, 0.094138369, 0.0458293408, 0.088566117, 0.107787244, 0.144989386]]}
{'predictions': [[0.072254248, 0.0885431543, 0.0433586352, 0.0398325, 0.265434831, 0.181497604, 0.0689154416, 0.0554167852, 0.0857768059, 0.0989700779]]}
{'predictions': [[0.0763806552, 0.0438941717, 0.0335087031, 0.0202852022, 0.34188053, 0.127577499, 0.103079036, 0.0814603046, 0.116382934, 0.055550985]]}
{'predictions': [[0.0522790626, 0.104057834, 0.0571820699, 0.0429950953, 0.254568487, 0.137584299, 0.0513742864, 0.0879343525, 0.0830356777, 0.128988877]]}
{'predictions': [[0.0439827815, 0.0486454293, 0.0546759665, 0.0406821, 0.296878666, 0.22346729, 0.0523542352, 0.100808315, 0.0766454265, 0.0618598461]]}
{'predictions': [[0.0300273038, 0.0879570097, 0.0512411222, 0.0531735569, 0.256210625, 0.16444467, 0.0763468072, 0.0823352486, 0.0803362, 0.117927447]]}
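Each response holds a softmax distribution over the ten digit classes, one probability vector per input instance; the predicted digit is the index of the highest probability. A minimal sketch of decoding a response (the hard-coded response dict below mirrors the shape of the outputs above):

```python
# Pick the predicted digit from a TensorFlow Serving-style response.
# The "predictions" field holds one probability vector per input instance.
response = {'predictions': [[0.0397, 0.0627, 0.0516, 0.0257, 0.3606,
                             0.1256, 0.1024, 0.0632, 0.1230, 0.0454]]}

for probs in response['predictions']:
    digit = max(range(len(probs)), key=lambda i: probs[i])  # argmax over the 10 classes
    print(f"predicted digit: {digit} (confidence {probs[digit]:.2f})")
    # → predicted digit: 4 (confidence 0.36)
```

Since the inputs sent above are random noise rather than real digit images, the distributions are flat and the confidences are low; with real MNIST images the winning class typically dominates.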

Monitor Prediction Requests and Responses using Kafka

from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError

Setup Kafka consumer and subscribe to the topic containing the prediction logs

TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the Kafka Avro schema from Hopsworks and setup an Avro reader

json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read messages from the Kafka topic, parse them with the Avro schema and print the results

PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 10):
    msg = consumer.poll(timeout=1)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)  
            payload = json.loads(event_dict["payload"])
            
            if (event_dict['messageType'] == "request" and not PRINT_INSTANCES) or \
                (event_dict['messageType'] == "response" and not PRINT_PREDICTIONS):
                continue
            
            print("INFO -> servingId: {}, modelName: {}, modelVersion: {},"\
                  "requestTimestamp: {}, inferenceId:{}, messageType:{}".format(
                       event_dict["servingId"],
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["inferenceId"],
                       event_dict["messageType"]))

            if event_dict['messageType'] == "request":
                print("Instances -> {}\n".format(payload['instances']))
                
            if event_dict['messageType'] == "response":
                print("Predictions -> {}\n".format(payload['predictions']))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
INFO -> servingId: 17, modelName: mnist, modelVersion: 1,requestTimestamp: 1623765942, inferenceId:a4014a8e-d9c7-4fa4-9acb-78f2f8e3de43, messageType:response
Predictions -> [[0.0613800436, 0.0483597815, 0.073576726, 0.0258731693, 0.3094998, 0.094138369, 0.0458293408, 0.088566117, 0.107787244, 0.144989386]]

INFO -> servingId: 17, modelName: mnist, modelVersion: 1,requestTimestamp: 1623765942, inferenceId:080e11c7-bf8c-4034-adf5-acc2598ea606, messageType:response
Predictions -> [[0.072254248, 0.0885431543, 0.0433586352, 0.0398325, 0.265434831, 0.181497604, 0.0689154416, 0.0554167852, 0.0857768059, 0.0989700779]]

INFO -> servingId: 17, modelName: mnist, modelVersion: 1,requestTimestamp: 1623765942, inferenceId:0782a4ea-489f-4fad-b0a1-0b67484d4bba, messageType:response
Predictions -> [[0.0763806552, 0.0438941717, 0.0335087031, 0.0202852022, 0.34188053, 0.127577499, 0.103079036, 0.0814603046, 0.116382934, 0.055550985]]

INFO -> servingId: 17, modelName: mnist, modelVersion: 1,requestTimestamp: 1623765943, inferenceId:a32e540f-ffca-4cd9-99eb-3f6bba88260a, messageType:response
Predictions -> [[0.0522790626, 0.104057834, 0.0571820699, 0.0429950953, 0.254568487, 0.137584299, 0.0513742864, 0.0879343525, 0.0830356777, 0.128988877]]

INFO -> servingId: 17, modelName: mnist, modelVersion: 1,requestTimestamp: 1623765943, inferenceId:99698ce0-c8ef-40c6-88fd-25cd9da72050, messageType:response
Predictions -> [[0.0439827815, 0.0486454293, 0.0546759665, 0.0406821, 0.296878666, 0.22346729, 0.0523542352, 0.100808315, 0.0766454265, 0.0618598461]]