End-To-End Example with TensorFlow Keras and Hops - MNIST Classification
DATA PREPARATION --> MODEL TRAINING --> MODEL SERVING --> MODEL MONITORING
Tested with TensorFlow 2.4.0
Model Serving on Hopsworks
The hops Python library is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.
Have a feature request or encountered an issue? Please let us know on GitHub.
Using the experiment module
To run your machine learning code in Hopsworks, the whole program needs to be placed inside a wrapper function: everything from importing libraries to reading data, defining the model, and running the training.
The experiment module provides an API for running Python programs such as TensorFlow, Keras and PyTorch on Hopsworks on any number of machines and GPUs. It supports three modes:
- A single Python program, which we refer to as an Experiment (see the sketch after this list).
- Grid search or genetic hyperparameter optimization, such as differential evolution, which runs several Experiments in parallel; we refer to this as a Parallel Experiment.
- ParameterServerStrategy, CollectiveAllReduceStrategy and MultiWorkerMirroredStrategy, which make multi-machine/multi-GPU training as simple as invoking a function for orchestration; this mode is referred to as Distributed Training.
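To make the wrapper-function pattern concrete, here is a minimal sketch of the single-Experiment mode. The function name train_fn and its returned metric are illustrative placeholders; experiment.launch and metric_key match the real call used later in this notebook.

from hops import experiment

def train_fn():
    # Everything lives inside the wrapper function:
    # imports, reading data, defining the model, and training.
    import tensorflow as tf
    # ... build, train and evaluate a model here ...
    accuracy = 0.9  # placeholder for a real evaluation result
    return {'accuracy': accuracy}

# Run the wrapper function as a single Experiment, tracking 'accuracy'
experiment.launch(train_fn, name='minimal example', metric_key='accuracy')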
Using the tensorboard module
The tensorboard module allows us to get the log directory where summaries and checkpoints are written for the TensorBoard we will see in a bit. The only function that we currently need to call is tensorboard.logdir(), which returns the path to the TensorBoard log directory. Furthermore, the contents of this directory will be placed as a Dataset in your project's Experiments folder. In practice, the directory can also be used to store other data that should be accessible after the experiment has finished.
# Use this module to get the TensorBoard logdir
from hops import tensorboard
tensorboard_logdir = tensorboard.logdir()
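Inside the training function, this log directory is typically wired into Keras callbacks, exactly as the full example further down does:

from hops import tensorboard
import tensorflow as tf

# Write TensorBoard summaries and model checkpoints to the experiment's logdir
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=tensorboard.logdir()),
    tf.keras.callbacks.ModelCheckpoint(filepath=tensorboard.logdir()),
]
# model.fit(..., callbacks=callbacks)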
Using the hdfs module
The hdfs module provides a method to get the path in HopsFS where your data is stored, namely by calling hdfs.project_path(). The path resolves to the root path of your project, which is the view that you see when you click Data Sets in Hopsworks. To point to where your actual data resides in the project, you need to append the path from there to your Dataset. For example, if you create a mnist folder in your Resources Dataset, the path to the mnist data would be hdfs.project_path() + 'Resources/mnist'
# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project
from hops import hdfs
project_path = hdfs.project_path()
# Downloading the mnist dataset to the current working directory
from hops import hdfs
mnist_hdfs_path = hdfs.project_path() + "Resources/mnist"
local_mnist_path = hdfs.copy_to_local(mnist_hdfs_path)
Documentation
See the following links to learn more about running experiments in Hopsworks:
- Learn more about experiments
- Building End-To-End pipelines
- Give us a star, create an issue or a feature request on the Hopsworks GitHub
Managing experiments
The Experiments service provides a unified view of all the experiments run using the experiment module. For each experiment it shows general information and the resulting metric, and experiments can be visualized in TensorBoard, either during or after training.
Train an MNIST Classifier by running an Experiment
def keras_mnist():
    import os
    import sys
    import uuid
    import random
    import numpy as np
    from tensorflow import keras
    import tensorflow as tf
    from tensorflow.keras.datasets import mnist
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Dropout, Flatten
    from tensorflow.keras.layers import Conv2D, MaxPooling2D
    from tensorflow.keras.callbacks import TensorBoard
    from tensorflow.keras import backend as K
    import math
    from hops import tensorboard
    import hsml
    from hsml.schema import Schema
    from hsml.model_schema import ModelSchema
    from hops import hdfs
    import pydoop.hdfs as pydoop

    batch_size = 32
    num_classes = 10

    # Provide paths to the train and validation datasets
    train_filenames = [hdfs.project_path() + "TourData/mnist/train/train.tfrecords"]
    validation_filenames = [hdfs.project_path() + "TourData/mnist/validation/validation.tfrecords"]

    # Define the input function
    def data_input(filenames, batch_size=128, num_classes=10, shuffle=False, repeat=None):

        def parser(serialized_example):
            """Parses a single tf.Example into image and label tensors."""
            features = tf.io.parse_single_example(
                serialized_example,
                features={
                    'image_raw': tf.io.FixedLenFeature([], tf.string),
                    'label': tf.io.FixedLenFeature([], tf.int64),
                })
            image = tf.io.decode_raw(features['image_raw'], tf.uint8)
            image.set_shape([28 * 28])

            # Normalize the values of the image from the range [0, 255] to [-0.5, 0.5]
            image = tf.cast(image, tf.float32) / 255 - 0.5
            label = tf.cast(features['label'], tf.int32)

            # Create a one-hot array for the labels
            label = tf.one_hot(label, num_classes)
            return image, label

        # Import the MNIST data
        dataset = tf.data.TFRecordDataset(filenames)

        # Map the parser over the dataset, and batch results by up to batch_size
        dataset = dataset.map(parser)
        if shuffle:
            dataset = dataset.shuffle(buffer_size=128)
        dataset = dataset.batch(batch_size, drop_remainder=True)
        dataset = dataset.repeat(repeat)
        return dataset

    # Define a Keras model
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)))
    model.add(tf.keras.layers.Dense(num_classes, activation='softmax'))

    # Compile the model
    model.compile(loss=tf.keras.losses.categorical_crossentropy,
                  optimizer=tf.keras.optimizers.Adam(0.001),
                  metrics=['accuracy'])

    callbacks = [
        tf.keras.callbacks.TensorBoard(log_dir=tensorboard.logdir()),
        tf.keras.callbacks.ModelCheckpoint(filepath=tensorboard.logdir()),
    ]
    model.fit(data_input(train_filenames, batch_size),
              verbose=0,
              epochs=3,
              steps_per_epoch=5,
              validation_data=data_input(validation_filenames, batch_size),
              validation_steps=1,
              callbacks=callbacks)

    score = model.evaluate(data_input(validation_filenames, batch_size), steps=1)

    # Export the trained model
    export_path = os.getcwd() + '/model-' + str(uuid.uuid4())
    print('Saving trained model to: {}'.format(export_path))
    tf.saved_model.save(model, export_path)
    print('Done saving!')

    metrics = {'accuracy': score[1]}

    # Register the trained model in the model registry
    conn = hsml.connection()
    mr = conn.get_model_registry()

    input_example = np.random.randint(0, high=256, size=784, dtype=np.uint8)
    input_schema = Schema(input_example)
    output_schema = Schema([{'type': 'float32', 'shape': [10], 'description': 'Predictions per image category'}])

    tf_model = mr.tensorflow.create_model("mnist_e2e",
                                          metrics=metrics,
                                          input_example=input_example,
                                          model_schema=ModelSchema(input_schema=input_schema, output_schema=output_schema))
    tf_model.save(export_path)

    return metrics
Starting Spark application
| ID | YARN Application ID | Kind | State | Spark UI | Driver log |
| --- | --- | --- | --- | --- | --- |
| 5 | application_1639488860019_0008 | pyspark | idle | Link | Link |
SparkSession available as 'spark'.
from hops import experiment
from hops import hdfs

# Launch the training function as a single Experiment, tracking 'accuracy'
experiment.launch(keras_mnist, name='mnist model for e2e example', local_logdir=True, metric_key='accuracy')
Finished Experiment
('hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Experiments/application_1639488860019_0008_1', {'accuracy': 0.75, 'log': 'Experiments/application_1639488860019_0008_1/output.log'})
Serve the Trained Model
Check the Model Registry for the best MNIST model based on accuracy
Query the Model Registry for the best MNIST model
MODEL_NAME="mnist_e2e"
EVALUATION_METRIC="accuracy"
import hsml
conn = hsml.connection()
mr = conn.get_model_registry()
best_model = mr.get_best_model(MODEL_NAME, EVALUATION_METRIC, "max")
Connected. Call `.close()` to terminate connection gracefully.
print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
print(best_model.training_metrics)
Model name: mnist_e2e
Model version: 1
{'accuracy': '0.75'}
Create a Model Serving endpoint for the exported model
from hops import serving

# Create a serving instance
SERVING_NAME = "mnist"

response = serving.create_or_update(SERVING_NAME, # define a name for the serving instance
                                    model_path=best_model.model_path, # set the path of the model to be deployed
                                    model_server="TENSORFLOW_SERVING", # set the model server to run the model
                                    # optional arguments
                                    model_version=best_model.version, # set the version of the model to be deployed
                                    kfserving=False, # the model will be served either with Docker or Kubernetes depending on the Hopsworks version
                                    topic_name="CREATE", # (optional) set the topic name or CREATE to create a new topic for inference logging
                                    instances=1 # with KFServing, set 0 instances to leverage scale-to-zero capabilities
                                    )
# List all available servings in the project
for s in serving.get_all():
    print(s.name)
irisflowerclassifier
mnist
# Get serving status
serving.get_status(SERVING_NAME)
'Stopped'
Check Model Serving for active servings
Classify MNIST samples using the Trained Model
Start the Model Serving server
if serving.get_status(SERVING_NAME) == 'Stopped':
    serving.start(SERVING_NAME)

import time
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5) # Let the serving start up correctly
time.sleep(10)
Send prediction requests to the MNIST model using the Hopsworks REST API
import numpy as np
import json

NUM_FEATURES = 784

for i in range(10):
    data = {
        "signature_name": "serving_default",
        "instances": [np.random.randint(0, high=256, size=NUM_FEATURES, dtype=np.uint8).tolist()]
    }
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]}
{'predictions': [[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]}
{'predictions': [[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]}
{'predictions': [[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]}
{'predictions': [[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]}
{'predictions': [[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]}
{'predictions': [[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]}
{'predictions': [[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]}
{'predictions': [[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]}
{'predictions': [[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]}
Monitor Prediction Logs
Consume Prediction Requests and Responses using Kafka
All prediction requests are automatically logged to Kafka, which means that you can keep track of your model's performance and its predictions in a scalable manner.
from hops import kafka
from confluent_kafka import Producer, Consumer, KafkaError
Setup a Kafka consumer and subscribe to the topic containing the prediction logs
TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)
config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)
Read the Kafka Avro schema from Hopsworks and setup an Avro reader
json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)
Read messages from the Kafka topic, parse them with the Avro schema and print the results
PRINT_INSTANCES = False
PRINT_PREDICTIONS = True

for i in range(0, 10):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)
            print("serving: {}, version: {}, timestamp: {},\n"\
                  "  http_response_code: {}, model_server: {}, serving_tool: {}".format(
                      event_dict["modelName"],
                      event_dict["modelVersion"],
                      event_dict["requestTimestamp"],
                      event_dict["responseHttpCode"],
                      event_dict["modelServer"],
                      event_dict["servingTool"]))
            if PRINT_INSTANCES:
                print("instances: {}\n".format(event_dict["inferenceRequest"]))
            if PRINT_PREDICTIONS:
                print("predictions: {}\n".format(json.loads(event_dict["inferenceResponse"])["predictions"][0]))
        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
timeout.. no more messages to read from topic
serving: mnist, version: 1, timestamp: 1639748317833,
http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
serving: mnist, version: 1, timestamp: 1639748317955,
http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
serving: mnist, version: 1, timestamp: 1639748318115,
http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
serving: mnist, version: 1, timestamp: 1639748318202,
http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
serving: mnist, version: 1, timestamp: 1639748318322,
http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
serving: mnist, version: 1, timestamp: 1639748318439,
http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
serving: mnist, version: 1, timestamp: 1639748318584,
http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
serving: mnist, version: 1, timestamp: 1639748318668,
http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
serving: mnist, version: 1, timestamp: 1639748318787,
http_response_code: 200, model_server: TENSORFLOW_SERVING, serving_tool: DEFAULT
predictions: [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
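Finally, when you are done sending requests, the serving instance can be stopped to free up resources. A minimal sketch, assuming the serving module exposes a stop function symmetric to the start function used above:

from hops import serving

# Stop the serving instance once it is no longer needed
# (assumes serving.stop mirrors the serving.start call shown earlier)
if serving.get_status(SERVING_NAME) == 'Running':
    serving.stop(SERVING_NAME)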