Model Serving with KFServing and Tensorflow - MNIST Classification

INPUT -> MODEL -> PREDICTION

This notebook requires KFServing to be installed.

NOTE: It is assumed that a model called mnist_e2e is already available in Hopsworks. An example of training a model for the MNIST handwritten digit classification problem is available in Jupyter/experiment/Tensorflow/mnist.ipynb.
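For reference, the sketch below shows what such a training cell could export and register; the network architecture, the single training epoch, and the metric handling are illustrative only, and the full example lives in the notebook referenced above.

import hsml
import tensorflow as tf

# Illustrative sketch only: train a small classifier on MNIST, export it in
# the SavedModel format expected by TensorFlow Serving, and register it in
# the Hopsworks Model Registry so that get_best_model() can find it later.
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(-1, 784).astype("float32") / 255.0

model = tf.keras.Sequential([
    tf.keras.layers.Dense(128, activation="relu", input_shape=(784,)),
    tf.keras.layers.Dense(10, activation="softmax"),
])
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
history = model.fit(x_train, y_train, epochs=1)

model.save("mnist_e2e/1")  # SavedModel directory, version 1

conn = hsml.connection()
mr = conn.get_model_registry()
mnist_model = mr.tensorflow.create_model(
    "mnist_e2e", metrics={"accuracy": float(history.history["accuracy"][-1])})
mnist_model.save("mnist_e2e/1")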

Model Serving on Hopsworks


The hops python library

hops is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on GitHub.
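As a quick sanity check that the library can reach your project context, you can print the current project name (hdfs.project_name() is one of its convenience calls):

from hops import hdfs

# Print the name of the Hopsworks project this notebook runs in.
print("Connected to project:", hdfs.project_name())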

Serve the MNIST classifier

Check Model Repository for best model based on accuracy


Query Model Repository for best mnist Model

import hsml

conn = hsml.connection()
mr = conn.get_model_registry()

MODEL_NAME="mnist_e2e"
EVALUATION_METRIC="accuracy"
Connected. Call `.close()` to terminate connection gracefully.
best_model = mr.get_best_model(MODEL_NAME, EVALUATION_METRIC, "max")
print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
print(best_model.training_metrics)
Model name: mnist_e2e
Model version: 1
{'accuracy': '0.625'}

Create Model Serving Instance for the Exported Model

from hops import serving
# Create serving instance
SERVING_NAME = "mniste2ekf"

response = serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                                    model_path=best_model.model_path, # set the path of the model to be deployed
                                    model_server="TENSORFLOW_SERVING", # set the model server to run the model
                                    kfserving=True, # whether to serve the model using KFServing (True) or the default serving tool in the current Hopsworks version (False)
                                    # optional arguments
                                    model_version=best_model.version, # set the version of the model to be deployed
                                    topic_name="CREATE", # (optional) set the topic name or CREATE to create a new topic for inference logging
                                    inference_logging="ALL", # with KFServing, select the type of inference data to log into Kafka, e.g. MODEL_INPUTS, PREDICTIONS or ALL
                                    instances=1, # with KFServing, set 0 instances to leverage scale-to-zero capabilities
                                    )
2022-01-28 11:52:48,685 INFO: Serving mniste2ekf successfully created
# List all available servings in the project
for s in serving.get_all():
    print(s.name)
mniste2ekf
# Get serving status
serving.get_status(SERVING_NAME)
'Stopped'

Classify digits with the MNIST classifier

Start Model Serving Server

if serving.get_status(SERVING_NAME) == 'Stopped':
    serving.start(SERVING_NAME)
2022-01-28 11:52:49,378 INFO: Serving with name: mniste2ekf successfully started
import time
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # Let the serving startup correctly
time.sleep(10)
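The bare loop above blocks forever if the deployment never reaches the Running state. The variant below adds an explicit timeout; wait_until_running is a hypothetical helper written for this notebook, not part of the hops API.

import time

# Hypothetical helper: poll the serving status with an upper bound instead
# of looping indefinitely, and fail loudly if startup stalls.
def wait_until_running(name, timeout=300, interval=5):
    deadline = time.time() + timeout
    status = serving.get_status(name)
    while status != "Running":
        if time.time() > deadline:
            raise TimeoutError("Serving '{}' not Running after {}s "
                               "(last status: '{}')".format(name, timeout, status))
        time.sleep(interval)
        status = serving.get_status(name)
    return status

wait_until_running(SERVING_NAME)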

Check Model Serving for active servings


Send Prediction Requests to the Served Model using the Hopsworks REST API

import json
import numpy as np

NUM_FEATURES=784

for i in range(10):
    data = {
        "signature_name": "serving_default",
        "instances": [np.random.rand(NUM_FEATURES).tolist()]
    }
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [[0.00228863931, 0.039052885, 0.299541056, 0.0292527322, 0.210720837, 0.00816287566, 0.0126433428, 0.0109003522, 0.344060391, 0.0433768257]]}
{'predictions': [[0.00586040691, 0.0307841301, 0.33769837, 0.0337129459, 0.254304558, 0.0237869881, 0.0141318291, 0.0193788894, 0.238279134, 0.0420627445]]}
{'predictions': [[0.00538377091, 0.061361514, 0.238303721, 0.0285069272, 0.501042247, 0.0104929302, 0.0204542708, 0.0179179348, 0.0851745605, 0.0313620083]]}
{'predictions': [[0.00573793799, 0.029261997, 0.369869381, 0.0178442206, 0.450838357, 0.00611973461, 0.00844837539, 0.0112320511, 0.079323791, 0.0213241577]]}
{'predictions': [[0.0119106388, 0.0629702583, 0.153047368, 0.0336705893, 0.3950378, 0.00748524442, 0.0267426614, 0.0312577672, 0.249878481, 0.0279991794]]}
{'predictions': [[0.0111142406, 0.0491779149, 0.204911619, 0.0337088816, 0.241256326, 0.0222033393, 0.0126153091, 0.0414450057, 0.302034378, 0.0815329328]]}
{'predictions': [[0.00563503848, 0.0279151835, 0.28025192, 0.0385764167, 0.363024831, 0.0126688043, 0.0207744408, 0.0177768413, 0.193134412, 0.0402421467]]}
{'predictions': [[0.00543945516, 0.126275226, 0.163412824, 0.04192359, 0.265003413, 0.0151998475, 0.0343197025, 0.0221705418, 0.241572261, 0.0846831948]]}
{'predictions': [[0.00477044843, 0.0468725786, 0.208328143, 0.0335769281, 0.347728401, 0.0162629858, 0.0180092286, 0.01045975, 0.283724517, 0.0302670337]]}
{'predictions': [[0.00746631622, 0.0523283072, 0.304017246, 0.027453661, 0.280292451, 0.0179669447, 0.0197735652, 0.0237298273, 0.174194977, 0.0927767605]]}
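Each response carries one probability per digit class (0-9), so the predicted digit is the index of the largest probability. For example, for the last request sent above:

# The model outputs a softmax over the ten digit classes; argmax gives the
# predicted digit for the most recent response.
probs = np.array(response["predictions"][0])
print("Predicted digit: {} (probability {:.3f})".format(
    int(np.argmax(probs)), float(np.max(probs))))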

Monitor Prediction Requests and Responses using Kafka

from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError

Set up a Kafka consumer and subscribe to the topic containing the prediction logs

TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the Kafka Avro schema from Hopsworks and set up an Avro reader

json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read messages from the Kafka topic, parse them with the Avro schema and print the results

PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 10):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)  
            payload = json.loads(event_dict["payload"])
            
            if (event_dict['messageType'] == "request" and not PRINT_INSTANCES) or \
                (event_dict['messageType'] == "response" and not PRINT_PREDICTIONS):
                continue
            
            print("INFO -> servingId: {}, modelName: {}, modelVersion: {},"\
                  "requestTimestamp: {}, inferenceId:{}, messageType:{}".format(
                       event_dict["servingId"],
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["inferenceId"],
                       event_dict["messageType"]))

            if event_dict['messageType'] == "request":
                print("Instances -> {}\n".format(payload['instances']))
                
            if event_dict['messageType'] == "response":
                print("Predictions -> {}\n".format(payload['predictions']))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
INFO -> servingId: 33, modelName: mnist_e2e, modelVersion: 1, requestTimestamp: 1643370799, inferenceId: 8fe0ab82-a9fd-424e-b154-b88eaeb82312, messageType: response
Predictions -> [[0.00531423138, 0.0495751873, 0.215446427, 0.0254741, 0.456469536, 0.00679346081, 0.0227076765, 0.0156853292, 0.151415035, 0.0511190034]]

INFO -> servingId: 33, modelName: mnist_e2e, modelVersion: 1, requestTimestamp: 1643370799, inferenceId: 2eb8f1b7-5fe5-4a15-921e-1077b3bc5548, messageType: response
Predictions -> [[0.015544543, 0.04791224, 0.231773868, 0.0455554053, 0.352704018, 0.0196769126, 0.023360543, 0.0201557297, 0.190379784, 0.0529369563]]

INFO -> servingId: 33, modelName: mnist_e2e, modelVersion: 1, requestTimestamp: 1643370799, inferenceId: 2a85e0ed-70a4-43b3-8afd-43fdb5bb888c, messageType: response
Predictions -> [[0.0119756339, 0.108432077, 0.249693975, 0.0291424617, 0.297212154, 0.0278416723, 0.0237392467, 0.0305440109, 0.182935417, 0.038483344]]

INFO -> servingId: 33, modelName: mnist_e2e, modelVersion: 1, requestTimestamp: 1643370799, inferenceId: 92f62e3a-f05e-442f-91c2-7f8de7d80de9, messageType: response
Predictions -> [[0.0061565917, 0.0573133416, 0.157412216, 0.0603140071, 0.319210559, 0.01605873, 0.018614579, 0.0371795483, 0.204262689, 0.123477831]]

INFO -> servingId: 33, modelName: mnist_e2e, modelVersion: 1, requestTimestamp: 1643370800, inferenceId: 4c50013c-834d-4704-9808-6a6cb25dbb95, messageType: response
Predictions -> [[0.011226072, 0.0487205796, 0.279118955, 0.0241690688, 0.347856402, 0.0182868, 0.0276911054, 0.0257906299, 0.145419195, 0.0717211664]]
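When you are done experimenting, release the resources held by this notebook. The sketch below assumes serving.stop is available alongside serving.start in your version of the hops library.

# Clean up: close the Kafka consumer, stop the serving instance and close
# the model registry connection.
consumer.close()
serving.stop(SERVING_NAME)
conn.close()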