Model Serving with KFServing, Tensorflow and Transformers - MNIST Classification

INPUT -> TRANSFORMER -> ENRICHED INPUT -> MODEL -> PREDICTION

This notebook requires KFServing to be installed

NOTE: It is assumed that a model called mnist_e2e is already available in Hopsworks. An example of training a model for the MNIST handwritten digit classification problem is available in Jupyter/experiment/Tensorflow/mnist.ipynb

Model Serving on Hopsworks


The hops python library

hops is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on github.

Serve the MNIST classifier

Check Model Repository for best model based on accuracy


Query Model Repository for best mnist Model

import hsml

conn = hsml.connection()
mr = conn.get_model_registry()

MODEL_NAME="mnist_e2e"
EVALUATION_METRIC="accuracy"
Connected. Call `.close()` to terminate connection gracefully.
best_model = mr.get_best_model(MODEL_NAME, EVALUATION_METRIC, "max")
print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
print(best_model.training_metrics)
Model name: mnist_e2e
Model version: 1
{'accuracy': '0.625'}
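
(Optional) If you want to inspect the artifacts of the best model before deploying it, the registry entry can be downloaded locally. A minimal sketch, assuming hsml's Model.download() method is available in your version:

# Download the model artifacts from the registry for local inspection
# (Model.download() is assumed here; it returns the local download path)
local_path = best_model.download()
print('Model artifacts downloaded to: ' + local_path)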

Serve the Trained Model with a Transformer

To serve a model with a transformer, write a Python script that implements the Transformer class with the methods preprocess and postprocess, like this:

class Transformer(object):
    def __init__(self):
        print("[Transformer] Initializing...")
        # Initialization code goes here

    def preprocess(self, inputs):
        # Transform the request inputs here. The object returned by this method will be used as model input.
        return inputs

    def postprocess(self, outputs):
        # Transform the predictions computed by the model before returning a response.
        return outputs
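
As a concrete (illustrative) example for MNIST, the transformer could normalize raw pixel values in preprocess and attach the most likely digit to the response in postprocess. This is a minimal sketch, assuming the request follows the TensorFlow Serving "instances" format used below and that the model expects inputs scaled to [0, 1]:

import numpy as np

class Transformer(object):
    def __init__(self):
        print("[Transformer] Initializing...")

    def preprocess(self, inputs):
        # Scale raw pixel values (0-255) into [0, 1]; assumes the model
        # was trained on normalized inputs (an assumption for this sketch)
        instances = np.asarray(inputs["instances"], dtype=np.float32) / 255.0
        inputs["instances"] = instances.tolist()
        return inputs

    def postprocess(self, outputs):
        # Attach the most likely digit for each softmax vector of 10 class probabilities
        outputs["digits"] = [int(np.argmax(p)) for p in outputs["predictions"]]
        return outputs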
from hops import serving
from hops import hdfs
# Create serving instance
SERVING_NAME = "mniste2ekftransformer"

TRANSFORMER_PATH=hdfs.project_path() + "/Jupyter/serving/kfserving/tensorflow/transformer.py" # or .ipynb

response = serving.create_or_update(SERVING_NAME, # name for the serving instance
                                    model_path=best_model.model_path, # path of the model to be deployed
                                    model_server="TENSORFLOW_SERVING", # model server to run the model
                                    kfserving=True, # serve the model using KFServing instead of the default serving tool of the current Hopsworks version
                                    transformer=TRANSFORMER_PATH,
                                    transformer_instances=0, # 0 transformer instances leverages scale-to-zero capabilities
                                    # optional arguments
                                    model_version=best_model.version, # version of the model to be deployed
                                    topic_name="CREATE", # (optional) topic name, or CREATE to create a new topic for inference logging
                                    inference_logging="ALL", # with KFServing, the type of inference data to log into Kafka, e.g. MODEL_INPUTS, PREDICTIONS or ALL
                                    instances=1, # number of model server instances; with KFServing, 0 leverages scale-to-zero
                                    )
2022-01-28 11:59:32,893 INFO: Serving mniste2ekftransformer successfully created
# List all available servings in the project
for s in serving.get_all():
    print(s.name)
mniste2ekftransformer
# Get serving status
serving.get_status(SERVING_NAME)
'Stopped'

Classify digits with the MNIST classifier

Start Model Serving Server

if serving.get_status(SERVING_NAME) == 'Stopped':
    serving.start(SERVING_NAME)
2022-01-28 11:59:36,844 INFO: Serving with name: mniste2ekftransformer successfully started
import time
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # wait until the serving instance is running
time.sleep(10) # give the model server a few extra seconds to initialize

Check Model Serving for active servings


Send Prediction Requests to the Served Model using Hopsworks REST API

import json
import numpy as np

NUM_FEATURES=784

for i in range(10):
    data = {
        "signature_name": "serving_default",
        "instances": [np.random.rand(NUM_FEATURES).tolist()]
    }
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [[0.00716477772, 0.0495215505, 0.164436236, 0.0216756575, 0.532064497, 0.00794952642, 0.0278893802, 0.0100125261, 0.142405227, 0.0368806198]]}
{'predictions': [[0.00994806644, 0.0678183883, 0.183071345, 0.0769148469, 0.298602432, 0.0145412814, 0.0232542269, 0.0508962944, 0.199954733, 0.0749983266]]}
{'predictions': [[0.00678317, 0.042503044, 0.271116346, 0.0244265571, 0.424625099, 0.0157555342, 0.0164663401, 0.0211527571, 0.138280511, 0.0388906188]]}
{'predictions': [[0.00414211443, 0.0596462786, 0.30761528, 0.0209895894, 0.361549497, 0.0080305282, 0.0214627199, 0.0147881005, 0.147630453, 0.0541454516]]}
{'predictions': [[0.00497094495, 0.0577611215, 0.266127735, 0.0304065645, 0.381860167, 0.0151010472, 0.0085090138, 0.0440005139, 0.166975722, 0.0242872462]]}
{'predictions': [[0.0074628829, 0.0546550192, 0.365902811, 0.0349127688, 0.336112082, 0.0129383784, 0.0148731982, 0.0186928585, 0.115760811, 0.0386891328]]}
{'predictions': [[0.00787949283, 0.0604840368, 0.221175715, 0.0385403596, 0.30112344, 0.0180348102, 0.0290495325, 0.0352735482, 0.195198312, 0.093240656]]}
{'predictions': [[0.00616118405, 0.0766904354, 0.261844963, 0.0246380847, 0.177770182, 0.0271635056, 0.0157324187, 0.0496929437, 0.300589889, 0.0597163513]]}
{'predictions': [[0.00731297769, 0.0740799457, 0.158399507, 0.0463014878, 0.373041362, 0.0104803853, 0.0281247012, 0.0214988757, 0.194553837, 0.086206913]]}
{'predictions': [[0.0101395193, 0.0717072636, 0.188310683, 0.0325433388, 0.4157314, 0.00923475809, 0.0339613594, 0.0183466114, 0.188901231, 0.0311238244]]}
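
Each prediction is a softmax vector with one probability per digit class (0-9). To interpret a response, take the argmax over the probabilities; for example, for the last response above:

# np is already imported in the request cell above
# Map each softmax vector of the last response to its most likely digit
predicted_digits = [int(np.argmax(p)) for p in response['predictions']]
print('Predicted digit(s): {}'.format(predicted_digits))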

Monitor Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means that you can keep track of your model’s performance and its predictions in a scalable manner.

from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError

Setup Kafka consumer and subscribe to the topic containing the prediction logs

TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the Kafka Avro schema from Hopsworks and setup an Avro reader

json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read messages from the Kafka topic, parse them with the Avro schema and print the results

PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 10):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)  
            payload = json.loads(event_dict["payload"])
            
            if (event_dict['messageType'] == "request" and not PRINT_INSTANCES) or \
                (event_dict['messageType'] == "response" and not PRINT_PREDICTIONS):
                continue
            
            print("INFO -> servingId: {}, modelName: {}, modelVersion: {},"\
                  "requestTimestamp: {}, inferenceId:{}, messageType:{}".format(
                       event_dict["servingId"],
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["inferenceId"],
                       event_dict["messageType"]))

            if event_dict['messageType'] == "request":
                print("Instances -> {}\n".format(payload['instances']))
                
            if event_dict['messageType'] == "response":
                print("Predictions -> {}\n".format(payload['predictions']))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
INFO -> servingId: 35, modelName: mnist_e2e, modelVersion: 1,requestTimestamp: 1643371225, inferenceId:b442de40-9aab-41f5-8a6c-c4d772f75969, messageType:response
Predictions -> [[0.00873534381, 0.116867647, 0.129798576, 0.0748127252, 0.333430201, 0.0154738724, 0.0273870789, 0.0286567844, 0.192712128, 0.0721256658]]

INFO -> servingId: 35, modelName: mnist_e2e, modelVersion: 1,requestTimestamp: 1643371225, inferenceId:ead02abc-0279-418b-9b20-8d2ae3c7d67b, messageType:response
Predictions -> [[0.00662171189, 0.0716219693, 0.147491157, 0.0376859158, 0.398301035, 0.00908162631, 0.0212855339, 0.0298583135, 0.233680561, 0.0443721078]]

INFO -> servingId: 35, modelName: mnist_e2e, modelVersion: 1,requestTimestamp: 1643371225, inferenceId:1c27d104-dbc8-4d35-a9af-7a182116a34a, messageType:response
Predictions -> [[0.00517375534, 0.105344124, 0.197647452, 0.038458731, 0.244261652, 0.0108315526, 0.0158249736, 0.0569163188, 0.269892216, 0.0556492358]]

INFO -> servingId: 35, modelName: mnist_e2e, modelVersion: 1,requestTimestamp: 1643371225, inferenceId:e8f4db55-7d8d-45b9-8307-0a17cf910a6d, messageType:response
Predictions -> [[0.00795154274, 0.0799642131, 0.369177878, 0.0543756932, 0.286108434, 0.00967404712, 0.012239052, 0.034396667, 0.104219526, 0.0418928973]]

INFO -> servingId: 35, modelName: mnist_e2e, modelVersion: 1,requestTimestamp: 1643371226, inferenceId:a912e712-affb-4dff-9455-ae8f5198e94a, messageType:response
Predictions -> [[0.00681294175, 0.0445752069, 0.343478769, 0.0334866494, 0.379422158, 0.0143673914, 0.0213937927, 0.0121406969, 0.102342986, 0.0419792905]]
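
When you are done reading the logs, it is good practice to close the Kafka consumer, and optionally stop the serving instance to free up resources. A minimal sketch, assuming the hops library's serving.stop() is available in your Hopsworks version:

# Close the Kafka consumer gracefully
consumer.close()

# (Optional) stop the serving instance to free resources
# (assumes hops' serving.stop(); skip this to keep serving requests)
if serving.get_status(SERVING_NAME) == 'Running':
    serving.stop(SERVING_NAME)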