Creating a Petastorm MNIST dataset
In this notebook we are going to create a Petastorm dataset from the famous MNIST dataset. Compared to ImageNette, it has the advantage of being easily available through PyTorch. It is also considerably smaller, which makes it easier to experiment with.
from hops import hdfs
import numpy as np
from torchvision.datasets import MNIST
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log |
---|---|---|---|---|---|
181 | application_1617699042861_0008 | pyspark | idle | Link | Link |
SparkSession available as 'spark'.
path = hdfs.project_path() + "Resources/Petastorm"
Downloading the dataset with torchvision
Torchvision provides a simple interface for downloading the MNIST dataset. Note that the download is broken in torchvision versions prior to 0.9.1! If you run into issues, please upgrade your installation to the latest version. For other workarounds, see here.
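As an optional sanity check (not part of the original notebook), you can print the installed torchvision version before kicking off the download:

import torchvision
# Anything below 0.9.1 is known to have a broken MNIST download.
print(torchvision.__version__)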
path = hdfs.project_path() + "DataSets/MNIST"
train_dataset = MNIST(path, download=True)
test_dataset = MNIST(path, download=True, train=False)
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-images-idx3-ubyte.gz
Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-labels-idx1-ubyte.gz
Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/train-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-images-idx3-ubyte.gz
Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-images-idx3-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 503: Service Unavailable
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-labels-idx1-ubyte.gz
Extracting hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw/t10k-labels-idx1-ubyte.gz to hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal/DataSets/MNIST/MNIST/raw
Processing...
Done!
9913344it [00:00, 11845783.99it/s]
29696it [00:00, 357933.48it/s]
1649664it [00:00, 2811158.81it/s]
5120it [00:00, 4901811.57it/s]
/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/torchvision/datasets/mnist.py:502: UserWarning: The given NumPy array is not writeable, and PyTorch does not support non-writeable tensors. This means you can write to the underlying (supposedly non-writeable) NumPy array using the tensor. You may want to copy the array to protect its data or make it writeable before converting it to a tensor. This type of warning will be suppressed for the rest of this program. (Triggered internally at /pytorch/torch/csrc/utils/tensor_numpy.cpp:143.)
return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)
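Before moving on, an optional check (not part of the original notebook) can confirm that both splits were downloaded correctly:

# Expected sizes: 60000 training examples and 10000 test examples.
print(len(train_dataset), len(test_dataset))
img, label = train_dataset[0]
print(img.size, label)  # a 28x28 PIL image and its integer label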
Setting up the Petastorm dataset generation
Now that we have our dataset, creating the Petastorm dataset works exactly the same way as with ImageNette. Note that distributed training requires an even dataset, meaning that each node sees the same number of examples. If your dataset does not split evenly, you can increase the number of Parquet files to allow for a more fine-grained distribution among the nodes.
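As a rough back-of-the-envelope illustration (the worker count here is purely hypothetical and not part of the notebook), you can reason about how evenly the examples spread across files and nodes:

num_examples = 60000        # MNIST training set size
parquet_files_count = 100   # same value used in generate_MNIST_dataset below
num_workers = 4             # hypothetical number of training nodes

print(num_examples / parquet_files_count)   # rows per Parquet file
print(parquet_files_count % num_workers)    # 0 means the files split evenly across nodes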
from petastorm.codecs import CompressedImageCodec, NdarrayCodec, ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
MNISTSchema = Unischema('ScalarSchema', [
    UnischemaField('image', np.uint8, (1, 28, 28), NdarrayCodec(), False),
    UnischemaField('label', np.int8, (), ScalarCodec(IntegerType()), False)])
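If you want to see how the Unischema translates into Spark types, you can print the derived Spark schema. This optional check is not part of the original notebook; NdarrayCodec serializes the image into a binary column, while ScalarCodec(IntegerType()) maps the label to an integer column:

print(MNISTSchema.as_spark_schema())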
def row_generator(idx, dataset):
    # Return a single example as a dictionary whose keys match the Unischema fields.
    img, label = dataset[idx]
    return {'image': np.expand_dims(np.array(img, dtype=np.uint8), axis=0), 'label': label}
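A quick way to verify the generator output before materializing the full dataset (an optional check, not in the original notebook):

sample = row_generator(0, train_dataset)
print(sample['image'].shape, sample['image'].dtype, sample['label'])
# Expected: (1, 28, 28) uint8 and an integer label between 0 and 9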
def generate_MNIST_dataset(output_url, dataset):
    rowgroup_size_mb = 1
    rows_count = len(dataset)
    parquet_files_count = 100

    sc = spark.sparkContext

    # Wrap the dataset materialization portion. This takes care of setting up the Spark
    # environment variables as well as saving the Petastorm-specific metadata.
    with materialize_dataset(spark, output_url, MNISTSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(rows_count))\
            .map(lambda x: row_generator(x, dataset))\
            .map(lambda x: dict_to_spark_row(MNISTSchema, x))

        spark.createDataFrame(rows_rdd, MNISTSchema.as_spark_schema()) \
            .repartition(parquet_files_count) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)
Generating the dataset
Now that everything is set up, we can define our output paths and generate the datasets.
train_path = hdfs.project_path() + "DataSets/MNIST/PetastormMNIST/train_set"
test_path = hdfs.project_path() + "DataSets/MNIST/PetastormMNIST/test_set"
generate_MNIST_dataset(train_path, train_dataset)
generate_MNIST_dataset(test_path, test_dataset)
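To check that the generated files are readable, you can read a batch back with Petastorm's PyTorch helpers. This is a hedged sketch of a verification step, not part of the original notebook, assuming the paths above point at the materialized Parquet files:

from petastorm import make_reader
from petastorm.pytorch import DataLoader

# Read one batch from the training set to confirm shapes and dtypes.
with make_reader(train_path, num_epochs=1) as reader:
    with DataLoader(reader, batch_size=64) as loader:
        batch = next(iter(loader))
        print(batch['image'].shape, batch['label'].shape)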