Maggy Distributed Training with PyTorch's sharded optimizer example

Training with PyTorch’s sharded optimizer

This notebook shows how to train with PyTorch's ZeRO optimizer. Only a single change in the config is required to make this work. Creating the model and the dataset is identical to the previous notebooks and will not be commented on further.
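For reference, that single change is the zero_lvl argument passed to the experiment config (the full configuration cell appears further below); everything else is a standard Torch distributed setup.

from maggy.experiment_config import TorchDistributedConfig

# zero_lvl is the only ZeRO-specific setting; it selects the sharding stage.
config = TorchDistributedConfig(name='torch_ZeRO', module=CNN, train_set=train_ds,
                                test_set=test_ds, backend="torch", zero_lvl=2)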

from hops import hdfs
import torch
import torch.nn.functional as F
Starting Spark application
ID: 194, YARN Application ID: application_1617699042861_0021, Kind: pyspark, State: idle
SparkSession available as 'spark'.
class CNN(torch.nn.Module):

    def __init__(self):
        super().__init__()
        # Intentionally wide convolutions: the model's size makes the effect of
        # sharding the optimizer state clearly visible in GPU memory.
        self.l1 = torch.nn.Conv2d(1, 1000, 3)
        self.l2 = torch.nn.Conv2d(1000, 3000, 5)
        self.l3 = torch.nn.Conv2d(3000, 3000, 5)
        self.l4 = torch.nn.Linear(3000*18*18, 10)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        x = F.relu(self.l3(x))
        # Softmax over the class dimension (dim=1).
        x = F.softmax(self.l4(x.flatten(start_dim=1)), dim=1)
        return x
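As a quick sanity check (not part of the original notebook), a dummy MNIST-sized batch can be pushed through the network to verify the layer dimensions. The batch size of 2 is an arbitrary assumption, and since the model is very wide, even this forward pass needs several GB of memory.

import torch

# Illustrative only: two 1x28x28 (MNIST-sized) images.
dummy = torch.randn(2, 1, 28, 28)
print(CNN()(dummy).shape)  # torch.Size([2, 10])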

Writing the training function

Unlike ZeRO with DeepSpeed, ZeRO with PyTorch does not require any changes to the training code whatsoever. You can define your training function just as you are used to.

def train_fn(module, hparams, train_set, test_set):
    
    import time
    import torch
        
    from maggy.core.patching import MaggyPetastormDataLoader
    
    model = module(**hparams)
    
    # Linear learning rate scaling rule as in https://arxiv.org/pdf/1706.02677.pdf
    batch_size = 4
    lr_base = 0.1 * batch_size/256
    
    optimizer = torch.optim.Adam(model.parameters(), lr=lr_base)
    loss_criterion = torch.nn.CrossEntropyLoss()
    
    train_loader = MaggyPetastormDataLoader(train_set, batch_size=batch_size)
                            
    model.train()
    for idx, data in enumerate(train_loader):
        img, label = data["image"].float(), data["label"].long()
        optimizer.zero_grad()
        prediction = model(img)
        loss = loss_criterion(prediction, label)
        loss.backward()
        # Record peak GPU memory before and after the optimizer step to see
        # the effect of sharding the optimizer state.
        m1 = torch.cuda.max_memory_allocated(0)
        optimizer.step()
        m2 = torch.cuda.max_memory_allocated(0)
        print("Optimizer pre: {}MB\n Optimizer post: {}MB".format(m1//1e6, m2//1e6))
        print(f"Finished batch {idx}")
    # Return a dummy metric; Maggy expects the training function to return a value.
    return float(1)
train_ds = hdfs.project_path() + "/DataSets/MNIST/PetastormMNIST/train_set"
test_ds = hdfs.project_path() + "/DataSets/MNIST/PetastormMNIST/test_set"
print(hdfs.exists(train_ds), hdfs.exists(test_ds))
True True

Configuring the experiment

With the torch backend, you don't have to create an extensive, separate config for ZeRO. Simply set the zero_lvl argument to the ZeRO stage you want to train with.
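PyTorch exposes ZeRO-style optimizer state sharding as torch.distributed.optim.ZeroRedundancyOptimizer. The sketch below is only meant to illustrate what the zero_lvl setting enables in plain PyTorch, not Maggy's internal implementation; it assumes a torch.distributed process group has already been initialized on every worker.

import torch
from torch.distributed.optim import ZeroRedundancyOptimizer

# Assumes torch.distributed.init_process_group(...) has already been called,
# e.g. with the "nccl" backend on GPU workers.
model = CNN().cuda()
ddp_model = torch.nn.parallel.DistributedDataParallel(model)

# Adam's state (momentum and variance tensors) is partitioned across ranks
# instead of being replicated on every GPU, reducing per-GPU memory.
optimizer = ZeroRedundancyOptimizer(
    ddp_model.parameters(),
    optimizer_class=torch.optim.Adam,
    lr=0.1 * 4 / 256,  # same linearly scaled base learning rate as in train_fn
)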

from maggy import experiment
from maggy.experiment_config import TorchDistributedConfig

config = TorchDistributedConfig(name='torch_ZeRO', module=CNN, train_set=train_ds, test_set=test_ds, backend="torch", zero_lvl=2)
result = experiment.lagom(train_fn, config)


0: Awaiting worker reservations.
1: Awaiting worker reservations.
1: All executors registered: True
1: Reservations complete, configuring PyTorch.
1: Torch config is {'MASTER_ADDR': '10.0.0.4', 'MASTER_PORT': '52577', 'WORLD_SIZE': '2', 'RANK': '1', 'LOCAL_RANK': '0', 'NCCL_BLOCKING_WAIT': '1', 'NCCL_DEBUG': 'INFO'}
0: All executors registered: True
0: Reservations complete, configuring PyTorch.
0: Torch config is {'MASTER_ADDR': '10.0.0.4', 'MASTER_PORT': '52577', 'WORLD_SIZE': '2', 'RANK': '0', 'LOCAL_RANK': '0', 'NCCL_BLOCKING_WAIT': '1', 'NCCL_DEBUG': 'INFO'}
0: Starting distributed training.
1: Starting distributed training.
0: Petastorm dataset detected in folder hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal//DataSets/MNIST/PetastormMNIST/train_set
1: Petastorm dataset detected in folder hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal//DataSets/MNIST/PetastormMNIST/train_set
0: Optimizer pre: 3870.0MB
 Optimizer post: 7414.0MB
0: Finished batch 0
1: Optimizer pre: 3870.0MB
 Optimizer post: 5093.0MB
1: Finished batch 0
1: Optimizer pre: 5502.0MB
 Optimizer post: 5502.0MB
1: Finished batch 1
0: Optimizer pre: 7414.0MB
 Optimizer post: 7414.0MB
0: Finished batch 1
0: Optimizer pre: 7414.0MB
 Optimizer post: 7414.0MB
0: Finished batch 2
1: Optimizer pre: 5502.0MB
 Optimizer post: 5502.0MB
1: Finished batch 2
1: Optimizer pre: 5502.0MB
 Optimizer post: 5502.0MB
1: Finished batch 3
0: Optimizer pre: 7414.0MB
 Optimizer post: 7414.0MB
0: Finished batch 3
0: Optimizer pre: 7414.0MB
 Optimizer post: 7414.0MB
0: Finished batch 4
1: Optimizer pre: 5502.0MB
 Optimizer post: 5502.0MB
1: Finished batch 4
0: Optimizer pre: 7414.0MB
 Optimizer post: 7414.0MB
0: Finished batch 5
1: Optimizer pre: 5502.0MB
 Optimizer post: 5502.0MB
1: Finished batch 5

