End-To-End Example with Scikit-Learn, Hops and the Feature Store - Iris Flower Classification
FEATURE ENGINEERING –> MODEL TRAINING –> MODEL SERVING –> MODEL MONITORING
In this notebook we will:
- Load the Iris Flower dataset from HopsFS
- Do feature engineering on the dataset
- Save the features to the feature store
- Read the feature data from the feature store
- Train a KNN Model using SkLearn
- Save the trained model to HopsFS
- Launch a serving instance to serve the trained model
- Send some prediction requests to the served model
- Monitor the predictions through Kafka
Model Serving on Hopsworks
The hops python library is a helper library for Hops that facilitates development by hiding the complexity of running applications and interacting with services.
Have a feature request or encountered an issue? Please let us know on GitHub.
Using the experiment module
To run your machine learning code in Hopsworks, the whole program must be wrapped in a function. Everything goes inside this wrapper function: importing libraries, reading data, defining the model and running the program.
The experiment module provides an API to run Python programs, such as TensorFlow, Keras and PyTorch programs, on Hopsworks on any number of machines and GPUs. An experiment can be:
- a single Python program, which we refer to as an Experiment;
- grid search or genetic hyperparameter optimization such as differential evolution, which runs several Experiments in parallel and which we refer to as a Parallel Experiment;
- distributed training with ParameterServerStrategy, CollectiveAllReduceStrategy or MultiworkerMirroredStrategy, which makes multi-machine/multi-GPU training as simple as invoking a function for orchestration. This mode is referred to as Distributed Training.
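The sketch below shows the wrapper-function pattern in its simplest form. Imports, data loading, "training" and evaluation all live inside one function that returns its metrics.

Several parts of it are assumptions and hypothetical stand-ins:
- The toy data and the threshold "model" replace the real dataset and classifier.
- The hand-off to the experiment module appears only in a comment.
- Returning a metrics dict is assumed to match how your hops version collects results.

```python
def train_wrapper():
    """Everything the experiment needs lives inside this function:
    imports, data loading, model definition and evaluation."""
    # Imports go inside the wrapper so they run wherever the wrapper runs.
    import statistics

    # Hypothetical toy data standing in for data read from HopsFS.
    petal_lengths = [1.4, 1.3, 1.5, 4.7, 4.5, 4.9]
    labels = [0, 0, 0, 1, 1, 1]

    # "Train" a one-threshold classifier: the midpoint between the class means.
    mean0 = statistics.mean(x for x, y in zip(petal_lengths, labels) if y == 0)
    mean1 = statistics.mean(x for x, y in zip(petal_lengths, labels) if y == 1)
    threshold = (mean0 + mean1) / 2

    predictions = [1 if x > threshold else 0 for x in petal_lengths]
    accuracy = sum(p == y for p, y in zip(predictions, labels)) / len(labels)
    # Assumption: returning a dict of metrics lets the caller record them.
    return {"accuracy": accuracy}


# Locally the wrapper is just a function call. On Hopsworks it would instead be
# passed to the experiment module, e.g. experiment.launch(train_wrapper).
print(train_wrapper())
```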
Using the hdfs module
The hdfs module provides a method to get the path in HopsFS where your data is stored, namely hdfs.project_path(). The path resolves to the root path of your project, which is the view you see when you click Data Sets in Hopsworks. To point to where your data actually resides, append the path of your dataset relative to that root. For example, if you create a mnist folder in your Resources dataset, the path to the mnist data would be hdfs.project_path() + 'Resources/mnist'.
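Plain string concatenation only works if the project root ends with a slash and the dataset path does not start with one.

The helper below, `dataset_path`, is hypothetical. It is a minimal sketch that joins the two parts safely with `posixpath`. The root string is a stand-in modelled on the HDFS paths printed later in this notebook, not a value returned by Hopsworks.

```python
import posixpath


def dataset_path(project_path, relative_path):
    """Hypothetical helper: join the project root returned by
    hdfs.project_path() with a dataset path relative to that root."""
    # lstrip("/") stops posixpath.join from treating the dataset path as
    # absolute (which would discard the project root). join() adds a "/"
    # only when project_path does not already end with one.
    return posixpath.join(project_path, relative_path.lstrip("/"))


# Stand-in for hdfs.project_path(); the real value comes from Hopsworks.
root = "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/"
print(dataset_path(root, "Resources/mnist"))
```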
# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project
from hops import hdfs
project_path = hdfs.project_path()
# Uploading the trained model to HopsFS
hdfs.copy_to_hdfs("iris_knn.pkl", "Resources/iris_flower", overwrite=True)
# Downloading the iris flower model to the current working directory
iris_flower_model_hdfs_path = hdfs.project_path() + "Resources/iris_flower/iris_knn.pkl"
local_iris_flower_model_path = hdfs.copy_to_local(iris_flower_model_hdfs_path)
Documentation
See the following links to learn more about running experiments in Hopsworks
- Learn more about experiments
- Building end-to-end pipelines
- Give us a star, create an issue or a feature request on Hopsworks GitHub
Managing experiments
The Experiments service provides a unified view of all the experiments run using the experiment module.
It provides general information about each experiment and the resulting metric. Experiments can be visualized in TensorBoard during or after training.
Import libraries
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import joblib
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType
import numpy as np
import time
import json
from hops import kafka, hdfs, serving, tls
from confluent_kafka import Producer, Consumer, KafkaError
import random
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log |
---|---|---|---|---|---|
4 | application_1640029426455_0007 | pyspark | idle | Link | Link |
SparkSession available as 'spark'.
Prepare Training Dataset
Load Iris Dataset (csv)
project_path = hdfs.project_path()
iris_df = spark.read.format("csv").option("header", "true").option("inferSchema", True).load(
project_path + "TourData/iris/iris.csv")
iris_df.printSchema()
root
|-- sepal_length: double (nullable = true)
|-- sepal_width: double (nullable = true)
|-- petal_length: double (nullable = true)
|-- petal_width: double (nullable = true)
|-- variety: string (nullable = true)
Feature Engineering
The dataset is already quite well prepared. The only feature engineering we need is to convert the variety column to numeric and to save a lookup table, so that we can later convert the numeric representation back to the categorical one.
encoder = StringIndexer(inputCol="variety", outputCol="label")
fit_model = encoder.fit(iris_df)
iris_df1 = fit_model.transform(iris_df)
lookup_df = iris_df1.select(["variety", "label"]).distinct()
iris_df2 = iris_df1.drop("variety")
iris_df3 = iris_df2.withColumn("label", iris_df2["label"].cast(IntegerType()))
iris_df3.printSchema()
root
|-- sepal_length: double (nullable = true)
|-- sepal_width: double (nullable = true)
|-- petal_length: double (nullable = true)
|-- petal_width: double (nullable = true)
|-- label: integer (nullable = true)
iris_df3.show(5)
+------------+-----------+------------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|label|
+------------+-----------+------------+-----------+-----+
| 5.1| 3.5| 1.4| 0.2| 0|
| 4.9| 3.0| 1.4| 0.2| 0|
| 4.7| 3.2| 1.3| 0.2| 0|
| 4.6| 3.1| 1.5| 0.2| 0|
| 5.0| 3.6| 1.4| 0.2| 0|
+------------+-----------+------------+-----------+-----+
only showing top 5 rows
lookup_df.show(3)
+----------+-----+
| variety|label|
+----------+-----+
| Virginica| 2.0|
|Versicolor| 1.0|
| Setosa| 0.0|
+----------+-----+
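The lookup table above follows from how StringIndexer assigns indices. Its default ordering, stringOrderType "frequencyDesc", gives index 0 to the most frequent string. Recent Spark releases document that equal frequencies are then broken alphabetically. Because each Iris variety occurs 50 times, this yields Setosa 0.0, Versicolor 1.0, Virginica 2.0.

The sketch below reproduces that ordering in plain Python to make the rule concrete. It is not Spark's implementation, and `fit_string_indexer` is a hypothetical name.

```python
from collections import Counter


def fit_string_indexer(values):
    """Mimic StringIndexer's default "frequencyDesc" ordering: the most
    frequent string gets index 0, and ties are broken alphabetically.
    Indices are floats, like the label column StringIndexer produces."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}


varieties = ["Virginica"] * 50 + ["Setosa"] * 50 + ["Versicolor"] * 50
index = fit_string_indexer(varieties)
print(index)  # {'Setosa': 0.0, 'Versicolor': 1.0, 'Virginica': 2.0}

# Inverse mapping, playing the role of the lookup_df table.
lookup = {int(label): variety for variety, label in index.items()}
```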
Save Features to the Feature Store
We save two feature groups (Hive tables). The first, iris_features, contains the iris features and the corresponding numeric label. The second, iris_labels_lookup, is used to convert the numeric iris label back to the categorical one.
Note: To be able to run the feature store code, you first have to enable the Feature Store Service in your project. To do this, go to the “Settings” tab in your project, select the feature store service and click “Save”.
import hsfs
# Create a connection
connection = hsfs.connection()
# Get the feature store handle for the project's feature store
fs = connection.get_feature_store()
Connected. Call `.close()` to terminate connection gracefully.
iris_features = fs.create_feature_group(name="iris_features", version=1, time_travel_format=None)
iris_features.save(iris_df3)
iris_labels_lookup = fs.create_feature_group(name="iris_labels_lookup", version=1, time_travel_format=None)
iris_labels_lookup.save(lookup_df)
Read the Iris Training Dataset from the Feature Store
iris_features = fs.get_feature_group("iris_features", 1)
df = iris_features.read().toPandas()
df.head(10)
sepal_length sepal_width petal_length petal_width label
0 5.1 3.5 1.4 0.2 0
1 4.9 3.0 1.4 0.2 0
2 4.7 3.2 1.3 0.2 0
3 4.6 3.1 1.5 0.2 0
4 5.0 3.6 1.4 0.2 0
5 5.4 3.9 1.7 0.4 0
6 4.6 3.4 1.4 0.3 0
7 5.0 3.4 1.5 0.2 0
8 4.4 2.9 1.4 0.2 0
9 4.9 3.1 1.5 0.1 0
df.describe()
sepal_length sepal_width petal_length petal_width label
count 150.000000 150.000000 150.000000 150.000000 150.000000
mean 5.843333 3.057333 3.758000 1.199333 1.000000
std 0.828066 0.435866 1.765298 0.762238 0.819232
min 4.300000 2.000000 1.000000 0.100000 0.000000
25% 5.100000 2.800000 1.600000 0.300000 0.000000
50% 5.800000 3.000000 4.350000 1.300000 1.000000
75% 6.400000 3.300000 5.100000 1.800000 2.000000
max 7.900000 4.400000 6.900000 2.500000 2.000000
x_df = df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']]
y_df = df[["label"]]
X = x_df.values
y = y_df.values.ravel()
Train an Iris Flower Classifier
IRIS_FLOWER_RESOURCES_PATH = "/Resources/iris_flower"
IRIS_FLOWER_MODEL_PKL = "iris_knn.pkl"
Generate a KNN Model using the Feature Data
neighbors = random.randint(3, 30)
iris_knn = KNeighborsClassifier(n_neighbors=neighbors)
iris_knn.fit(X, y)
y_pred = iris_knn.predict(X)
acc = accuracy_score(y, y_pred)
print(acc)
0.98
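KNeighborsClassifier predicts a class by majority vote among the k nearest training points. Its default metric (Minkowski with p=2) is Euclidean distance.

Note that the 0.98 above is computed on the same X and y used in fit, so it is a training accuracy. The plain-Python sketch below illustrates the technique and why that number is optimistic: with k=1 every training point is its own nearest neighbour.

Caveats:
- The helper names are hypothetical.
- The six rows are a small illustrative sample in the style of the Iris data.
- Tie-breaking here (first label among the nearest wins) may differ from scikit-learn.

```python
import math
from collections import Counter


def knn_predict(train_X, train_y, x, k):
    """Classify x by majority vote among its k nearest training points,
    using Euclidean distance."""
    distances = sorted(
        (math.dist(x, xi), yi) for xi, yi in zip(train_X, train_y)
    )
    votes = Counter(label for _, label in distances[:k])
    # On a tie in vote counts, the label seen first (i.e. nearer) wins.
    return votes.most_common(1)[0][0]


def accuracy(y_true, y_pred):
    """Fraction of predictions that match the labels, like accuracy_score."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


# Illustrative rows: sepal_length, sepal_width, petal_length, petal_width.
train_X = [(5.1, 3.5, 1.4, 0.2), (4.9, 3.0, 1.4, 0.2),
           (7.0, 3.2, 4.7, 1.4), (6.4, 3.2, 4.5, 1.5),
           (6.3, 3.3, 6.0, 2.5), (5.8, 2.7, 5.1, 1.9)]
train_y = [0, 0, 1, 1, 2, 2]

# Evaluating on the training data with k=1 is trivially perfect.
train_pred = [knn_predict(train_X, train_y, x, k=1) for x in train_X]
print(accuracy(train_y, train_pred))  # 1.0
```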
Save the Trained Model to HopsFS
joblib.dump(iris_knn, IRIS_FLOWER_MODEL_PKL)
hdfs.mkdir(IRIS_FLOWER_RESOURCES_PATH)
hdfs.copy_to_hdfs(IRIS_FLOWER_MODEL_PKL, IRIS_FLOWER_RESOURCES_PATH, overwrite=True)
Export the Trained Model to the Models Repository
It is a best practice to keep the Python script that loads and runs the model together with the trained model files. The code below copies the script into the iris flower resources folder, which will be used when exporting the model.
IRIS_FLOWER_CLASSIFIER_SCRIPT = "iris_flower_classifier.py"
script_path = "Jupyter/end_to_end_pipeline/sklearn/" + IRIS_FLOWER_CLASSIFIER_SCRIPT
hdfs.cp(script_path, IRIS_FLOWER_RESOURCES_PATH + "/" + IRIS_FLOWER_CLASSIFIER_SCRIPT, overwrite=True)
MODEL_NAME = "irisflowerclassifier"
import hsml
from hsml.schema import Schema
from hsml.model_schema import ModelSchema
input_schema = Schema(x_df)
output_schema = Schema(y_df)
conn = hsml.connection()
mr = conn.get_model_registry()
# Register the scikit-learn model with its accuracy, an input example and the schema
sk_model = mr.sklearn.create_model(MODEL_NAME,
                                   metrics={'accuracy': acc},
                                   input_example=x_df,
                                   model_schema=ModelSchema(input_schema=input_schema, output_schema=output_schema))
sk_model.save(IRIS_FLOWER_RESOURCES_PATH)
Connected. Call `.close()` to terminate connection gracefully.
Exported model irisflowerclassifier with version 1
<hsml.sklearn.model.Model object at 0x7f3aa6ab5fd0>
Model export complete: 100%|##########| 6/6 [00:09<00:00, 1.60s/it]
for p in hdfs.ls("Models/" + MODEL_NAME, recursive=True):
    print(p)
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/input_example.json
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_knn.pkl
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/model_schema.json
hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/program.ipynb
Serve the Trained Model
To serve a scikit-learn model, write a Python script that implements a Predict class. Its constructor copies the model from HDFS and stores it as an instance attribute, and the class implements the methods predict, classify and regress, like this:
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23
from hops import hdfs
import os
class Predict(object):
    def __init__(self):
        """ Initializes the serving state, reads a trained model from HDFS"""
        self.model_path = "Models/irisflowerclassifier/1/iris_knn.pkl"
        print("Copying SKLearn model from HDFS to local directory")
        hdfs.copy_to_local(self.model_path)
        print("Reading local SkLearn model for serving")
        self.model = joblib.load("./iris_knn.pkl")
        print("Initialization Complete")
    def predict(self, inputs):
        """ Serves a prediction request using a trained model"""
        return self.model.predict(inputs).tolist()  # NumPy arrays are not JSON serializable
    def classify(self, inputs):
        """ Serves a classification request using a trained model"""
        return "not implemented"
    def regress(self, inputs):
        """ Serves a regression request using a trained model"""
        return "not implemented"
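Before deploying, it can help to exercise the Predict interface locally. The variant below injects the model instead of copying it from HDFS, so it runs without Hopsworks.

This is a sketch under stated assumptions:
- `StubModel` and its petal-length rule are hypothetical stand-ins for the unpickled KNN model.
- The server is assumed to pass the request's "inputs" list to predict and wrap the result under "predictions". This is inferred from the responses printed later in this notebook, not from server documentation.

```python
import json


class StubModel:
    """Hypothetical stand-in for the unpickled KNN model: predicts class 0
    when petal length (third feature) is below 2.5, otherwise class 2."""

    def predict(self, inputs):
        return [0 if row[2] < 2.5 else 2 for row in inputs]


class Predict(object):
    """Same interface as the serving script, but the model is injected
    instead of being copied from HDFS, so it can be tested locally."""

    def __init__(self, model=None):
        self.model = model if model is not None else StubModel()

    def predict(self, inputs):
        # The result must be JSON serializable: a plain list, not a NumPy array.
        return list(self.model.predict(inputs))

    def classify(self, inputs):
        return "not implemented"

    def regress(self, inputs):
        return "not implemented"


predictor = Predict()
request = {"inputs": [[5.1, 3.5, 1.4, 0.2], [6.3, 3.3, 6.0, 2.5]]}
# Assumed response envelope, mirroring {'predictions': [...]} seen later.
response = {"predictions": predictor.predict(request["inputs"])}
print(json.dumps(response))  # {"predictions": [0, 2]}
```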
Then upload this Python script to a folder in your project and go to the “Model Serving” service in Hopsworks.
Then click on “create serving” and configure your serving.
Once the serving is created, you can start it and view information such as the server logs and the Kafka topic it logs inference requests to.
Query the Model Repository for the Best IrisFlowerClassifier Model
EVALUATION_METRIC="accuracy"
best_model = mr.get_best_model(MODEL_NAME, EVALUATION_METRIC, "max")
print('Model name: ' + best_model.name)
print('Model version: ' + str(best_model.version))
print(best_model.training_metrics)
Model name: irisflowerclassifier
Model version: 1
{'accuracy': '0.98'}
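Each run of this notebook picks a random n_neighbors, so repeated runs register several versions with different accuracies. get_best_model(MODEL_NAME, "accuracy", "max") then selects among them.

The sketch below illustrates only that selection logic, not hsml's implementation:
- `best_version` and the registry entries are hypothetical.
- The metric values are strings, as in the output above, so they are converted with float() before comparing.

```python
def best_version(models, metric, direction="max"):
    """Hypothetical helper: return the model entry whose training metric is
    best. Metric values are strings (e.g. {'accuracy': '0.98'}), so they
    are compared numerically via float()."""
    if direction not in ("max", "min"):
        raise ValueError("direction must be 'max' or 'min'")
    candidates = [m for m in models if metric in m["training_metrics"]]
    chooser = max if direction == "max" else min
    return chooser(candidates, key=lambda m: float(m["training_metrics"][metric]))


# Hypothetical registry contents for three training runs.
models = [
    {"version": 1, "training_metrics": {"accuracy": "0.98"}},
    {"version": 2, "training_metrics": {"accuracy": "0.9533"}},
    {"version": 3, "training_metrics": {"accuracy": "0.9667"}},
]
print(best_version(models, "accuracy", "max")["version"])  # 1
```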
Once all the files have been exported to the model directory, we can create a serving instance that points to the model files using serving.create_or_update().
# Create serving
SERVING_NAME = MODEL_NAME
SCRIPT_PATH = best_model.version_path + "/" + IRIS_FLOWER_CLASSIFIER_SCRIPT
serving.create_or_update(SERVING_NAME,  # define a name for the serving instance
                         model_path=best_model.model_path,  # set the path of the model to be deployed
                         model_server="PYTHON",  # set the model server to run the model
                         predictor=SCRIPT_PATH,  # set the predictor to load the model and make predictions
                         # optional arguments
                         model_version=best_model.version,  # set the version of the model to be deployed
                         kfserving=False,  # the model will be served either with Docker or Kubernetes depending on the Hopsworks version
                         topic_name="CREATE",  # topic name or CREATE to create a new topic for inference logging, otherwise NONE
                         instances=1  # number of replicas
                         )
for s in serving.get_all():
    print(s.name)
irisflowerclassifier
After the serving has been created, you can find it in the Hopsworks UI under the “Model Serving” tab. You can also use the Python module to query the Hopsworks REST API for information about existing servings, using methods such as:
- get_all()
- get_id(serving_name)
- get_model_path(serving_name)
- get_model_version(serving_name)
- get_artifact_version(serving_name)
- get_model_server(serving_name)
- get_serving_tool(serving_name)
- get_kafka_topic(serving_name)
- get_status(serving_name)
- exists(serving_name)
print("Info: \tid: {},\n \
model_path: {},\n \
model_version: {},\n \
artifact_version: {},\n \
model_server: {},\n \
serving_tool: {}".format(
serving.get_id(SERVING_NAME),
serving.get_model_path(SERVING_NAME),
serving.get_model_version(SERVING_NAME),
serving.get_artifact_version(SERVING_NAME),
serving.get_model_server(SERVING_NAME),
serving.get_serving_tool(SERVING_NAME)))
Info: id: 1,
model_path: /Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py,
model_version: 1,
artifact_version: 0,
model_server: FLASK,
serving_tool: DEFAULT
Or describe a serving using
- describe(serving_name)
serving.describe(SERVING_NAME)
Classify Iris Flowers using the Trained Model
You can start or stop the serving instance either from the Hopsworks UI or from the Python/REST API, as demonstrated below.
Shut down currently running serving
if serving.get_status(SERVING_NAME) == "Running":
    serving.stop(SERVING_NAME)
    time.sleep(10)  # give the serving time to shut down
Start new serving
serving.start(SERVING_NAME)
Wait until serving is up and running
while serving.get_status(SERVING_NAME) != "Running":
    time.sleep(5)  # Let the serving start up correctly
time.sleep(15)
Send Prediction Requests to the Served Model using Hopsworks REST API
For making inference requests you can use the utility method serving.make_inference_request
NUM_FEATURES = 4
for i in range(20):
    # One request containing a single instance of NUM_FEATURES random values
    data = {"inputs": [[random.uniform(1, 8) for _ in range(NUM_FEATURES)]]}
    response = serving.make_inference_request(SERVING_NAME, data)
    print(response)
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [0]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [1]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
{'predictions': [2]}
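The requests above draw every feature from uniform(1, 8). The training data is much narrower per feature: df.describe() shows sepal_width between 2.0 and 4.4 and petal_width between 0.1 and 2.5. Out-of-range inputs like these may be why most responses are class 2.

The sketch below builds request bodies whose values stay inside the observed min/max of each feature:
- `FEATURE_RANGES` is copied from the describe() output above.
- `random_request` is a hypothetical helper.
- Sending several rows under "inputs" in one request is an assumption, based on the predictor passing the whole list to model.predict.

```python
import json
import random

# Per-feature (min, max), copied from the df.describe() output above.
FEATURE_RANGES = {
    "sepal_length": (4.3, 7.9),
    "sepal_width": (2.0, 4.4),
    "petal_length": (1.0, 6.9),
    "petal_width": (0.1, 2.5),
}


def random_request(n_instances=1, rng=random):
    """Build a request body with n_instances rows. Each value is drawn
    uniformly inside the range observed in the training data and rounded
    to one decimal, like the original measurements."""
    rows = [
        [round(rng.uniform(lo, hi), 1) for lo, hi in FEATURE_RANGES.values()]
        for _ in range(n_instances)
    ]
    return {"inputs": rows}


# A seeded generator makes the payload reproducible.
payload = random_request(3, rng=random.Random(42))
print(json.dumps(payload))
```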
Monitor Prediction Logs
Consume Prediction Requests and Responses using Kafka
All prediction requests are automatically logged to Kafka, which means that you can keep track of your model’s performance and its predictions in a scalable manner.
Set up a Kafka consumer and subscribe to the topic containing the prediction logs
TOPIC_NAME = serving.get_kafka_topic(SERVING_NAME)
config = kafka.get_kafka_default_config()
config['default.topic.config'] = {'auto.offset.reset': 'earliest'}
consumer = Consumer(config)
topics = [TOPIC_NAME]
consumer.subscribe(topics)
Read the Kafka Avro schema from Hopsworks and set up an Avro reader
json_schema = kafka.get_schema(TOPIC_NAME)
avro_schema = kafka.convert_json_schema_to_avro(json_schema)
Read the lookup table from the Feature Store for converting numerical labels to categorical
iris_labels_lookup = fs.get_feature_group("iris_labels_lookup", 1)
iris_labels_lookup_df = iris_labels_lookup.read().toPandas()
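The loop below filters the pandas DataFrame for every message. A small dictionary built once is an alternative.

The lookup table stores labels as doubles (0.0, 1.0, 2.0), as lookup_df.show() printed earlier, while the served model returns ints. The sketch therefore normalizes keys with int(). `build_label_lookup` is a hypothetical helper. On Hopsworks, its rows could come from `zip(iris_labels_lookup_df["variety"], iris_labels_lookup_df["label"])`; here they are copied from the earlier output.

```python
def build_label_lookup(rows):
    """Turn (variety, label) pairs into {int_label: variety}. Labels are
    stored as doubles, so keys are normalized with int() to match the
    integer predictions returned by the served model."""
    return {int(label): variety for variety, label in rows}


# Rows as printed by lookup_df.show(3) earlier in the notebook.
rows = [("Virginica", 2.0), ("Versicolor", 1.0), ("Setosa", 0.0)]
label_lookup = build_label_lookup(rows)
print(label_lookup[2])  # Virginica
```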
Read messages from the Kafka topic, parse them with the Avro schema and print the results
PRINT_INSTANCES=False
PRINT_PREDICTIONS=True
for i in range(0, 5):
    msg = consumer.poll(timeout=5.0)
    if msg is not None:
        value = msg.value()
        try:
            event_dict = kafka.parse_avro_msg(value, avro_schema)
            print("serving: {}, version: {}, timestamp: {},\n"\
                  " http_response_code: {}, model_server: {}, serving_tool: {}".format(
                      event_dict["modelName"],
                      event_dict["modelVersion"],
                      event_dict["requestTimestamp"],
                      event_dict["responseHttpCode"],
                      event_dict["modelServer"],
                      event_dict["servingTool"]))
            if PRINT_INSTANCES:
                print("instances: {}\n".format(event_dict["inferenceRequest"]))
            if PRINT_PREDICTIONS:
                prediction = json.loads(event_dict["inferenceResponse"])["predictions"][0]
                prediction_label = iris_labels_lookup_df.loc[iris_labels_lookup_df['label'] == prediction, 'variety'].iloc[0]
                print("predictions: {}, prediction_label:{}\n".format(prediction, prediction_label))
        except Exception as e:
            print("A message was read but there was an error parsing it")
            print(e)
    else:
        print("timeout.. no more messages to read from topic")
timeout.. no more messages to read from topic
serving: irisflowerclassifier, version: 1, timestamp: 1640099468723,
http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2, prediction_label:Virginica
serving: irisflowerclassifier, version: 1, timestamp: 1640099468803,
http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2, prediction_label:Virginica
serving: irisflowerclassifier, version: 1, timestamp: 1640099468901,
http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2, prediction_label:Virginica
serving: irisflowerclassifier, version: 1, timestamp: 1640099468994,
http_response_code: 200, model_server: FLASK, serving_tool: DEFAULT
predictions: 2, prediction_label:Virginica
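The per-message logic of the loop can be pulled into a pure function, so it is testable without a Kafka broker.

Assumptions in this sketch:
- The field names are taken from the loop above, and inferenceResponse is a JSON string, as the loop's json.loads call implies.
- requestTimestamp is assumed to be milliseconds since the Unix epoch (the printed values have 13 digits), so it is converted to a UTC datetime.
- `summarize_event` and the sample record are hypothetical. The record mirrors the first message printed above.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def summarize_event(event_dict, label_lookup):
    """Extract the interesting fields from one parsed inference-log record
    and map the numeric prediction back to its variety name."""
    prediction = json.loads(event_dict["inferenceResponse"])["predictions"][0]
    # Assumption: requestTimestamp is in milliseconds since the epoch.
    requested_at = EPOCH + timedelta(milliseconds=event_dict["requestTimestamp"])
    return {
        "model": event_dict["modelName"],
        "version": event_dict["modelVersion"],
        "http_code": event_dict["responseHttpCode"],
        "requested_at": requested_at,
        "prediction": prediction,
        "label": label_lookup.get(prediction, "unknown"),
    }


# Hypothetical record mirroring the first message printed above.
event = {
    "modelName": "irisflowerclassifier",
    "modelVersion": 1,
    "requestTimestamp": 1640099468723,
    "responseHttpCode": 200,
    "inferenceResponse": '{"predictions": [2]}',
}
summary = summarize_event(event, {0: "Setosa", 1: "Versicolor", 2: "Virginica"})
print(summary["label"], summary["requested_at"].isoformat())
# Virginica 2021-12-21T15:11:08.723000+00:00
```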