Model Serving with Docker/Kubernetes and Tensorflow - MNIST Classification

INPUT -> MODEL -> PREDICTION

NOTE: It is assumed that a model called mnist is already available in Hopsworks. An example of training a model for the MNIST handwritten digit classification problem is available in Jupyter/end_to_end_pipelines/tensorflow/end_to_end_tensorflow.ipynb

Model Serving on Hopsworks


The hops python library

hops is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.

Have a feature request or encountered an issue? Please let us know on GitHub.
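On Hopsworks-managed Jupyter, hops and hsml come preinstalled. Outside that environment, both are published on PyPI and can typically be installed with pip (a minimal setup sketch, assuming network access to PyPI):

# On Hopsworks Jupyter these libraries are preinstalled; elsewhere run:
#   pip install hops hsml
from hops import serving   # model serving helpers used throughout this notebook
import hsml                # model registry client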

Serve the MNIST classifier

Query the Model Registry for the Best MNIST Model

import hsml

conn = hsml.connection()
mr = conn.get_model_registry()
Connected. Call `.close()` to terminate connection gracefully.
MODEL_NAME="mnist_e2e"
EVALUATION_METRIC="accuracy"

best_model = mr.get_best_model(MODEL_NAME, EVALUATION_METRIC, "max")

print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
print(best_model.training_metrics)
Model name: mnist_e2e
Model version: 1
{'accuracy': '0.625'}
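If you already know which version you need, the model can also be fetched directly by version rather than by best metric (a minimal sketch against the same hsml model registry handle):

# Fetch a specific model version directly from the registry
model_v1 = mr.get_model(MODEL_NAME, version=1)
print('Model name: ' + model_v1.name)
print('Model version: ' + str(model_v1.version))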

Create a Model Serving Instance for the Exported Model

from hops import serving
# Create serving instance
SERVING_NAME = "mniste2e"

response = serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                                    model_path=best_model.model_path, # set the path of the model to be deployed
                                    model_server="TENSORFLOW_SERVING", # set the model server to run the model
                                    # optional arguments
                                    model_version=best_model.version, # set the version of the model to be deployed
                                    kfserving=False, # whether to serve the model with KFServing; if False, the model is served with Docker or Kubernetes depending on the Hopsworks version
                                    topic_name="CREATE", # (optional) set the topic name or CREATE to create a new topic for inference logging
                                    instances=1, # number of model server instances; with KFServing, set 0 instances to leverage scale-to-zero capabilities
                                    )
2022-01-28 11:45:02,168 INFO: Serving mniste2e successfully created
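Because the call is create_or_update, re-running it with the same SERVING_NAME updates the existing serving instead of creating a new one. For example, a sketch that scales out to two model server instances (running multiple instances assumes a Kubernetes-backed Hopsworks installation):

# Update the existing serving, e.g. scale out to two instances
# (assumes Kubernetes is available in this Hopsworks installation)
serving.create_or_update(SERVING_NAME,
                         model_path=best_model.model_path,
                         model_server="TENSORFLOW_SERVING",
                         model_version=best_model.version,
                         instances=2)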

Once the serving instance is created, it will be shown in the “Model Serving” tab in the Hopsworks UI, where you can view detailed information such as the server logs and the Kafka topic to which inference requests are logged.

You can also use the Python module to query the Hopsworks REST API for information about the existing servings, using methods like:

  • get_all()
  • get_id(serving_name)
  • get_model_path(serving_name)
  • get_model_version(serving_name)
  • get_artifact_version(serving_name)
  • get_kafka_topic(serving_name)
  • ...
print("Info: \tid: {},\n \
       model_path: {},\n \
       model_version: {},\n \
       artifact_version: {},\n \
       model_server: {},\n \
       serving_tool: {}".format(
    serving.get_id(SERVING_NAME),
    serving.get_model_path(SERVING_NAME),
    serving.get_model_version(SERVING_NAME),
    serving.get_artifact_version(SERVING_NAME),
    serving.get_model_server(SERVING_NAME),
    serving.get_serving_tool(SERVING_NAME)))
Info:   id: 31,
        model_path: /Projects/demo_ml_meb10000/Models/mnist_e2e,
        model_version: 1,
        artifact_version: 0,
        model_server: TENSORFLOW_SERVING,
        serving_tool: DEFAULT
for s in serving.get_all():
    print(s.name)
mniste2e

Classify Digits with the MNIST Classifier

Start Model Serving Server

if serving.get_status(SERVING_NAME) == 'Stopped':
    serving.start(SERVING_NAME)
2022-01-28 11:45:03,776 INFO: Serving with name: mniste2e successfully started
import time
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # wait for the serving to start up correctly
time.sleep(10)

Check Model Serving for active servings


Send Prediction Requests to the Served Model using the Hopsworks REST API

import json
import numpy as np

NUM_FEATURES=784

for i in range(20):
    data = {
        "signature_name": "serving_default",
        "instances": [np.random.rand(NUM_FEATURES).tolist()]
    }
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [[0.00572621496, 0.108208649, 0.27227217, 0.0510373078, 0.299194, 0.0186527651, 0.0198455323, 0.0278544612, 0.152300492, 0.0449083634]]}
{'predictions': [[0.0046633645, 0.0414372683, 0.291441888, 0.0312344842, 0.444997281, 0.0112954751, 0.014659116, 0.0170055088, 0.113880813, 0.0293848235]]}
{'predictions': [[0.0076998882, 0.0302102603, 0.235633671, 0.0153572187, 0.287438601, 0.0136197172, 0.0237444546, 0.0156203927, 0.322000772, 0.0486749858]]}
{'predictions': [[0.0108826989, 0.0823879838, 0.210901648, 0.0343613736, 0.331943542, 0.0128106838, 0.0406103022, 0.0279905479, 0.179413438, 0.0686978]]}
{'predictions': [[0.0109353121, 0.0673055351, 0.39981252, 0.0546917692, 0.294156522, 0.0124330763, 0.0245132856, 0.0148018971, 0.0674069598, 0.0539430939]]}
{'predictions': [[0.0114273727, 0.0790175945, 0.13507475, 0.0827436745, 0.370159179, 0.00988207199, 0.0314267166, 0.0309457481, 0.195226014, 0.0540968478]]}
{'predictions': [[0.0133531187, 0.0599063262, 0.247828186, 0.0464959219, 0.427820921, 0.0191995669, 0.0206612721, 0.0281933304, 0.0847786739, 0.0517626703]]}
{'predictions': [[0.00723468931, 0.111925527, 0.263983548, 0.0296120755, 0.357140779, 0.0177987386, 0.0169458222, 0.0362103, 0.104185537, 0.0549630672]]}
{'predictions': [[0.00825782213, 0.0608470626, 0.249044895, 0.0373915546, 0.354562044, 0.0104568275, 0.0274981633, 0.0231450014, 0.158275828, 0.0705208555]]}
{'predictions': [[0.0113548581, 0.0370724536, 0.236185282, 0.0362716, 0.245105356, 0.0161779951, 0.0189345255, 0.0334743597, 0.306493491, 0.0589301251]]}
{'predictions': [[0.00681277085, 0.11164733, 0.289175451, 0.0336382575, 0.261108965, 0.0185933951, 0.0239503831, 0.0405027345, 0.17715925, 0.0374115221]]}
{'predictions': [[0.0111160725, 0.0805897564, 0.239300177, 0.0385839716, 0.3445, 0.0188862346, 0.0326525047, 0.0327585265, 0.153218359, 0.0483945422]]}
{'predictions': [[0.00481388066, 0.0545748733, 0.238435447, 0.034287069, 0.353871942, 0.0114829037, 0.0271637905, 0.0178126022, 0.156495899, 0.101061471]]}
{'predictions': [[0.00633331668, 0.0470374823, 0.339952618, 0.0310638566, 0.306065679, 0.013985374, 0.0280421339, 0.0124483872, 0.154721156, 0.0603500269]]}
{'predictions': [[0.0117167076, 0.0657213852, 0.251739472, 0.0289249122, 0.410816878, 0.011261181, 0.040211603, 0.011705935, 0.13377139, 0.0341305584]]}
{'predictions': [[0.00604272867, 0.0362226181, 0.258314073, 0.0260963496, 0.358961761, 0.00796514843, 0.0122928983, 0.0202498324, 0.238217086, 0.0356375128]]}
{'predictions': [[0.00509981066, 0.0535564, 0.321242362, 0.0374574, 0.262698799, 0.0172182638, 0.0105276722, 0.0232814737, 0.194753274, 0.0741645545]]}
{'predictions': [[0.00960638188, 0.108674034, 0.268778, 0.0699506328, 0.185300842, 0.0157204196, 0.0365220755, 0.0281580407, 0.212751329, 0.0645382553]]}
{'predictions': [[0.00637084339, 0.0884655416, 0.20060347, 0.04854545, 0.203479424, 0.0286499448, 0.0381827056, 0.0213748738, 0.325430036, 0.0388977155]]}
{'predictions': [[0.00527361361, 0.0295326672, 0.279165745, 0.0256425422, 0.448510081, 0.0165301226, 0.0195797198, 0.0204584897, 0.122249223, 0.0330577828]]}
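The requests above send random noise, which is why the probability mass is spread across all ten classes. To classify a real digit, flatten and normalize an actual MNIST test image into the same 784-feature format (a sketch assuming TensorFlow is available in this environment and that the model was trained on pixel values scaled to [0, 1]):

from tensorflow.keras.datasets import mnist

(_, _), (x_test, y_test) = mnist.load_data()

# flatten a 28x28 test image into 784 features and scale pixels to [0, 1]
image = (x_test[0].reshape(NUM_FEATURES) / 255.0).tolist()

data = {"signature_name": "serving_default", "instances": [image]}
response = serving.make_inference_request(SERVING_NAME, data)
print("true label: {}".format(y_test[0]))
print("predicted label: {}".format(int(np.argmax(response["predictions"][0]))))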

Monitor Prediction Requests and Responses using Kafka

from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError

Set up a Kafka consumer and subscribe to the topic containing the prediction logs

TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)

config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)

Read the Kafka Avro schema from Hopsworks and set up an Avro reader

json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)

Read messages from the Kafka topic, parse them with the Avro schema and print the results

PRINT_INSTANCES=False
PRINT_PREDICTIONS=True

for i in range(0, 5):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)
            
            print("serving: {}, version: {}, timestamp: {},\n"\
                  "        http_response_code: {}, model_server: {}, serving_tool: {}".format(
                       event_dict["modelName"],
                       event_dict["modelVersion"],
                       event_dict["requestTimestamp"],
                       event_dict["responseHttpCode"],
                       event_dict["modelServer"],
                       event_dict["servingTool"]))
            
            if PRINT_INSTANCES:
                print("instances: {}\n".format(event_dict["inferenceRequest"]))
            if PRINT_PREDICTIONS:
                print("predictions: {}\n".format(json.loads(event_dict["inferenceResponse"])["predictions"][0]))
                      
        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
serving: mniste2e, version: 1, timestamp: 1643370334068,
        http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.00531054195, 0.0404060595, 0.290843785, 0.0300603732, 0.406977743, 0.012764995, 0.0235719737, 0.0228966139, 0.123488344, 0.0436795913]

serving: mniste2e, version: 1, timestamp: 1643370334060,
        http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.00717737712, 0.0555244572, 0.316040367, 0.0621430837, 0.293955833, 0.0159867015, 0.0470166355, 0.0125214364, 0.149252892, 0.0403812565]

serving: mniste2e, version: 1, timestamp: 1643370334059,
        http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.00496881455, 0.0392289087, 0.340284616, 0.0283909775, 0.31322825, 0.0148269441, 0.0102977483, 0.0242117718, 0.158610448, 0.0659515]

serving: mniste2e, version: 1, timestamp: 1643370334060,
        http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.011219847, 0.0522725135, 0.23986721, 0.0267774705, 0.435368598, 0.00980452728, 0.0200000331, 0.022984853, 0.139044553, 0.0426604077]

serving: mniste2e, version: 1, timestamp: 1643370334411,
        http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.00758659374, 0.076115191, 0.349409163, 0.0237191711, 0.286886334, 0.0122001776, 0.00834187213, 0.029796049, 0.148333102, 0.0576123595]
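
Finally, release the Kafka consumer and, once the serving is no longer needed, stop it (a short cleanup sketch using the same APIs introduced above):

# clean up: close the Kafka consumer and stop the serving instance
consumer.close()
if serving.get_status(SERVING_NAME) == 'Running':
    serving.stop(SERVING_NAME)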