Model Serving with KFServing and Tensorflow - MNIST Classification
INPUT -> MODEL -> PREDICTION
This notebook requires KFServing to be installed
NOTE: It is assumed that a model named mnist_e2e is already available in Hopsworks. An example of training a model for the MNIST handwritten digit classification problem is available in
Jupyter/experiment/Tensorflow/mnist.ipynb
Model Serving on Hopsworks
The hops Python library is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.
Have a feature request or encountered an issue? Please let us know on GitHub.
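As a quick orientation, the two hops modules exercised in this notebook are serving and kafka; both imports appear again in the cells below where they are first used:
# Helper modules from the hops library used in this notebook
from hops import serving  # create, start, stop and query model servings
from hops import kafka    # access the project's Kafka service for inference logs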
Serve the MNIST classifier
Check Model Repository for best model based on accuracy
Query Model Repository for best mnist_e2e Model
import hsml
conn = hsml.connection()
mr = conn.get_model_registry()
MODEL_NAME="mnist_e2e"
EVALUATION_METRIC="accuracy"
Connected. Call `.close()` to terminate connection gracefully.
best_model = mr.get_best_model(MODEL_NAME, EVALUATION_METRIC, "max")
print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
print(best_model.training_metrics)
Model name: mnist_e2e
Model version: 1
{'accuracy': '0.625'}
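get_best_model returns the highest-scoring version according to the given metric. To fetch a specific version instead, the registry can be queried by name and version; a minimal sketch, assuming version 1 exists (as shown in the output above):
# Fetch a specific model version by name rather than the best-scoring one
model_v1 = mr.get_model(MODEL_NAME, version=1)
print('Model name: ' + model_v1.name)
print('Model version: ' + str(model_v1.version))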
Create Model Serving of Exported Model
from hops import serving
# Create serving instance
SERVING_NAME = "mniste2ekf"
response = serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                                    model_path=best_model.model_path, # set the path of the model to be deployed
                                    model_server="TENSORFLOW_SERVING", # set the model server to run the model
                                    kfserving=True, # whether to serve the model using KFServing or the default tool in the current Hopsworks version
                                    # optional arguments
                                    model_version=best_model.version, # set the version of the model to be deployed
                                    topic_name="CREATE", # (optional) set the topic name or CREATE to create a new topic for inference logging
                                    inference_logging="ALL", # with KFServing, select the type of inference data to log into Kafka, e.g. MODEL_INPUTS, PREDICTIONS or ALL
                                    instances=1, # with KFServing, set 0 instances to leverage scale-to-zero capabilities
                                    )
2022-01-28 11:52:48,685 INFO: Serving mniste2ekf successfully created
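As the instances comment above notes, KFServing also accepts 0 instances to enable scale-to-zero, where a model server is only spun up on demand when a request arrives. A minimal sketch of the same deployment with scale-to-zero (all other arguments as above):
# Same serving as above, but with zero initial instances (scale-to-zero)
response = serving.create_or_update(SERVING_NAME,
                                    model_path=best_model.model_path,
                                    model_server="TENSORFLOW_SERVING",
                                    kfserving=True,
                                    model_version=best_model.version,
                                    instances=0) # KFServing starts a server on the first request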
# List all available servings in the project
for s in serving.get_all():
    print(s.name)
mniste2ekf
# Get serving status
serving.get_status(SERVING_NAME)
'Stopped'
Classify digits with the MNIST classifier
Start Model Serving Server
if serving.get_status(SERVING_NAME) == 'Stopped':
    serving.start(SERVING_NAME)
2022-01-28 11:52:49,378 INFO: Serving with name: mniste2ekf successfully started
import time
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # Let the serving startup correctly
time.sleep(10)
Check Model Serving for active servings
Send Prediction Requests to the Served Model using Hopsworks REST API
import json
import numpy as np
NUM_FEATURES=784
for i in range(10):
    data = {
        "signature_name": "serving_default",
        "instances": [np.random.rand(NUM_FEATURES).tolist()]
    }
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [[0.00228863931, 0.039052885, 0.299541056, 0.0292527322, 0.210720837, 0.00816287566, 0.0126433428, 0.0109003522, 0.344060391, 0.0433768257]]}
{'predictions': [[0.00586040691, 0.0307841301, 0.33769837, 0.0337129459, 0.254304558, 0.0237869881, 0.0141318291, 0.0193788894, 0.238279134, 0.0420627445]]}
{'predictions': [[0.00538377091, 0.061361514, 0.238303721, 0.0285069272, 0.501042247, 0.0104929302, 0.0204542708, 0.0179179348, 0.0851745605, 0.0313620083]]}
{'predictions': [[0.00573793799, 0.029261997, 0.369869381, 0.0178442206, 0.450838357, 0.00611973461, 0.00844837539, 0.0112320511, 0.079323791, 0.0213241577]]}
{'predictions': [[0.0119106388, 0.0629702583, 0.153047368, 0.0336705893, 0.3950378, 0.00748524442, 0.0267426614, 0.0312577672, 0.249878481, 0.0279991794]]}
{'predictions': [[0.0111142406, 0.0491779149, 0.204911619, 0.0337088816, 0.241256326, 0.0222033393, 0.0126153091, 0.0414450057, 0.302034378, 0.0815329328]]}
{'predictions': [[0.00563503848, 0.0279151835, 0.28025192, 0.0385764167, 0.363024831, 0.0126688043, 0.0207744408, 0.0177768413, 0.193134412, 0.0402421467]]}
{'predictions': [[0.00543945516, 0.126275226, 0.163412824, 0.04192359, 0.265003413, 0.0151998475, 0.0343197025, 0.0221705418, 0.241572261, 0.0846831948]]}
{'predictions': [[0.00477044843, 0.0468725786, 0.208328143, 0.0335769281, 0.347728401, 0.0162629858, 0.0180092286, 0.01045975, 0.283724517, 0.0302670337]]}
{'predictions': [[0.00746631622, 0.0523283072, 0.304017246, 0.027453661, 0.280292451, 0.0179669447, 0.0197735652, 0.0237298273, 0.174194977, 0.0927767605]]}
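Each response holds one probability per digit class (0-9), so the predicted digit is the index of the largest probability. For example, decoding the last response from the loop above:
# The predicted digit is the argmax over the 10 class probabilities
probabilities = response['predictions'][0]
predicted_digit = int(np.argmax(probabilities))
print("Predicted digit: {} (p={:.3f})".format(predicted_digit, max(probabilities)))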
Monitor Prediction Requests and Responses using Kafka
from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError
Set up a Kafka consumer and subscribe to the topic containing the prediction logs
TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)
config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)
Read the Kafka Avro schema from Hopsworks and set up an Avro reader
json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)
Read messages from the Kafka topic, parse them with the Avro schema and print the results
PRINT_INSTANCES=False
PRINT_PREDICTIONS=True
for i in range(0, 10):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)
            payload = json.loads(event_dict["payload"])

            if (event_dict['messageType'] == "request" and not PRINT_INSTANCES) or \
               (event_dict['messageType'] == "response" and not PRINT_PREDICTIONS):
                continue

            print("INFO -> servingId: {}, modelName: {}, modelVersion: {},"\
                  "requestTimestamp: {}, inferenceId:{}, messageType:{}".format(
                      event_dict["servingId"],
                      event_dict["modelName"],
                      event_dict["modelVersion"],
                      event_dict["requestTimestamp"],
                      event_dict["inferenceId"],
                      event_dict["messageType"]))

            if event_dict['messageType'] == "request":
                print("Instances -> {}\n".format(payload['instances']))

            if event_dict['messageType'] == "response":
                print("Predictions -> {}\n".format(payload['predictions']))

        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
INFO -> servingId: 33, modelName: mnist_e2e, modelVersion: 1,requestTimestamp: 1643370799, inferenceId:8fe0ab82-a9fd-424e-b154-b88eaeb82312, messageType:response
Predictions -> [[0.00531423138, 0.0495751873, 0.215446427, 0.0254741, 0.456469536, 0.00679346081, 0.0227076765, 0.0156853292, 0.151415035, 0.0511190034]]
INFO -> servingId: 33, modelName: mnist_e2e, modelVersion: 1,requestTimestamp: 1643370799, inferenceId:2eb8f1b7-5fe5-4a15-921e-1077b3bc5548, messageType:response
Predictions -> [[0.015544543, 0.04791224, 0.231773868, 0.0455554053, 0.352704018, 0.0196769126, 0.023360543, 0.0201557297, 0.190379784, 0.0529369563]]
INFO -> servingId: 33, modelName: mnist_e2e, modelVersion: 1,requestTimestamp: 1643370799, inferenceId:2a85e0ed-70a4-43b3-8afd-43fdb5bb888c, messageType:response
Predictions -> [[0.0119756339, 0.108432077, 0.249693975, 0.0291424617, 0.297212154, 0.0278416723, 0.0237392467, 0.0305440109, 0.182935417, 0.038483344]]
INFO -> servingId: 33, modelName: mnist_e2e, modelVersion: 1,requestTimestamp: 1643370799, inferenceId:92f62e3a-f05e-442f-91c2-7f8de7d80de9, messageType:response
Predictions -> [[0.0061565917, 0.0573133416, 0.157412216, 0.0603140071, 0.319210559, 0.01605873, 0.018614579, 0.0371795483, 0.204262689, 0.123477831]]
INFO -> servingId: 33, modelName: mnist_e2e, modelVersion: 1,requestTimestamp: 1643370800, inferenceId:4c50013c-834d-4704-9808-6a6cb25dbb95, messageType:response
Predictions -> [[0.011226072, 0.0487205796, 0.279118955, 0.0241690688, 0.347856402, 0.0182868, 0.0276911054, 0.0257906299, 0.145419195, 0.0717211664]]
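When you are done reading inference logs, release the resources; a minimal sketch, assuming the serving should also be stopped once the experiment is finished:
# Close the Kafka consumer and stop the serving instance to free resources
consumer.close()
if serving.get_status(SERVING_NAME) == 'Running':
    serving.stop(SERVING_NAME)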