Inference (Batch) in PySpark Example

Large Scale Batch Inference on HopsFS

To run this notebook you must first install the following libraries in your project’s conda environment (in addition to the base libraries):

  • Pillow
  • Matplotlib
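On Hopsworks these are typically installed through the project's conda environment service. If your setup also lets you install from a terminal inside that environment, the pip equivalent would be (a sketch; adjust to your platform):

pip install Pillow matplotlib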

Moreover, the notebook assumes that you have access to the ImageNet dataset; it can either be uploaded to your project or shared from another project.

You also need an internet connection so that the pre-trained model weights can be downloaded.

Imports

import tensorflow as tf
import numpy as np
import pydoop.hdfs as pydoop
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
from tensorflow.keras.preprocessing import image  # image.load_img requires Pillow
from tensorflow.keras.models import load_model
from pyspark.sql import Row
from hops import hdfs
from hops import util
%%local
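# %%local cells run in the local Jupyter kernel (via sparkmagic), not on the Spark
# cluster, which is why imports and constants are redefined here for local plotting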
import matplotlib.pyplot as plt
import tensorflow as tf
import pydoop.hdfs as pydoop
from hops import hdfs

Constants

HEIGHT = 224
WIDTH = 224
BATCH_SIZE = 100
CHANNELS = 3
INPUT_SHAPE = HEIGHT * WIDTH * CHANNELS  # 224 * 224 * 3 = 150528
NUM_CLASSES = 1000
NUM_PARALLEL_CALLS = 8
SAMPLE_IMAGE_DIR = pydoop.path.abspath(hdfs.project_path("labs") + "/imagenet_2016/ILSVRC2016_CLS-LOC/ILSVRC/Data/CLS-LOC/train/n03617480/")
SAMPLE_IMAGE_NAME = "n03617480_28686.JPEG"
SAMPLE_IMAGE_PATH = SAMPLE_IMAGE_DIR + SAMPLE_IMAGE_NAME
MODEL_NAME = "resnet_imagenet.h5"
%%local
HEIGHT = 224
WIDTH = 224
CHANNELS = 3
INPUT_SHAPE = HEIGHT * WIDTH * CHANNELS  # 224 * 224 * 3 = 150528
NUM_CLASSES = 1000
BATCH_SIZE = 100
NUM_PARALLEL_CALLS = 8
SAMPLE_IMAGE_DIR = pydoop.path.abspath(hdfs.project_path("labs") + "/imagenet_2016/ILSVRC2016_CLS-LOC/ILSVRC/Data/CLS-LOC/train/n03617480/")
SAMPLE_IMAGE_NAME = "n03617480_28686.JPEG"
SAMPLE_IMAGE_PATH = SAMPLE_IMAGE_DIR + SAMPLE_IMAGE_NAME
MODEL_NAME = "resnet_imagenet.h5"

Load a ResNet50 Model Pre-Trained on ImageNet from keras.applications

def define_model():
    """
    Defines the model to use for image classification
    
    Returns:
           ResNet50 model
    """
    tf.keras.backend.set_learning_phase(False)  # inference mode: disables training-only behavior such as dropout
    model = ResNet50(weights="imagenet", input_shape=(HEIGHT, WIDTH, CHANNELS), classes=NUM_CLASSES)
    return model

Save the Pre-Trained Model to HopsFS

def save_model(model):
    """
    Save Pre-Trained ImageNet model to HDFS
    
    Args:
         :model: the pre-trained model with weights trained on imagenet
    Returns:
          The HDFS path where it is saved
    """
    # save trained model
    model.save(MODEL_NAME)  # Keras can't save directly to HDFS in the current version, so save to the local fs first
    hdfs.copy_to_hdfs(MODEL_NAME, hdfs.project_path() + "Resources/", overwrite=True) # copy from local fs to hdfs
    model_hdfs_path = hdfs.project_path() + "Resources/" + MODEL_NAME
    return model_hdfs_path
hdfs_model_path = save_model(define_model())
hdfs_model_path
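As a quick check that the model file actually landed in HopsFS, we can list the Resources directory using the pydoop import from above:

pydoop.ls(hdfs.project_path() + "Resources/")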

Load the Pre-Trained Model from HopsFS

local_model_path = hdfs.copy_to_local(hdfs_model_path, "", overwrite=True) + MODEL_NAME
model = load_model(local_model_path)
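Before launching the distributed job, a single-image smoke test (assuming SAMPLE_IMAGE_PATH points to an image in your copy of the dataset) verifies that the model loads and predicts sensibly:

# copy one sample image to the local fs and run it through the same preprocessing as the batch job
local_sample_image_path = hdfs.copy_to_local(SAMPLE_IMAGE_PATH, "", overwrite=True) + SAMPLE_IMAGE_NAME
img = image.load_img(local_sample_image_path, target_size=(HEIGHT, WIDTH))
x = preprocess_input(np.expand_dims(image.img_to_array(img), axis=0))
print(decode_predictions(model.predict(x), top=3)[0])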

Batch Inference on ImageNet using Spark + Keras

Read Images into a Spark Dataframe

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
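# note: the namenode address and dataset path below are cluster-specific, adjust them to your environment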
df = spark.read.option("mode", "DROPMALFORMED").format("image").load("hdfs://10.0.104.196:8020/Projects/labs/imagenet_2016/ILSVRC2016_CLS-LOC/ILSVRC/Data/CLS-LOC/train/*/")
df_filtered = df.select("image.origin")
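The image data source yields a nested image struct; besides origin it carries height, width, nChannels, mode, and the raw data bytes, which we can inspect before running inference:

df.printSchema()
df.select("image.origin", "image.height", "image.width").show(3, truncate=False)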

Count how many images to perform batch inference on

The ImageNet 2016 training set contains 1,281,167 images.

df_filtered.count()

Parallel Inference using Spark Executors

def inference_fn(partition):
    """
    Maps a partition of image rows to rows with the top-3 predicted labels.
    Imports and the model are loaded inside the function because it runs on the executors.
    """
    from hops import hdfs
    from tensorflow.keras.models import load_model
    from tensorflow.keras.preprocessing import image
    from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
    from pyspark.sql import Row
    import numpy as np
    # copy the model from HDFS to the executor's local fs and load it once per partition
    local_model_path = hdfs.copy_to_local(hdfs_model_path, "", overwrite=True) + MODEL_NAME
    model = load_model(local_model_path)
    Example = Row("image_path", "top1_id", "top1_label", "top1_confidence", "top2_id",
                  "top2_label", "top2_confidence", "top3_id", "top3_label", "top3_confidence")
    for row in partition:
        # some images in imagenet are malformed, so we skip rows that fail
        try:
            image_name = row.origin.rsplit('/', 1)[1]
            local_image_path = hdfs.copy_to_local(row.origin, "", overwrite=True) + image_name
            img = image.load_img(local_image_path, target_size=(HEIGHT, WIDTH))
            x = preprocess_input(np.expand_dims(image.img_to_array(img), axis=0))
            predictions = model.predict(x)
            # decode_predictions returns one list of (id, label, confidence) tuples per input image
            top_1, top_2, top_3 = decode_predictions(predictions, top=3)[0]
            yield Example(row.origin,
                          str(top_1[0]), str(top_1[1]), float(top_1[2]),
                          str(top_2[0]), str(top_2[1]), float(top_2[2]),
                          str(top_3[0]), str(top_3[1]), float(top_3[2]))
        except Exception as e:
            print("Failed to label image {}: {}".format(row.origin, e))
labeled_df = df_filtered.limit(10000).repartition(util.num_executors()*3).rdd.mapPartitions(inference_fn).toDF()
labeled_df.write.mode("overwrite").parquet(hdfs.project_path() + "Resources/labels.parquet")
labeled_df.printSchema()
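The saved labels can be read back later, e.g. to inspect predictions or join them with other metadata; a minimal sketch:

labels_df = spark.read.parquet(hdfs.project_path() + "Resources/labels.parquet")
labels_df.select("image_path", "top1_label", "top1_confidence").show(5, truncate=False)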

Compare Prediction against Image

We can do a simple test, comparing an image in the dataframe against its predicted labels.

row = labeled_df.first()

Copy the HDFS path printed below into the %%local cell at the end to plot the image.

row.image_path
row.top1_label
row.top2_label
row.top3_label
%%local
%matplotlib inline 
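# paste the image_path printed above here; TensorFlow can read the file directly from HDFS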
with tf.Session() as sess:
    sample_img = tf.image.decode_jpeg(tf.read_file("hdfs://10.0.104.196:8020/Projects/labs/imagenet_2016/ILSVRC2016_CLS-LOC/ILSVRC/Data/CLS-LOC/train/n04550184/n04550184_41732.JPEG")).eval()
    plt.imshow(sample_img)
    plt.show()