# Creating a petastorm Dataset from ImageNet

## Why petastorm?

Petastorm is an open-source library for large datasets, suited for high-throughput I/O applications. Petastorm uses Parquet as a columnar storage format, which allows for better compression than e.g. the .csv format, and combines fragmented datasets consisting of many files into fewer and larger files. You should use petastorm when your DataLoader needs to read a lot of files and is slowing down your training. One drawback of petastorm datasets is that you lose random access to elements and the dataset's length. In PyTorch Dataset terms, petastorm implements an IterableDataset.

## The dataset

For this example, we use the ImageNette dataset (https://github.com/fastai/imagenette), a subset of the original ImageNet dataset. It contains ten categories with training and test images for each. The images vary in size from merely ~150x150 pixels up to 4K resolution.

## The files

This notebook assumes that the ImageNette folder is present and extracted in DataSets/ImageNet/imagenette/.

from hops import hdfs
import pandas as pd
import numpy as np
import torch
import torchvision.transforms as T
from PIL import Image
Starting Spark application

SparkSession available as 'spark'.


## Creating a PyTorch Dataset

We first create a Dataset just as we would for a normal PyTorch training script. The goal here is to produce a generator for training samples that behaves exactly as we are used to from PyTorch. We could of course implement another row generator with the same functionality, but this is probably the easiest way.

The metadata of our dataset is stored in a .csv file located in the root folder. It contains the label of each image and its source path. For convenience, we relabel the classes to integers. Also, since distributed training performs best on balanced datasets (i.e. the same number of batches per worker), we drop the last few examples. This is not strictly necessary, but can help you maximize your GPU utilization later on.
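The row-dropping logic above can be sketched in a few lines of plain Python; `balanced_length` is a hypothetical helper (not part of the notebook) that computes how many rows survive:

```python
def balanced_length(n_rows, n_exec, batch_size):
    """Round n_rows down to a multiple of n_exec * batch_size so that
    every worker receives the same number of full batches."""
    req_items = n_exec * batch_size
    return n_rows - n_rows % req_items

# With 2 executors and batch size 64, 1000 rows shrink to the
# nearest multiple of 128:
print(balanced_length(1000, 2, 64))  # 896
```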

Petastorm expects its images to be of uniform size. This is a direct consequence of the column-based storage format with its strict schemas. In the __getitem__ function, we therefore need to crop the images to a standard resolution. Luckily, PyTorch datasets enable us to do so by simply applying a transformation after reading the image and its label. For now this transform is kept generic; it will be defined later. Another advantage of defining our own dataset is that we have no problems performing I/O operations on our DFS, which would fail when simply calling os.open().
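To make the uniform-size requirement concrete, here is a rough numpy sketch of what a center crop does; later we let torchvision's CenterCrop handle this, so `center_crop` below is only an illustrative stand-in:

```python
import numpy as np

def center_crop(img, size):
    # Cut a size x size window out of the middle of an H x W x C array.
    h, w = img.shape[:2]
    top, left = (h - size) // 2, (w - size) // 2
    return img[top:top + size, left:left + size]

# Input images vary in size, but every output has the same shape:
img = np.zeros((300, 400, 3), dtype=np.uint8)
print(center_crop(img, 256).shape)  # (256, 256, 3)
```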

class ImageNetDataset(torch.utils.data.Dataset):

    def __init__(self, path, n_exec, batch_size=1, transform=None, test_set=False):
        super().__init__()
        self.root = path
        self.transform = transform
        # Read the metadata .csv that ships with ImageNette from the root folder.
        f = hdfs.open_file(self.root + "noisy_imagenette.csv")
        try:
            self.df = pd.read_csv(f)
        finally:
            f.close()
        if test_set:
            self.df = self.df[self.df.is_valid]
        else:
            self.df = self.df[~self.df.is_valid]
        self.df.drop(["noisy_labels_" + str(i) for i in [1, 5, 25, 50]], axis=1, inplace=True)
        self.labels = {"n01440764": 0,  # "tench"
                       "n02102040": 1,  # "English springer"
                       "n02979186": 2,  # "cassette player"
                       "n03000684": 3,  # "chain saw"
                       "n03028079": 4,  # "church"
                       "n03394916": 5,  # "French horn"
                       "n03417042": 6,  # "garbage truck"
                       "n03425413": 7,  # "gas pump"
                       "n03445777": 8,  # "golf ball"
                       "n03888257": 9,  # "parachute"
                       }
        # Drop the last samples to create a balanced dataset.
        self.req_items = n_exec * batch_size
        len_df = len(self.df)
        self.df.drop(index=self.df.index[len_df - len_df % self.req_items:], inplace=True)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        label = self.labels[row["noisy_labels_0"]]
        f = hdfs.open_file(self.root + row["path"])
        try:
            img = Image.open(f).convert("RGB")
        finally:
            f.close()
        if self.transform:
            img = self.transform(img)
        sample = {"image": np.array(img, dtype=np.uint8), "label": label}
        return sample

### Creating the datasets

We create both the train and the test set by setting the correct source path for their folder. Since petastorm requires uniformly sized images, as previously mentioned, we add torchvision transforms that resize and center-crop the images to a uniform size.

root_folder = hdfs.project_path() + "DataSets/ImageNet/imagenette/"

train_path = hdfs.project_path() + "DataSets/ImageNet/PetastormImageNette/train"
test_path = hdfs.project_path() + "DataSets/ImageNet/PetastormImageNette/test"

transform = T.Compose([
    T.Resize(256),
    T.CenterCrop(256),
])

train_dataset = ImageNetDataset(root_folder, 2, 64, transform=transform)
test_dataset = ImageNetDataset(root_folder, 2, 64, transform=transform, test_set=True)

print("Length of training set: {}, length of test set: {}".format(len(train_dataset), len(test_dataset)))
Length of training set: 9344, length of test set: 3840


### Preparing the petastorm schema and generator

In order to create a petastorm dataset, we need to provide two things: first, a schema for our dataset, and second, a row generator that maps indices of the dataset to a dictionary containing the data. In our case, the schema is as simple as choosing a numpy array of shape (256, 256, 3) with dtype uint8 for the images, and an int8 scalar for the labels. Note that unlike PyTorch tensors, these images keep their channels in the 3rd dimension. For more information on the schema, see https://github.com/uber/petastorm. Since we already created our datasets for this exact purpose, we can simply return the dataset element at the desired index as a row generator.
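The channel-order remark is easy to trip over when reading the data back: petastorm stores images as height x width x channels, whereas PyTorch expects channels first. A small numpy sketch of the conversion (illustrative, not part of the notebook):

```python
import numpy as np

img_hwc = np.zeros((256, 256, 3), dtype=np.uint8)  # layout stored by petastorm
img_chw = np.transpose(img_hwc, (2, 0, 1))         # layout expected by PyTorch
print(img_hwc.shape, img_chw.shape)  # (256, 256, 3) (3, 256, 256)
```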

### Configuring the dataset

The materialize_dataset() context manager then creates a petastorm dataset from the Spark operations performed within the context. Note that you can adjust the number of Parquet files. For distributed training, it is necessary to have at least as many files as nodes. In general, it is advised to make parquet_files_count divisible by the number of nodes in your setup. It also controls the size of the individual files by distributing the total size over more files.
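A quick illustration of the divisibility advice; the helper and numbers below are purely illustrative. With 16 Parquet files, 2 or 4 nodes each read the same number of files, while 3 nodes would be unbalanced:

```python
def files_per_node(parquet_files_count, num_nodes):
    # Balanced only if every node reads the same number of files.
    return parquet_files_count // num_nodes, parquet_files_count % num_nodes == 0

print(files_per_node(16, 2))  # (8, True)
print(files_per_node(16, 3))  # (5, False) -> one node reads an extra file
```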

from petastorm.codecs import CompressedImageCodec, ScalarCodec
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import Unischema, UnischemaField, dict_to_spark_row
from pyspark.sql.types import IntegerType

ImageNetSchema = Unischema('ScalarSchema', [
    UnischemaField('image', np.uint8, (256, 256, 3), CompressedImageCodec("png"), False),
    UnischemaField('label', np.int8, (), ScalarCodec(IntegerType()), False)])

def row_generator(idx, dataset):
    return dataset[idx]

def generate_ImageNet_dataset(output_url, dataset):
    rowgroup_size_mb = 8
    rows_count = len(dataset)
    parquet_files_count = 16

    sc = spark.sparkContext
    # Wrap the dataset materialization portion. Takes care of setting up Spark
    # environment variables as well as saving petastorm-specific metadata.
    with materialize_dataset(spark, output_url, ImageNetSchema, rowgroup_size_mb):
        rows_rdd = sc.parallelize(range(rows_count))\
            .map(lambda x: row_generator(x, dataset))\
            .map(lambda x: dict_to_spark_row(ImageNetSchema, x))

        spark.createDataFrame(rows_rdd, ImageNetSchema.as_spark_schema()) \
            .repartition(parquet_files_count) \
            .write \
            .mode('overwrite') \
            .parquet(output_url)

### Write the dataset

Once we have defined all necessary functions and schemas, we can simply call generate_ImageNet_dataset on the two datasets to create them. Note that this might take a few minutes due to the amount of data that has to be read and processed.

generate_ImageNet_dataset(train_path, train_dataset)
generate_ImageNet_dataset(test_path, test_dataset)