Maggy distributed training ResNet-50 on ImageNet using PyTorch

Training ResNet50 with distributed training on Maggy

In this notebook we train a ResNet-50 model from scratch on ImageNet data. Note that a standard PyTorch Dataset and DataLoader are used, which incurs large I/O overhead and does not fully utilize the GPUs. For higher throughput, see ImageNet_petastorm_training.

import time

import torch
from torchvision import models
from torchvision import transforms as T

import pandas as pd
import numpy as np
from PIL import Image

from hops import hdfs
Starting Spark application
ID   YARN Application ID             Kind     State  Spark UI  Driver log
184  application_1617699042861_0011  pyspark  idle   Link      Link
SparkSession available as 'spark'.

Creating the PyTorch Dataset

The metadata of our dataset is stored in a .csv file located in the root folder. It contains the label of each image and its source path. For convenience, we map the class names to integer labels. In the __getitem__ function, we apply optional transformations after reading the image and its label. The advantage of defining our own dataset is that we can perform I/O against our DFS via hdfs.open_file, which would fail with a plain local file open() (the approach PyTorch's predefined datasets use).

class ImageNetDataset(torch.utils.data.Dataset):
    
    def __init__(self, path, transform=None, test_set=False):
        super().__init__()
        self.root = path
        self.df = pd.read_csv(path + "noisy_imagenette.csv")
        self.transform = transform
        if test_set:
            self.df = self.df[self.df.is_valid]
        else:
            self.df = self.df[self.df.is_valid == False]
        self.df.drop(["noisy_labels_" + str(i) for i in [1, 5, 25,50]], axis=1, inplace=True)
        self.labels = {"n01440764": 0,  # "tench" 
                       "n02102040": 1,  # "English springer"
                       "n02979186": 2,  # "cassette player"
                       "n03000684": 3,  # "chain saw"
                       "n03028079": 4,  # "church"
                       "n03394916": 5,  # "French horn"
                       "n03417042": 6,  # "garbage truck"
                       "n03425413": 7,  # "gas pump"
                       "n03445777": 8,  # "golf ball"
                       "n03888257": 9,  # "parachute"
                      }
    
    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        label = self.labels[row["noisy_labels_0"]]
        f = hdfs.open_file(self.root + row["path"])
        try:
            img = Image.open(f).convert("RGB")
        finally:
            f.close()
        if self.transform:
            img = self.transform(img)
        sample = {"image": img, "label": label}
        return sample
path = hdfs.project_path() + "DataSets/ImageNet/imagenette/"

Defining data transforms

To increase the variety of our training samples, we apply data augmentation via torchvision's transforms API. For training images, in addition to resizing and random cropping, we also flip images horizontally at random. For the test set, we use a deterministic center crop and no flips to remove randomness. All images are normalized with the standard ImageNet channel means and standard deviations.

train_transform = T.Compose(
    [T.Resize(256),
     T.RandomCrop(224),
     T.RandomHorizontalFlip(),
     T.ToTensor(),
     T.Normalize(mean=[0.485, 0.456, 0.406],
                 std=[0.229, 0.224, 0.225])
    ])

test_transform = T.Compose(
    [T.Resize(256),
     T.CenterCrop(224),
     T.ToTensor(),
     T.Normalize(mean=[0.485, 0.456, 0.406],
                 std=[0.229, 0.224, 0.225])
    ])
train_ds = ImageNetDataset(path, transform=train_transform)
test_ds = ImageNetDataset(path, transform=test_transform, test_set=True)
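
As a quick sanity check (an illustrative addition, not part of the original run), we can fetch a single sample and inspect its shape and label. This assumes the notebook is running in the Hopsworks environment, so that the hops hdfs module can read the dataset path defined above.

# Illustrative sanity check (assumes the dataset path above is readable from this notebook).
sample = train_ds[0]
print(sample["image"].shape, sample["label"])  # expected: torch.Size([3, 224, 224]) and an integer label in 0..9
print(len(train_ds), len(test_ds))             # number of training and validation images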

Defining the training function

In order to use PyTorch with Maggy, we need to define our training loop inside a function. The function takes our module, its hyperparameters, and both the train and test set as input. Note that the module should be a class that is instantiated inside the training loop, since transferring model weights at the start of the experiment would result in a large communication overhead. Likewise, it is not advisable to pass datasets with a large memory footprint into the function; instead, load them from the DFS when needed. Inside the training loop, Maggy requires the use of a torch DataLoader. Apart from these restrictions, you can implement your training loop as in normal PyTorch. Finally, all libraries used inside the function have to be imported within it.

def train_fn(module, hparams, train_set, test_set):
    
    import torch
    import time
    from torch.utils.data import DataLoader
    
    model = module(**hparams)
    
    n_epochs = 3
    batch_size = 64
    # Linear scaling rule: base lr 0.1 scaled by the global batch size (2 workers * batch_size) / 256.
    lr = 0.1 * 2 * batch_size / 256
    
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_criterion = torch.nn.CrossEntropyLoss()
    
    train_loader = DataLoader(train_set, pin_memory=True, batch_size=batch_size)
    test_loader = DataLoader(test_set, pin_memory=True, batch_size=batch_size, drop_last=True)

    def eval_model(model, test_loader):
        acc = 0
        model.eval()
        img_cnt = 0
        with torch.no_grad():
            for idx, data in enumerate(test_loader):
                img, label = data["image"], data["label"]
                prediction = model(img)
                acc += torch.sum(torch.argmax(prediction, dim=1) == label)
                img_cnt += len(label)
        acc = acc.detach()/float(img_cnt)
        print("Test accuracy: {:.3f}".format(acc))
        print("-"*20)
        return acc

    t_0 = time.time()
    for epoch in range(n_epochs):
        model.train()  # eval_model leaves the model in eval mode, so switch back each epoch
        print("-"*20 + "\nStarting new epoch\n")
        t_start = time.time()
        for idx, data in enumerate(train_loader):
            if idx%10 == 0:
                print(f"Working on batch {idx}.")
            img, label = data["image"], data["label"]
            optimizer.zero_grad()
            prediction = model(img)
            output = loss_criterion(prediction, label.long())
            output.backward()
            optimizer.step()
        t_end = time.time()
        print("Epoch training took {:.0f}s.\n".format(t_end-t_start))
        acc = eval_model(model, test_loader)
    t_1 = time.time()
    minutes, seconds = divmod(t_1 - t_0, 60)
    hours, minutes = divmod(minutes, 60)
    print("-"*20 + "\nTotal training time: {:.0f}h {:.0f}m {:.0f}s.".format(hours, minutes, seconds))
    return float(acc)

Configuring maggy

As a last step, we need to configure our Maggy experiment. Here we pass our model class, our train and test datasets, as well as the desired backend. Maggy supports either torch or deepspeed as backend, with additional constraints when using deepspeed. With the torch backend, you can employ PyTorch's version of the ZeRO optimizer and model sharding by setting the ZeRO level in the config (either 1, 2 or 3).

from maggy import experiment
from maggy.experiment_config import TorchDistributedConfig

config = TorchDistributedConfig(name='ImageNet_training', module=models.resnet50,
                                train_set=train_ds, test_set=test_ds, backend="torch")
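
If you want to experiment with the ZeRO optimizer mentioned above, the sharding level can be set in the same config. The keyword name used below (zero_lvl) is an assumption based on Maggy's documentation and may differ between Maggy versions, so treat this as a sketch rather than a verified call.

# Sketch only: the same experiment with ZeRO stage 2 sharding (the zero_lvl keyword is assumed, not verified).
# config = TorchDistributedConfig(name='ImageNet_training_zero', module=models.resnet50,
#                                 train_set=train_ds, test_set=test_ds, backend="torch",
#                                 zero_lvl=2)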

Running the experiment

Now that everything is configured, we are ready to run the experiment by calling the lagom function. You should be able to see the output of your workers in the notebook.

result = experiment.lagom(train_fn, config)


0: Awaiting worker reservations.
1: Awaiting worker reservations.
1: All executors registered: True
1: Reservations complete, configuring PyTorch.
1: Torch config is {'MASTER_ADDR': '10.0.0.4', 'MASTER_PORT': '50261', 'WORLD_SIZE': '2', 'RANK': '1', 'LOCAL_RANK': '0', 'NCCL_BLOCKING_WAIT': '1', 'NCCL_DEBUG': 'INFO'}
0: All executors registered: True
0: Reservations complete, configuring PyTorch.
0: Torch config is {'MASTER_ADDR': '10.0.0.4', 'MASTER_PORT': '50261', 'WORLD_SIZE': '2', 'RANK': '0', 'LOCAL_RANK': '0', 'NCCL_BLOCKING_WAIT': '1', 'NCCL_DEBUG': 'INFO'}
0: Starting distributed training.
1: Starting distributed training.
0: --------------------
Starting new epoch

1: --------------------
Starting new epoch

1: Working on batch 0.
0: Working on batch 0.
1: Working on batch 10.
0: Working on batch 10.
1: Working on batch 20.
0: Working on batch 20.
1: Working on batch 30.
0: Working on batch 30.
1: Working on batch 40.
0: Working on batch 40.
1: Working on batch 50.
0: Working on batch 50.
1: Working on batch 60.
0: Working on batch 60.
1: Working on batch 70.
0: Working on batch 70.
1: Epoch training took 443s.

0: Epoch training took 443s.

1: Test accuracy: 0.188
1: --------------------
1: --------------------
Starting new epoch

1: Working on batch 0.
0: Test accuracy: 0.180
0: --------------------
0: --------------------
Starting new epoch

0: Working on batch 0.
1: Working on batch 10.
0: Working on batch 10.
1: Working on batch 20.
0: Working on batch 20.
1: Working on batch 30.
0: Working on batch 30.
1: Working on batch 40.
0: Working on batch 40.
1: Working on batch 50.
0: Working on batch 50.
1: Working on batch 60.
0: Working on batch 60.
1: Working on batch 70.
0: Working on batch 70.
0: Epoch training took 436s.

1: Epoch training took 470s.

1: Test accuracy: 0.157
1: --------------------
1: --------------------
Starting new epoch

1: Working on batch 0.
0: Test accuracy: 0.144
0: --------------------
0: --------------------
Starting new epoch

0: Working on batch 0.
1: Working on batch 10.
0: Working on batch 10.


An error was encountered:
Invalid status code '400' from http://10.0.0.7:8998/sessions/184/statements/8 with error payload: {"msg":"requirement failed: Session isn't active."}