Creating a Petastorm MNIST dataset

In this notebook we are going to create a Petastorm dataset from the famous MNIST dataset. Compared to ImageNette, MNIST has the advantage of being easily available through PyTorch, and it is considerably smaller, which makes it easier to experiment with.

from hops import hdfs
import numpy as np
from torchvision.datasets import MNIST
Starting Spark application
ID: 181, YARN Application ID: application_1617699042861_0008, Kind: pyspark, State: idle
SparkSession available as 'spark'.
path = hdfs.project_path() + "Resources/Petastorm"

Downloading the dataset with torchvision

Torchvision provides a simple interface for downloading the MNIST dataset. Note that the download is broken in torchvision versions prior to 0.9.1; if you run into issues, upgrade your installation to the latest version.

path = hdfs.project_path() + "DataSets/MNIST"
train_dataset = MNIST(path, download=True)               # training split (train=True by default)
test_dataset = MNIST(path, download=True, train=False)   # test split
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-images-idx3-ubyte.gz
Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-labels-idx1-ubyte.gz
Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-images-idx3-ubyte.gz
Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-labels-idx1-ubyte.gz
Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw

Processing...
Done!
9913344it [00:00, 11845783.99it/s]                           
29696it [00:00, 357933.48it/s]           
1649664it [00:00, 2811158.81it/s]                           
5120it [00:00, 4901811.57it/s]          
/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/torchvision/datasets/mnist.py:502: UserWarning: The given NumPy array is not writeable, and PyTorch does not support non-writeable tensors. This means you can write to the underlying (supposedly non-writeable) NumPy array using the tensor. You may want to copy the array to protect its data or make it writeable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at  /pytorch/torch/csrc/utils/tensor_numpy.cpp:143.)
  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)
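
As a quick sanity check (not part of the original notebook), you can inspect the downloaded splits; without a transform, torchvision returns PIL images and integer labels.

print(len(train_dataset), len(test_dataset))  # 60000 and 10000 examples
img, label = train_dataset[0]                 # PIL image and its integer label
print(img.size, label)                        # (28, 28) and a digit in 0-9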

Setting up the Petastorm dataset generation

Now that we have our dataset, creating the Petastorm dataset works exactly the same way as with ImageNette. Note that distributed training requires an evenly distributable dataset, meaning that each node sees the same number of examples. If your dataset does not split evenly, you can increase the number of Parquet files to allow for a more fine-grained distribution among nodes (see the short sketch after the code below).

from petastorm.codecs import CompressedImageCodec, NdarrayCodec, ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType


# Each row holds a 1x28x28 uint8 image and an int8 label.
MNISTSchema = Unischema('MNISTSchema', [
    UnischemaField('image', np.uint8, (1, 28, 28), NdarrayCodec(), False),
    UnischemaField('label', np.int8, (), ScalarCodec(IntegerType()), False)])

def row_generator(idx, dataset):
    img, label = dataset[idx]
    return {'image': np.expand_dims(np.array(img, dtype=np.uint8), axis=0), 'label': label}


def generate_MNIST_dataset(output_url, dataset):
    rowgroup_size_mb = 1
    rows_count = len(dataset)
    parquet_files_count = 100
    
    sc = spark.sparkContext
    # Wrap dataset materialization portion. Will take care of setting up spark environment variables as
    # well as save petastorm specific metadata
    with materialize_dataset(spark, output_url, MNISTSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(rows_count))\
            .map(lambda x: row_generator(x, dataset))\
            .map(lambda x: dict_to_spark_row(MNISTSchema, x))

        spark.createDataFrame(rows_rdd, MNISTSchema.as_spark_schema()) \
            .repartition(parquet_files_count) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)
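
To make the note about even distribution concrete: one simple approach is to pick parquet_files_count as a multiple of the number of training nodes, so the Parquet files can be assigned evenly. The worker count below is a hypothetical value, not something defined in this notebook.

num_workers = 4                                        # assumed number of training nodes
files_per_worker = 25                                  # trade-off between file size and parallelism
parquet_files_count = num_workers * files_per_worker   # 100 files, as hard-coded above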

Generating the dataset

Now that everything is set up, we can define our output paths and generate the datasets.

train_path = hdfs.project_path() + "DataSets/MNIST/PetastormMNIST/train_set"
test_path = hdfs.project_path() + "DataSets/MNIST/PetastormMNIST/test_set"
generate_MNIST_dataset(train_path, train_dataset)
generate_MNIST_dataset(test_path, test_dataset)
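
To verify the result, you can read a row back with Petastorm's make_reader. This is a minimal sketch, assuming the notebook environment can read the HDFS paths directly; it is not part of the original notebook.

from petastorm import make_reader

with make_reader(train_path) as reader:
    for row in reader:
        print(row.image.shape, row.label)  # expected: (1, 28, 28) and a digit in 0-9
        break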