Model Serving with KFServing, Transformers and Tensorflow - MNIST Classification


INPUT -> TRANSFORMER -> ENRICHED INPUT -> MODEL -> PREDICTION

This notebook requires KFServing

NOTE: It is assumed that a model called mnist is already available in Hopsworks. An example of training a model for the MNIST handwritten digit classification problem is available in Jupyter/experiment/Tensorflow/mnist.ipynb

Model Serving on Hopsworks


The hops Python library

hops is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on GitHub.
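
All the calls in this notebook come from a few hops modules; for reference, these are the imports used in the cells below:

from hops import model    # query the model repository for trained models
from hops import serving  # create, start and query model servings
from hops import hdfs     # resolve project paths in HopsFS
from hops import kafka    # read inference logs from the serving's Kafka topic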

Serve the MNIST classifier

Check Model Repository for best model based on accuracy


Query Model Repository for best mnist Model

from hops import model
from hops.model import Metric
MODEL_NAME="mnist"
EVALUATION_METRIC="accuracy"
best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)
print('Model name: ' + best_model['name'])
print('Model version: ' + str(best_model['version']))
print(best_model['metrics'])
Model name: mnist
Model version: 1
{'accuracy': '0.75'}

Serve the Trained Model with a Transformer

To serve a model with a transformer, write a Python script that implements the Transformer class and its preprocess and postprocess methods, like this:

class Transformer(object):
    def __init__(self):
        print("[Transformer] Initializing...")
        # Initialization code goes here

    def preprocess(self, inputs):
        # Transform the request inputs here. The object returned by this method will be used as model input.
        return inputs

    def postprocess(self, outputs):
        # Transform the predictions computed by the model before returning a response.
        return outputs
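
For the MNIST classifier, preprocess could, for instance, scale raw pixel values into the [0, 1] range and postprocess could attach the most likely digit to each prediction. The following is only an illustrative sketch, assuming requests arrive in the {"instances": [...]} format used later in this notebook and that the model was trained on inputs scaled to [0, 1]:

class Transformer(object):
    def __init__(self):
        print("[Transformer] Initializing...")

    def preprocess(self, inputs):
        # Assumption: inputs is the request payload, e.g. {"instances": [[...784 pixel values...]]}.
        # Scale raw 0-255 pixel values into [0, 1] before they reach the model.
        inputs["instances"] = [[pixel / 255.0 for pixel in instance]
                               for instance in inputs["instances"]]
        return inputs

    def postprocess(self, outputs):
        # Assumption: outputs is the model response, e.g. {"predictions": [[...10 probabilities...]]}.
        # Attach the index of the highest probability as the predicted digit.
        outputs["digits"] = [prediction.index(max(prediction))
                             for prediction in outputs["predictions"]]
        return outputs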
from hops import serving
from hops import hdfs
# Create serving instance
SERVING_NAME = MODEL_NAME
MODEL_PATH="/Models/" + best_model['name']
TRANSFORMER_PATH=hdfs.project_path() + "/Jupyter/serving/kfserving/tensorflow/transformer.py" # or .ipynb

response = serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                                    MODEL_PATH, model_version=best_model['version'], # set the path and version of the model to be deployed
                                    kfserving=True, # whether to serve the model using KFServing or the default tool in the current Hopsworks version
                                    topic_name="CREATE", # (optional) set the topic name or CREATE to create a new topic for inference logging
                                    inference_logging="ALL", # with KFServing, select the type of inference data to log into Kafka, e.g MODEL_INPUTS, PREDICTIONS or ALL
                                    transformer=TRANSFORMER_PATH, 
                                    instances=1, # with KFServing, set 0 instances to leverage scale-to-zero capabilities
                                    transformer_instances=1, # with KFServing, set 0 instances to leverage scale-to-zero capabilities
                                    )
Inferring model server from artifact files: TENSORFLOW_SERVING
Creating serving mnist for artifact /Projects/demo_ml_meb10000//Models/mnist ...
Serving mnist successfully created
# List all available servings in the project
for s in serving.get_all():
    print(s.name)
mnist
# Get serving status
serving.get_status(SERVING_NAME)
'Stopped'

Classify digits with the MNIST classifier

Start Model Serving Server

if serving.get_status(SERVING_NAME) == 'Stopped':
    serving.start(SERVING_NAME)
Starting serving with name: mnist...
Serving with name: mnist successfully started
import time
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # Let the serving start up correctly
time.sleep(5)

Check Model Serving for active servings


Send Prediction Requests to the Served Model using Hopsworks REST API

import json
import numpy as np

NUM_FEATURES=784

for i in range(10):
    data = {
                "signature_name": "serving_default", "instances": [np.random.rand(NUM_FEATURES).tolist()]
            }
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [[0.0613542125, 0.101323538, 0.0399091654, 0.0345890522, 0.302067667, 0.142577931, 0.0791310892, 0.0789082348, 0.0758777037, 0.0842615142]]}
{'predictions': [[0.0460102223, 0.0554421134, 0.0376880467, 0.0415558703, 0.214382008, 0.254266232, 0.062533088, 0.0978983194, 0.109497815, 0.0807262808]]}
{'predictions': [[0.0668974221, 0.0909847245, 0.0988900438, 0.0637232, 0.17731753, 0.122183181, 0.0818561539, 0.0969023556, 0.118541665, 0.0827036947]]}
{'predictions': [[0.0559709519, 0.0646299496, 0.0344434641, 0.0303664282, 0.306105494, 0.147630468, 0.0688919, 0.109457299, 0.0753086582, 0.107195415]]}
{'predictions': [[0.0380319357, 0.0628150403, 0.0754821, 0.026162535, 0.422493875, 0.0817232728, 0.0735279322, 0.0677653, 0.0746347308, 0.0773632899]]}
{'predictions': [[0.0665044934, 0.0668867528, 0.040783342, 0.0294370688, 0.338596195, 0.154240429, 0.0706104711, 0.0722219348, 0.107167564, 0.0535517447]]}
{'predictions': [[0.0825293362, 0.0559448414, 0.0419535786, 0.0181798562, 0.370016396, 0.154312477, 0.0558506288, 0.111585282, 0.0492350422, 0.0603924952]]}
{'predictions': [[0.0444687, 0.0443344265, 0.0746727288, 0.0247968901, 0.416514188, 0.0837049931, 0.102883548, 0.0640649423, 0.0908682868, 0.0536912456]]}
{'predictions': [[0.0763259754, 0.101309389, 0.0394318104, 0.0272058602, 0.320164829, 0.117243476, 0.0681534633, 0.0669988394, 0.11084564, 0.0723207369]]}
{'predictions': [[0.0394673645, 0.0884961, 0.0372174904, 0.0370060354, 0.335928798, 0.159753889, 0.087445952, 0.0784435645, 0.0646848083, 0.0715560839]]}
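
Each prediction is a vector of ten class probabilities, so the predicted digit is the index of the largest value. For example, for the last response:

# The predicted digit is the class with the highest probability
predicted_digits = [int(np.argmax(p)) for p in response['predictions']]
print(predicted_digits)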

Monitor Prediction Requests and Responses using Kafka

All prediction requests are automatically logged to Kafka, which means that you can keep track of your model’s performance and its predictions in a scalable manner.

from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError

Set up a Kafka consumer and subscribe to the topic containing the prediction logs

TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the Kafka Avro schema from Hopsworks and set up an Avro reader

json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read messages from the Kafka topic, parse them with the Avro schema and print the results

PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 10):
    msg = consumer.poll(timeout=1)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)  
            payload = json.loads(event_dict["payload"])
            
            if (event_dict['messageType'] == "request" and not PRINT_INSTANCES) or \
                (event_dict['messageType'] == "response" and not PRINT_PREDICTIONS):
                continue
            
            print("INFO -> servingId: {}, modelName: {}, modelVersion: {},"\
                  "requestTimestamp: {}, inferenceId:{}, messageType:{}".format(
                       event_dict["servingId"],
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["inferenceId"],
                       event_dict["messageType"]))

            if event_dict['messageType'] == "request":
                print("Instances -> {}\n".format(payload['instances']))
                
            if event_dict['messageType'] == "response":
                print("Predictions -> {}\n".format(payload['predictions']))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
INFO -> servingId: 18, modelName: mnist, modelVersion: 1,requestTimestamp: 1623766542, inferenceId:741bdbc9-4846-469a-a2fc-2427e6a22679, messageType:response
Predictions -> [[0.0668974221, 0.0909847245, 0.0988900438, 0.0637232, 0.17731753, 0.122183181, 0.0818561539, 0.0969023556, 0.118541665, 0.0827036947]]

INFO -> servingId: 18, modelName: mnist, modelVersion: 1,requestTimestamp: 1623766542, inferenceId:c0fcae6d-1106-4922-aacc-5bc201d51393, messageType:response
Predictions -> [[0.0559709519, 0.0646299496, 0.0344434641, 0.0303664282, 0.306105494, 0.147630468, 0.0688919, 0.109457299, 0.0753086582, 0.107195415]]

INFO -> servingId: 18, modelName: mnist, modelVersion: 1,requestTimestamp: 1623766543, inferenceId:78412c97-5b28-47eb-bc7f-4689f044a5fc, messageType:response
Predictions -> [[0.0380319357, 0.0628150403, 0.0754821, 0.026162535, 0.422493875, 0.0817232728, 0.0735279322, 0.0677653, 0.0746347308, 0.0773632899]]

INFO -> servingId: 18, modelName: mnist, modelVersion: 1,requestTimestamp: 1623766543, inferenceId:dbb0163a-bd86-4edb-a6c9-24e607d0b7ee, messageType:response
Predictions -> [[0.0665044934, 0.0668867528, 0.040783342, 0.0294370688, 0.338596195, 0.154240429, 0.0706104711, 0.0722219348, 0.107167564, 0.0535517447]]

INFO -> servingId: 18, modelName: mnist, modelVersion: 1,requestTimestamp: 1623766543, inferenceId:8a586324-0b77-4d9c-9f1e-bb17a9cd4195, messageType:response
Predictions -> [[0.0825293362, 0.0559448414, 0.0419535786, 0.0181798562, 0.370016396, 0.154312477, 0.0558506288, 0.111585282, 0.0492350422, 0.0603924952]]
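
Once all messages have been read, the consumer can be closed to release its connection to the Kafka brokers:

# Close the Kafka consumer when done reading inference logs
consumer.close()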