Model Serving with KFServing and Tensorflow - MNIST Classification


INPUT --> MODEL --> PREDICTION

This notebook requires KFServing

NOTE: It is assumed that a model called mnist is already available in Hopsworks. An example of training a model for the MNIST handwritten digit classification problem is available in Jupyter/experiment/Tensorflow/mnist.ipynb

Model Serving on Hopsworks

The hops python library

hops is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.
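
For a quick orientation, here is a minimal sketch of the hops helper modules used in this notebook; the imports are part of the hops library, and hdfs.project_path() is one of its path helpers.

from hops import hdfs     # path helpers for the project's file system
from hops import serving  # create, start, stop and query model servings
from hops import kafka    # Kafka configuration and schema helpers for inference logging

# Print the root path of the current Hopsworks project
print(hdfs.project_path())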

Have a feature request or encountered an issue? Please let us know on GitHub.

Serve the MNIST classifier

Check Model Repository for best model based on accuracy

(Screenshot: the Model Repository UI in Hopsworks)

Query Model Repository for best mnist Model

import hsml

conn = hsml.connection()
mr = conn.get_model_registry()
Connected. Call `.close()` to terminate connection gracefully.

MODEL_NAME="mnist"
EVALUATION_METRIC="accuracy"
best_model = mr.get_best_model(MODEL_NAME, EVALUATION_METRIC, "max")
print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
print(best_model.training_metrics)
Model name: mnist
Model version: 1
{'accuracy': '0.71875'}

Create Model Serving of Exported Model

from hops import serving
# Create serving instance
SERVING_NAME = MODEL_NAME

response = serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                                    best_model.model_path, model_version=best_model.version, # set the path and version of the model to be deployed
                                    kfserving=True, # whether to serve the model using KFServing or the default tool of the current Hopsworks version
                                    topic_name="CREATE", # (optional) name of the Kafka topic for inference logging, or CREATE to create a new topic
                                    inference_logging="ALL", # with KFServing, the type of inference data to log into Kafka, e.g. MODEL_INPUTS, PREDICTIONS or ALL
                                    instances=1, # with KFServing, set to 0 instances to leverage scale-to-zero capabilities
                                    )
Inferring model server from artifact files: TENSORFLOW_SERVING
Creating serving mnist for artifact /Projects/demo_ml_meb10000//Models/mnist ...
Serving mnist successfully created
# List all available servings in the project
for s in serving.get_all():
    print(s.name)
mnist
# Get serving status
serving.get_status(SERVING_NAME)
'Stopped'

Classify digits with the MNIST classifier

Start Model Serving Server

if serving.get_status(SERVING_NAME) == 'Stopped':
    serving.start(SERVING_NAME)
Starting serving with name: mnist...
Serving with name: mnist successfully started
import time
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # Let the serving startup correctly
time.sleep(10)
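
Note that the loop above blocks indefinitely if the serving never reaches the Running state. A minimal sketch of a bounded wait, where MAX_WAIT_SECONDS is an illustrative constant and not part of the hops API:

MAX_WAIT_SECONDS = 300  # illustrative upper bound on the wait
waited = 0
while serving.get_status(SERVING_NAME) != "Running" and waited < MAX_WAIT_SECONDS:
    time.sleep(5)
    waited += 5
if serving.get_status(SERVING_NAME) != "Running":
    raise TimeoutError("Serving did not reach the Running state within the time limit")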

Check Model Serving for active servings

(Screenshot: the Model Serving UI in Hopsworks showing the active serving)

Send Prediction Requests to the Served Model using Hopsworks REST API

import json
import numpy as np

NUM_FEATURES=784

for i in range(10):
    data = {
                "signature_name": "serving_default", "instances": [np.random.rand(NUM_FEATURES).tolist()]
            }
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [[0.0218618922, 0.060441982, 0.0637693629, 0.0953115523, 0.268337429, 0.011644518, 0.320831656, 0.0623592362, 0.0468038693, 0.048638504]]}
{'predictions': [[0.0128081683, 0.058361087, 0.0519089513, 0.0680933073, 0.101529188, 0.00917466916, 0.49870196, 0.124165297, 0.0234098844, 0.0518474728]]}
{'predictions': [[0.0169027131, 0.0688994601, 0.0502710305, 0.127270445, 0.0928574502, 0.010248892, 0.482126236, 0.0942078382, 0.0259690043, 0.0312469937]]}
{'predictions': [[0.0257082824, 0.040854983, 0.0316030905, 0.203676596, 0.138357729, 0.00987543911, 0.397324562, 0.064534761, 0.0407368727, 0.0473276041]]}
{'predictions': [[0.0209536515, 0.075448446, 0.0355816707, 0.167488784, 0.0865107551, 0.013768537, 0.401376575, 0.130470008, 0.0300768502, 0.0383248404]]}
{'predictions': [[0.02752956, 0.0587611571, 0.0508604646, 0.154312283, 0.0760035962, 0.0129964901, 0.458516, 0.092619285, 0.0247991625, 0.0436020344]]}
{'predictions': [[0.0190246552, 0.115838125, 0.03548453, 0.138723165, 0.12200997, 0.012828338, 0.387132406, 0.109228708, 0.0272692535, 0.032460954]]}
{'predictions': [[0.0295567568, 0.0489571691, 0.0415432379, 0.21964623, 0.107769594, 0.0110459328, 0.347042203, 0.142159656, 0.0240610894, 0.0282180831]]}
{'predictions': [[0.0125460578, 0.0528946444, 0.0296274871, 0.183094755, 0.134415448, 0.0141807096, 0.39737317, 0.0879673287, 0.0296478942, 0.0582524166]]}
{'predictions': [[0.0413516127, 0.0641062856, 0.0435722917, 0.159566402, 0.139954373, 0.0291613769, 0.276411712, 0.115447871, 0.0559384301, 0.0744897127]]}
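
Each prediction is a probability distribution over the ten digit classes. Assuming the response format shown above, a minimal sketch of mapping the last response to a predicted digit:

# The index of the highest probability is the predicted class (digit 0-9)
predicted_digit = int(np.argmax(response['predictions'][0]))
print("Predicted digit: {}".format(predicted_digit))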

Monitor Prediction Requests and Responses using Kafka

from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError

Set up a Kafka consumer and subscribe to the topic containing the prediction logs

TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the Kafka Avro schema from Hopsworks and set up an Avro reader

json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read messages from the Kafka topic, parse them with the Avro schema and print the results

PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 10):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)  
            payload = json.loads(event_dict["payload"])
            
            if (event_dict['messageType'] == "request" and not PRINT_INSTANCES) or \
                (event_dict['messageType'] == "response" and not PRINT_PREDICTIONS):
                continue
            
            print("INFO -> servingId: {}, modelName: {}, modelVersion: {},"\
                  "requestTimestamp: {}, inferenceId:{}, messageType:{}".format(
                       event_dict["servingId"],
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["inferenceId"],
                       event_dict["messageType"]))

            if event_dict['messageType'] == "request":
                print("Instances -> {}\n".format(payload['instances']))
                
            if event_dict['messageType'] == "response":
                print("Predictions -> {}\n".format(payload['predictions']))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
INFO -> servingId: 1036, modelName: mnist, modelVersion: 1,requestTimestamp: 1634307936, inferenceId:d761be39-4f35-462f-8da9-bb1359adc2bc, messageType:response
Predictions -> [[0.0128081683, 0.058361087, 0.0519089513, 0.0680933073, 0.101529188, 0.00917466916, 0.49870196, 0.124165297, 0.0234098844, 0.0518474728]]

INFO -> servingId: 1036, modelName: mnist, modelVersion: 1,requestTimestamp: 1634307937, inferenceId:dd8c8cb7-b6ec-4142-a108-a4c8058b7584, messageType:response
Predictions -> [[0.0169027131, 0.0688994601, 0.0502710305, 0.127270445, 0.0928574502, 0.010248892, 0.482126236, 0.0942078382, 0.0259690043, 0.0312469937]]

INFO -> servingId: 1036, modelName: mnist, modelVersion: 1,requestTimestamp: 1634307937, inferenceId:3b98b0e2-4b98-4528-b162-d8a2f0dd97f1, messageType:response
Predictions -> [[0.0257082824, 0.040854983, 0.0316030905, 0.203676596, 0.138357729, 0.00987543911, 0.397324562, 0.064534761, 0.0407368727, 0.0473276041]]

INFO -> servingId: 1036, modelName: mnist, modelVersion: 1,requestTimestamp: 1634307937, inferenceId:cd6ac413-0062-4668-9f9d-63daa08948e2, messageType:response
Predictions -> [[0.0209536515, 0.075448446, 0.0355816707, 0.167488784, 0.0865107551, 0.013768537, 0.401376575, 0.130470008, 0.0300768502, 0.0383248404]]

INFO -> servingId: 1036, modelName: mnist, modelVersion: 1,requestTimestamp: 1634307937, inferenceId:f113fb80-16df-471e-b149-9ade8c9cfc53, messageType:response
Predictions -> [[0.02752956, 0.0587611571, 0.0508604646, 0.154312283, 0.0760035962, 0.0129964901, 0.458516, 0.092619285, 0.0247991625, 0.0436020344]]
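
When you are done experimenting, the Kafka consumer and the serving instance can be shut down again; a minimal sketch using the same APIs as above:

# Release the Kafka consumer and stop the serving instance
consumer.close()
if serving.get_status(SERVING_NAME) == 'Running':
    serving.stop(SERVING_NAME)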