Maggy Distributed Training with PyTorch's sharded optimizer example
Training with PyTorch’s sharded optimizer
This notebook shows how to train with PyTorch’s ZeRO optimizer. Only a single change to the config is required to make this work. Creating the model and the dataset is identical to the previous notebooks and will not be commented on further.
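Under the hood, ZeRO stage 1 shards the optimizer state across workers so that each rank only stores and updates its own partition of it. Maggy sets up the process group and the wrapping for you; the snippet below is only a minimal sketch of what this corresponds to in plain PyTorch, using torch.distributed.optim.ZeroRedundancyOptimizer with a stand-in model and a single-process gloo group purely so the sketch runs on its own.

import os
import torch
import torch.distributed as dist
from torch.distributed.optim import ZeroRedundancyOptimizer

# Single-process group just to make the sketch runnable; in the Maggy
# experiment this is configured for you on every executor.
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")
dist.init_process_group("gloo", rank=0, world_size=1)

model = torch.nn.Linear(10, 10)  # stand-in model
# Each rank builds Adam state only for its own shard of the parameters.
optimizer = ZeroRedundancyOptimizer(
    model.parameters(),
    optimizer_class=torch.optim.Adam,
    lr=1e-3,
)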
from hops import hdfs
import torch
import torch.nn.functional as F
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log |
---|---|---|---|---|---|
194 | application_1617699042861_0021 | pyspark | idle | Link | Link |
SparkSession available as 'spark'.
class CNN(torch.nn.Module):
    def __init__(self):
        super().__init__()
        # Deliberately oversized layers so the optimizer state is large
        # enough for ZeRO's sharding to show up in the memory numbers.
        self.l1 = torch.nn.Conv2d(1, 1000, 3)
        self.l2 = torch.nn.Conv2d(1000, 3000, 5)
        self.l3 = torch.nn.Conv2d(3000, 3000, 5)
        self.l4 = torch.nn.Linear(3000 * 18 * 18, 10)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        x = F.relu(self.l3(x))
        # Return raw logits; CrossEntropyLoss applies log-softmax internally.
        return self.l4(x.flatten(start_dim=1))
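A quick back-of-the-envelope count shows why this toy CNN is a good ZeRO showcase: it is enormous for MNIST. The following is a rough sanity check, assuming the layer shapes above.

model = CNN()
n_params = sum(p.numel() for p in model.parameters())
print(f"{n_params:,} parameters")
# l2 and l3 dominate: 3000*1000*5*5 + 3000*3000*5*5 = 3.0e8 weights,
# ~310M parameters in total. In fp32 that is ~1.24 GB of weights, and Adam
# keeps two extra fp32 state tensors per parameter (exp_avg, exp_avg_sq),
# i.e. roughly another 2.5 GB of optimizer state that ZeRO can shard.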
Writing the training function
Unlike ZeRO with DeepSpeed, ZeRO with PyTorch does not require any changes to the training code. You can define your training function just as you are used to.
def train_fn(module, hparams, train_set, test_set):
    import torch

    from maggy.core.patching import MaggyPetastormDataLoader

    model = module(**hparams)

    batch_size = 4
    # Linear learning-rate scaling rule as in https://arxiv.org/pdf/1706.02677.pdf
    lr_base = 0.1 * batch_size / 256

    optimizer = torch.optim.Adam(model.parameters(), lr=lr_base)
    loss_criterion = torch.nn.CrossEntropyLoss()

    train_loader = MaggyPetastormDataLoader(train_set, batch_size=batch_size)

    model.train()
    for idx, data in enumerate(train_loader):
        img, label = data["image"].float(), data["label"].long()
        optimizer.zero_grad()
        prediction = model(img)
        loss = loss_criterion(prediction, label)
        loss.backward()
        # Peak GPU memory before and after the optimizer step; the first
        # step materializes Adam's state tensors, so the jump shows how
        # much (sharded) optimizer state this rank holds.
        m1 = torch.cuda.max_memory_allocated(0)
        optimizer.step()
        m2 = torch.cuda.max_memory_allocated(0)
        print("Optimizer pre: {}MB\nOptimizer post: {}MB".format(m1 // 1e6, m2 // 1e6))
        print(f"Finished batch {idx}")
    # Return a dummy metric; this example only demonstrates memory usage.
    return float(1)
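Note that max_memory_allocated reports the peak since the start of the program, so earlier forward/backward peaks can mask the step’s own allocations. A hypothetical helper (not part of Maggy; just a sketch using torch.cuda.reset_peak_memory_stats) that isolates the step would look like this:

import torch

def measure_step(optimizer):
    # Hypothetical helper: reset the peak counter so that earlier
    # forward/backward peaks don't hide what the step itself allocates
    # (the first step materializes Adam's sharded state tensors).
    torch.cuda.reset_peak_memory_stats(0)
    before = torch.cuda.memory_allocated(0)
    optimizer.step()
    peak = torch.cuda.max_memory_allocated(0)
    print("Pre-step: {}MB, peak during step: {}MB".format(before // 1e6, peak // 1e6))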
train_ds = hdfs.project_path() + "/DataSets/MNIST/PetastormMNIST/train_set"
test_ds = hdfs.project_path() + "/DataSets/MNIST/PetastormMNIST/test_set"
print(hdfs.exists(train_ds), hdfs.exists(test_ds))
True True
Configuring the experiment
With the torch backend, you don’t have to create an extensive, separate config for ZeRO. Simply set the ZeRO level you want to train with: level 1 shards the optimizer state across the workers, and level 2 additionally shards the gradients.
from maggy import experiment
from maggy.experiment_config import TorchDistributedConfig
config = TorchDistributedConfig(
    name='torch_ZeRO',
    module=CNN,
    train_set=train_ds,
    test_set=test_ds,
    backend="torch",
    zero_lvl=2,
)
result = experiment.lagom(train_fn, config)
0: Awaiting worker reservations.
1: Awaiting worker reservations.
1: All executors registered: True
1: Reservations complete, configuring PyTorch.
1: Torch config is {'MASTER_ADDR': '10.0.0.4', 'MASTER_PORT': '52577', 'WORLD_SIZE': '2', 'RANK': '1', 'LOCAL_RANK': '0', 'NCCL_BLOCKING_WAIT': '1', 'NCCL_DEBUG': 'INFO'}
0: All executors registered: True
0: Reservations complete, configuring PyTorch.
0: Torch config is {'MASTER_ADDR': '10.0.0.4', 'MASTER_PORT': '52577', 'WORLD_SIZE': '2', 'RANK': '0', 'LOCAL_RANK': '0', 'NCCL_BLOCKING_WAIT': '1', 'NCCL_DEBUG': 'INFO'}
0: Starting distributed training.
1: Starting distributed training.
0: Petastorm dataset detected in folder hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal//DataSets/MNIST/PetastormMNIST/train_set
1: Petastorm dataset detected in folder hdfs://rpc.namenode.service.consul:8020/Projects/PyTorch_spark_minimal//DataSets/MNIST/PetastormMNIST/train_set
0: Optimizer pre: 3870.0MB
Optimizer post: 7414.0MB
0: Finished batch 0
1: Optimizer pre: 3870.0MB
Optimizer post: 5093.0MB
1: Finished batch 0
1: Optimizer pre: 5502.0MB
Optimizer post: 5502.0MB
1: Finished batch 1
0: Optimizer pre: 7414.0MB
Optimizer post: 7414.0MB
0: Finished batch 1
0: Optimizer pre: 7414.0MB
Optimizer post: 7414.0MB
0: Finished batch 2
1: Optimizer pre: 5502.0MB
Optimizer post: 5502.0MB
1: Finished batch 2
1: Optimizer pre: 5502.0MB
Optimizer post: 5502.0MB
1: Finished batch 3
0: Optimizer pre: 7414.0MB
Optimizer post: 7414.0MB
0: Finished batch 3
0: Optimizer pre: 7414.0MB
Optimizer post: 7414.0MB
0: Finished batch 4
1: Optimizer pre: 5502.0MB
Optimizer post: 5502.0MB
1: Finished batch 4
0: Optimizer pre: 7414.0MB
Optimizer post: 7414.0MB
0: Finished batch 5
1: Optimizer pre: 5502.0MB
Optimizer post: 5502.0MB
1: Finished batch 5
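The log output roughly matches the estimate from earlier: with two workers and zero_lvl=2, each rank should hold about half of Adam’s ~2.5 GB of state. A quick check (a sketch; rank 0’s larger jump plausibly includes communication buffers, which the log does not break down):

adam_state_mb = 2 * 309_736_010 * 4 / 1e6  # exp_avg + exp_avg_sq in fp32: ~2478 MB
per_rank_mb = adam_state_mb / 2            # sharded across 2 workers: ~1239 MB
rank1_jump_mb = 5093 - 3870                # observed jump on rank 1: 1223 MB
print(per_rank_mb, rank1_jump_mb)          # close agreement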