From b32d1cccf8412aed1fd733d7dd269f1be64019e2 Mon Sep 17 00:00:00 2001 From: David Grove Date: Thu, 27 Mar 2025 13:11:27 -0400 Subject: [PATCH] avoid shell escaping by putting pytorch yaml in a file --- setup.KubeConEU25/README.md | 353 +----------------- .../sample-jobs/pytorch-training.yaml | 346 +++++++++++++++++ 2 files changed, 349 insertions(+), 350 deletions(-) create mode 100644 setup.KubeConEU25/sample-jobs/pytorch-training.yaml diff --git a/setup.KubeConEU25/README.md b/setup.KubeConEU25/README.md index 7d4d79b..a3f052a 100644 --- a/setup.KubeConEU25/README.md +++ b/setup.KubeConEU25/README.md @@ -813,358 +813,11 @@ to run a job that uses [PyTorch](https://pytorch.org) to train a machine learnin This example was constructed by converting a [PyTorch tutorial on FSDP](https://pytorch.org/tutorials/intermediate/FSDP_tutorial.html) into a KubeFlow Trainer [notebook](./sample-jobs/pytorch-training.ipynb) that we used to generate the yaml for a `PyTorchJob`. The YAML generated by running the notebook was then put inside an -`AppWrapper` using MLBatch's [awpack tool](../tools/appwrapper-packager/awpack.py) to produce the final YAML -that we will apply by executing the command below. +`AppWrapper` using MLBatch's [awpack tool](../tools/appwrapper-packager/awpack.py) to produce the final +[pytorch-training.yaml](sample-jobs/pytorch-training.yaml) that we will apply to run the workload. ```sh -kubectl apply --as alice -n blue -f- << EOF -apiVersion: workload.codeflare.dev/v1beta2 -kind: AppWrapper -metadata: - name: pytorch-mnist-training - labels: - kueue.x-k8s.io/queue-name: default-queue -spec: - components: - - template: - apiVersion: kubeflow.org/v1 - kind: PyTorchJob - metadata: - name: mnist-training - spec: - nprocPerNode: "2" - pytorchReplicaSpecs: - Master: - replicas: 1 - template: - metadata: - annotations: - sidecar.istio.io/inject: "false" - spec: - containers: - - args: - - |2- - program_path=$(mktemp -d) - read -r -d '' SCRIPT << EOM - def train_function(parameters): - import os - import time - import functools - import torch - import torch.nn as nn - import torch.nn.functional as F - import torch.optim as optim - from torchvision import datasets, transforms - from torch.optim.lr_scheduler import StepLR - import torch.distributed as dist - import torch.distributed as dist - import torch.multiprocessing as mp - from torch.nn.parallel import DistributedDataParallel as DDP - from torch.utils.data.distributed import DistributedSampler - from torch.distributed.fsdp import FullyShardedDataParallel as FSDP - from torch.distributed.fsdp.fully_sharded_data_parallel import ( - CPUOffload, - BackwardPrefetch, - ) - from torch.distributed.fsdp.wrap import ( - size_based_auto_wrap_policy, - enable_wrap, - wrap, - ) - class Net(nn.Module): - def __init__(self): - super(Net, self).__init__() - self.conv1 = nn.Conv2d(1, 32, 3, 1) - self.conv2 = nn.Conv2d(32, 64, 3, 1) - self.dropout1 = nn.Dropout(0.25) - self.dropout2 = nn.Dropout(0.5) - self.fc1 = nn.Linear(9216, 128) - self.fc2 = nn.Linear(128, 10) - def forward(self, x): - x = self.conv1(x) - x = F.relu(x) - x = self.conv2(x) - x = F.relu(x) - x = F.max_pool2d(x, 2) - x = self.dropout1(x) - x = torch.flatten(x, 1) - x = self.fc1(x) - x = F.relu(x) - x = self.dropout2(x) - x = self.fc2(x) - output = F.log_softmax(x, dim=1) - return output - def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=None): - model.train() - ddp_loss = torch.zeros(2).to(rank) - if sampler: - sampler.set_epoch(epoch) - for batch_idx, 
(data, target) in enumerate(train_loader): - data, target = data.to(rank), target.to(rank) - optimizer.zero_grad() - output = model(data) - loss = F.nll_loss(output, target, reduction='sum') - loss.backward() - optimizer.step() - ddp_loss[0] += loss.item() - ddp_loss[1] += len(data) - dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) - if rank == 0: - print('Train Epoch: {} \tLoss: {:.6f}'.format(epoch, ddp_loss[0] / ddp_loss[1])) - def test(model, rank, world_size, test_loader): - model.eval() - correct = 0 - ddp_loss = torch.zeros(3).to(rank) - with torch.no_grad(): - for data, target in test_loader: - data, target = data.to(rank), target.to(rank) - output = model(data) - ddp_loss[0] += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss - pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability - ddp_loss[1] += pred.eq(target.view_as(pred)).sum().item() - ddp_loss[2] += len(data) - dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) - if rank == 0: - test_loss = ddp_loss[0] / ddp_loss[2] - print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format( - test_loss, int(ddp_loss[1]), int(ddp_loss[2]), - 100. * ddp_loss[1] / ddp_loss[2])) - # [1] Setup PyTorch distributed and get the distributed parameters. - torch.manual_seed(parameters["seed"]) - dist.init_process_group("nccl") - local_rank = int(os.environ["LOCAL_RANK"]) - rank = dist.get_rank() - world_size = dist.get_world_size() - # Local rank identifies the GPU number inside the pod. - torch.cuda.set_device(local_rank) - print( - f"FSDP Training for WORLD_SIZE: {world_size}, RANK: {rank}, LOCAL_RANK: {local_rank}" - ) - transform=transforms.Compose([ - transforms.ToTensor(), - transforms.Normalize((0.1307,), (0.3081,)) - ]) - dataset1 = datasets.MNIST('/tmp/data', train=True, download=True, - transform=transform) - dataset2 = datasets.MNIST('/tmp/data', train=False, - transform=transform) - sampler1 = DistributedSampler(dataset1, rank=rank, num_replicas=world_size, shuffle=True) - sampler2 = DistributedSampler(dataset2, rank=rank, num_replicas=world_size) - train_kwargs = {'batch_size': parameters["batch-size"], 'sampler': sampler1} - test_kwargs = {'batch_size': parameters["test-batch-size"], 'sampler': sampler2} - cuda_kwargs = {'num_workers': 2, - 'pin_memory': True, - 'shuffle': False} - train_kwargs.update(cuda_kwargs) - test_kwargs.update(cuda_kwargs) - train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs) - test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs) - my_auto_wrap_policy = functools.partial( - size_based_auto_wrap_policy, min_num_params=100 - ) - init_start_event = torch.cuda.Event(enable_timing=True) - init_end_event = torch.cuda.Event(enable_timing=True) - model = Net().to(local_rank) - model = FSDP(model) - optimizer = optim.Adadelta(model.parameters(), lr=parameters["lr"]) - scheduler = StepLR(optimizer, step_size=1, gamma=parameters["gamma"]) - init_start_event.record() - for epoch in range(1, parameters["epochs"] + 1): - train(parameters, model, local_rank, world_size, train_loader, optimizer, epoch, sampler=sampler1) - test(model, local_rank, world_size, test_loader) - scheduler.step() - init_end_event.record() - if rank == 0: - init_end_event.synchronize() - print(f"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec") - print(f"{model}") - if parameters["save-model"]: - # use a barrier to make sure training is done on all ranks - dist.barrier() - states = model.state_dict() - if rank == 0: - 
torch.save(states, "mnist_cnn.pt") - train_function({'batch-size': 64, 'test-batch-size': 1000, 'epochs': 10, 'lr': 1.0, 'gamma': 0.7, 'seed': 1, 'save-model': False}) - EOM - printf "%s" "$SCRIPT" > "$program_path/ephemeral_script.py" - torchrun "$program_path/ephemeral_script.py" - command: - - bash - - -c - image: docker.io/pytorch/pytorch:2.1.2-cuda11.8-cudnn8-runtime - name: pytorch - resources: - limits: - nvidia.com/gpu: "2" - requests: - nvidia.com/gpu: "2" - Worker: - replicas: 1 - template: - metadata: - annotations: - sidecar.istio.io/inject: "false" - spec: - containers: - - args: - - |2- - program_path=$(mktemp -d) - read -r -d '' SCRIPT << EOM - def train_function(parameters): - import os - import time - import functools - import torch - import torch.nn as nn - import torch.nn.functional as F - import torch.optim as optim - from torchvision import datasets, transforms - from torch.optim.lr_scheduler import StepLR - import torch.distributed as dist - import torch.distributed as dist - import torch.multiprocessing as mp - from torch.nn.parallel import DistributedDataParallel as DDP - from torch.utils.data.distributed import DistributedSampler - from torch.distributed.fsdp import FullyShardedDataParallel as FSDP - from torch.distributed.fsdp.fully_sharded_data_parallel import ( - CPUOffload, - BackwardPrefetch, - ) - from torch.distributed.fsdp.wrap import ( - size_based_auto_wrap_policy, - enable_wrap, - wrap, - ) - class Net(nn.Module): - def __init__(self): - super(Net, self).__init__() - self.conv1 = nn.Conv2d(1, 32, 3, 1) - self.conv2 = nn.Conv2d(32, 64, 3, 1) - self.dropout1 = nn.Dropout(0.25) - self.dropout2 = nn.Dropout(0.5) - self.fc1 = nn.Linear(9216, 128) - self.fc2 = nn.Linear(128, 10) - def forward(self, x): - x = self.conv1(x) - x = F.relu(x) - x = self.conv2(x) - x = F.relu(x) - x = F.max_pool2d(x, 2) - x = self.dropout1(x) - x = torch.flatten(x, 1) - x = self.fc1(x) - x = F.relu(x) - x = self.dropout2(x) - x = self.fc2(x) - output = F.log_softmax(x, dim=1) - return output - def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=None): - model.train() - ddp_loss = torch.zeros(2).to(rank) - if sampler: - sampler.set_epoch(epoch) - for batch_idx, (data, target) in enumerate(train_loader): - data, target = data.to(rank), target.to(rank) - optimizer.zero_grad() - output = model(data) - loss = F.nll_loss(output, target, reduction='sum') - loss.backward() - optimizer.step() - ddp_loss[0] += loss.item() - ddp_loss[1] += len(data) - dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) - if rank == 0: - print('Train Epoch: {} \tLoss: {:.6f}'.format(epoch, ddp_loss[0] / ddp_loss[1])) - def test(model, rank, world_size, test_loader): - model.eval() - correct = 0 - ddp_loss = torch.zeros(3).to(rank) - with torch.no_grad(): - for data, target in test_loader: - data, target = data.to(rank), target.to(rank) - output = model(data) - ddp_loss[0] += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss - pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability - ddp_loss[1] += pred.eq(target.view_as(pred)).sum().item() - ddp_loss[2] += len(data) - dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) - if rank == 0: - test_loss = ddp_loss[0] / ddp_loss[2] - print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format( - test_loss, int(ddp_loss[1]), int(ddp_loss[2]), - 100. * ddp_loss[1] / ddp_loss[2])) - # [1] Setup PyTorch distributed and get the distributed parameters. 
- torch.manual_seed(parameters["seed"]) - dist.init_process_group("nccl") - local_rank = int(os.environ["LOCAL_RANK"]) - rank = dist.get_rank() - world_size = dist.get_world_size() - # Local rank identifies the GPU number inside the pod. - torch.cuda.set_device(local_rank) - print( - f"FSDP Training for WORLD_SIZE: {world_size}, RANK: {rank}, LOCAL_RANK: {local_rank}" - ) - transform=transforms.Compose([ - transforms.ToTensor(), - transforms.Normalize((0.1307,), (0.3081,)) - ]) - dataset1 = datasets.MNIST('/tmp/data', train=True, download=True, - transform=transform) - dataset2 = datasets.MNIST('/tmp/data', train=False, - transform=transform) - sampler1 = DistributedSampler(dataset1, rank=rank, num_replicas=world_size, shuffle=True) - sampler2 = DistributedSampler(dataset2, rank=rank, num_replicas=world_size) - train_kwargs = {'batch_size': parameters["batch-size"], 'sampler': sampler1} - test_kwargs = {'batch_size': parameters["test-batch-size"], 'sampler': sampler2} - cuda_kwargs = {'num_workers': 2, - 'pin_memory': True, - 'shuffle': False} - train_kwargs.update(cuda_kwargs) - test_kwargs.update(cuda_kwargs) - train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs) - test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs) - my_auto_wrap_policy = functools.partial( - size_based_auto_wrap_policy, min_num_params=100 - ) - init_start_event = torch.cuda.Event(enable_timing=True) - init_end_event = torch.cuda.Event(enable_timing=True) - model = Net().to(local_rank) - model = FSDP(model) - optimizer = optim.Adadelta(model.parameters(), lr=parameters["lr"]) - scheduler = StepLR(optimizer, step_size=1, gamma=parameters["gamma"]) - init_start_event.record() - for epoch in range(1, parameters["epochs"] + 1): - train(parameters, model, local_rank, world_size, train_loader, optimizer, epoch, sampler=sampler1) - test(model, local_rank, world_size, test_loader) - scheduler.step() - init_end_event.record() - if rank == 0: - init_end_event.synchronize() - print(f"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec") - print(f"{model}") - if parameters["save-model"]: - # use a barrier to make sure training is done on all ranks - dist.barrier() - states = model.state_dict() - if rank == 0: - torch.save(states, "mnist_cnn.pt") - train_function({'batch-size': 64, 'test-batch-size': 1000, 'epochs': 10, 'lr': 1.0, 'gamma': 0.7, 'seed': 1, 'save-model': False}) - EOM - printf "%s" "$SCRIPT" > "$program_path/ephemeral_script.py" - torchrun "$program_path/ephemeral_script.py" - command: - - bash - - -c - image: docker.io/pytorch/pytorch:2.1.2-cuda11.8-cudnn8-runtime - name: pytorch - resources: - limits: - nvidia.com/gpu: "2" - requests: - nvidia.com/gpu: "2" - runPolicy: - suspend: false -EOF +kubectl apply --as alice -n blue -f ./setup.KubeConEU25/sample-jobs/pytorch-training.yaml ``` This will create 2 Pods, each requesting 2 GPUs. 
On our cluster, it will take about 30 seconds diff --git a/setup.KubeConEU25/sample-jobs/pytorch-training.yaml b/setup.KubeConEU25/sample-jobs/pytorch-training.yaml new file mode 100644 index 0000000..fc063e1 --- /dev/null +++ b/setup.KubeConEU25/sample-jobs/pytorch-training.yaml @@ -0,0 +1,346 @@ +apiVersion: workload.codeflare.dev/v1beta2 +kind: AppWrapper +metadata: + name: pytorch-mnist-training + labels: + kueue.x-k8s.io/queue-name: default-queue +spec: + components: + - template: + apiVersion: kubeflow.org/v1 + kind: PyTorchJob + metadata: + name: mnist-training + spec: + nprocPerNode: "2" + pytorchReplicaSpecs: + Master: + replicas: 1 + template: + metadata: + annotations: + sidecar.istio.io/inject: "false" + spec: + containers: + - args: + - |2- + program_path=$(mktemp -d) + read -r -d '' SCRIPT << EOM + def train_function(parameters): + import os + import time + import functools + import torch + import torch.nn as nn + import torch.nn.functional as F + import torch.optim as optim + from torchvision import datasets, transforms + from torch.optim.lr_scheduler import StepLR + import torch.distributed as dist + import torch.distributed as dist + import torch.multiprocessing as mp + from torch.nn.parallel import DistributedDataParallel as DDP + from torch.utils.data.distributed import DistributedSampler + from torch.distributed.fsdp import FullyShardedDataParallel as FSDP + from torch.distributed.fsdp.fully_sharded_data_parallel import ( + CPUOffload, + BackwardPrefetch, + ) + from torch.distributed.fsdp.wrap import ( + size_based_auto_wrap_policy, + enable_wrap, + wrap, + ) + class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 32, 3, 1) + self.conv2 = nn.Conv2d(32, 64, 3, 1) + self.dropout1 = nn.Dropout(0.25) + self.dropout2 = nn.Dropout(0.5) + self.fc1 = nn.Linear(9216, 128) + self.fc2 = nn.Linear(128, 10) + def forward(self, x): + x = self.conv1(x) + x = F.relu(x) + x = self.conv2(x) + x = F.relu(x) + x = F.max_pool2d(x, 2) + x = self.dropout1(x) + x = torch.flatten(x, 1) + x = self.fc1(x) + x = F.relu(x) + x = self.dropout2(x) + x = self.fc2(x) + output = F.log_softmax(x, dim=1) + return output + def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=None): + model.train() + ddp_loss = torch.zeros(2).to(rank) + if sampler: + sampler.set_epoch(epoch) + for batch_idx, (data, target) in enumerate(train_loader): + data, target = data.to(rank), target.to(rank) + optimizer.zero_grad() + output = model(data) + loss = F.nll_loss(output, target, reduction='sum') + loss.backward() + optimizer.step() + ddp_loss[0] += loss.item() + ddp_loss[1] += len(data) + dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) + if rank == 0: + print('Train Epoch: {} \tLoss: {:.6f}'.format(epoch, ddp_loss[0] / ddp_loss[1])) + def test(model, rank, world_size, test_loader): + model.eval() + correct = 0 + ddp_loss = torch.zeros(3).to(rank) + with torch.no_grad(): + for data, target in test_loader: + data, target = data.to(rank), target.to(rank) + output = model(data) + ddp_loss[0] += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss + pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability + ddp_loss[1] += pred.eq(target.view_as(pred)).sum().item() + ddp_loss[2] += len(data) + dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) + if rank == 0: + test_loss = ddp_loss[0] / ddp_loss[2] + print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format( + test_loss, int(ddp_loss[1]), 
int(ddp_loss[2]), + 100. * ddp_loss[1] / ddp_loss[2])) + # [1] Setup PyTorch distributed and get the distributed parameters. + torch.manual_seed(parameters["seed"]) + dist.init_process_group("nccl") + local_rank = int(os.environ["LOCAL_RANK"]) + rank = dist.get_rank() + world_size = dist.get_world_size() + # Local rank identifies the GPU number inside the pod. + torch.cuda.set_device(local_rank) + print( + f"FSDP Training for WORLD_SIZE: {world_size}, RANK: {rank}, LOCAL_RANK: {local_rank}" + ) + transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ]) + dataset1 = datasets.MNIST('/tmp/data', train=True, download=True, + transform=transform) + dataset2 = datasets.MNIST('/tmp/data', train=False, + transform=transform) + sampler1 = DistributedSampler(dataset1, rank=rank, num_replicas=world_size, shuffle=True) + sampler2 = DistributedSampler(dataset2, rank=rank, num_replicas=world_size) + train_kwargs = {'batch_size': parameters["batch-size"], 'sampler': sampler1} + test_kwargs = {'batch_size': parameters["test-batch-size"], 'sampler': sampler2} + cuda_kwargs = {'num_workers': 2, + 'pin_memory': True, + 'shuffle': False} + train_kwargs.update(cuda_kwargs) + test_kwargs.update(cuda_kwargs) + train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs) + test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs) + my_auto_wrap_policy = functools.partial( + size_based_auto_wrap_policy, min_num_params=100 + ) + init_start_event = torch.cuda.Event(enable_timing=True) + init_end_event = torch.cuda.Event(enable_timing=True) + model = Net().to(local_rank) + model = FSDP(model) + optimizer = optim.Adadelta(model.parameters(), lr=parameters["lr"]) + scheduler = StepLR(optimizer, step_size=1, gamma=parameters["gamma"]) + init_start_event.record() + for epoch in range(1, parameters["epochs"] + 1): + train(parameters, model, local_rank, world_size, train_loader, optimizer, epoch, sampler=sampler1) + test(model, local_rank, world_size, test_loader) + scheduler.step() + init_end_event.record() + if rank == 0: + init_end_event.synchronize() + print(f"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec") + print(f"{model}") + if parameters["save-model"]: + # use a barrier to make sure training is done on all ranks + dist.barrier() + states = model.state_dict() + if rank == 0: + torch.save(states, "mnist_cnn.pt") + train_function({'batch-size': 64, 'test-batch-size': 1000, 'epochs': 10, 'lr': 1.0, 'gamma': 0.7, 'seed': 1, 'save-model': False}) + EOM + printf "%s" "$SCRIPT" > "$program_path/ephemeral_script.py" + torchrun "$program_path/ephemeral_script.py" + command: + - bash + - -c + image: docker.io/pytorch/pytorch:2.1.2-cuda11.8-cudnn8-runtime + name: pytorch + resources: + limits: + nvidia.com/gpu: "2" + requests: + nvidia.com/gpu: "2" + Worker: + replicas: 1 + template: + metadata: + annotations: + sidecar.istio.io/inject: "false" + spec: + containers: + - args: + - |2- + program_path=$(mktemp -d) + read -r -d '' SCRIPT << EOM + def train_function(parameters): + import os + import time + import functools + import torch + import torch.nn as nn + import torch.nn.functional as F + import torch.optim as optim + from torchvision import datasets, transforms + from torch.optim.lr_scheduler import StepLR + import torch.distributed as dist + import torch.distributed as dist + import torch.multiprocessing as mp + from torch.nn.parallel import DistributedDataParallel as DDP + from torch.utils.data.distributed import 
DistributedSampler + from torch.distributed.fsdp import FullyShardedDataParallel as FSDP + from torch.distributed.fsdp.fully_sharded_data_parallel import ( + CPUOffload, + BackwardPrefetch, + ) + from torch.distributed.fsdp.wrap import ( + size_based_auto_wrap_policy, + enable_wrap, + wrap, + ) + class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 32, 3, 1) + self.conv2 = nn.Conv2d(32, 64, 3, 1) + self.dropout1 = nn.Dropout(0.25) + self.dropout2 = nn.Dropout(0.5) + self.fc1 = nn.Linear(9216, 128) + self.fc2 = nn.Linear(128, 10) + def forward(self, x): + x = self.conv1(x) + x = F.relu(x) + x = self.conv2(x) + x = F.relu(x) + x = F.max_pool2d(x, 2) + x = self.dropout1(x) + x = torch.flatten(x, 1) + x = self.fc1(x) + x = F.relu(x) + x = self.dropout2(x) + x = self.fc2(x) + output = F.log_softmax(x, dim=1) + return output + def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=None): + model.train() + ddp_loss = torch.zeros(2).to(rank) + if sampler: + sampler.set_epoch(epoch) + for batch_idx, (data, target) in enumerate(train_loader): + data, target = data.to(rank), target.to(rank) + optimizer.zero_grad() + output = model(data) + loss = F.nll_loss(output, target, reduction='sum') + loss.backward() + optimizer.step() + ddp_loss[0] += loss.item() + ddp_loss[1] += len(data) + dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) + if rank == 0: + print('Train Epoch: {} \tLoss: {:.6f}'.format(epoch, ddp_loss[0] / ddp_loss[1])) + def test(model, rank, world_size, test_loader): + model.eval() + correct = 0 + ddp_loss = torch.zeros(3).to(rank) + with torch.no_grad(): + for data, target in test_loader: + data, target = data.to(rank), target.to(rank) + output = model(data) + ddp_loss[0] += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss + pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability + ddp_loss[1] += pred.eq(target.view_as(pred)).sum().item() + ddp_loss[2] += len(data) + dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) + if rank == 0: + test_loss = ddp_loss[0] / ddp_loss[2] + print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format( + test_loss, int(ddp_loss[1]), int(ddp_loss[2]), + 100. * ddp_loss[1] / ddp_loss[2])) + # [1] Setup PyTorch distributed and get the distributed parameters. + torch.manual_seed(parameters["seed"]) + dist.init_process_group("nccl") + local_rank = int(os.environ["LOCAL_RANK"]) + rank = dist.get_rank() + world_size = dist.get_world_size() + # Local rank identifies the GPU number inside the pod. 
+ torch.cuda.set_device(local_rank) + print( + f"FSDP Training for WORLD_SIZE: {world_size}, RANK: {rank}, LOCAL_RANK: {local_rank}" + ) + transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ]) + dataset1 = datasets.MNIST('/tmp/data', train=True, download=True, + transform=transform) + dataset2 = datasets.MNIST('/tmp/data', train=False, + transform=transform) + sampler1 = DistributedSampler(dataset1, rank=rank, num_replicas=world_size, shuffle=True) + sampler2 = DistributedSampler(dataset2, rank=rank, num_replicas=world_size) + train_kwargs = {'batch_size': parameters["batch-size"], 'sampler': sampler1} + test_kwargs = {'batch_size': parameters["test-batch-size"], 'sampler': sampler2} + cuda_kwargs = {'num_workers': 2, + 'pin_memory': True, + 'shuffle': False} + train_kwargs.update(cuda_kwargs) + test_kwargs.update(cuda_kwargs) + train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs) + test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs) + my_auto_wrap_policy = functools.partial( + size_based_auto_wrap_policy, min_num_params=100 + ) + init_start_event = torch.cuda.Event(enable_timing=True) + init_end_event = torch.cuda.Event(enable_timing=True) + model = Net().to(local_rank) + model = FSDP(model) + optimizer = optim.Adadelta(model.parameters(), lr=parameters["lr"]) + scheduler = StepLR(optimizer, step_size=1, gamma=parameters["gamma"]) + init_start_event.record() + for epoch in range(1, parameters["epochs"] + 1): + train(parameters, model, local_rank, world_size, train_loader, optimizer, epoch, sampler=sampler1) + test(model, local_rank, world_size, test_loader) + scheduler.step() + init_end_event.record() + if rank == 0: + init_end_event.synchronize() + print(f"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec") + print(f"{model}") + if parameters["save-model"]: + # use a barrier to make sure training is done on all ranks + dist.barrier() + states = model.state_dict() + if rank == 0: + torch.save(states, "mnist_cnn.pt") + train_function({'batch-size': 64, 'test-batch-size': 1000, 'epochs': 10, 'lr': 1.0, 'gamma': 0.7, 'seed': 1, 'save-model': False}) + EOM + printf "%s" "$SCRIPT" > "$program_path/ephemeral_script.py" + torchrun "$program_path/ephemeral_script.py" + command: + - bash + - -c + image: docker.io/pytorch/pytorch:2.1.2-cuda11.8-cudnn8-runtime + name: pytorch + resources: + limits: + nvidia.com/gpu: "2" + requests: + nvidia.com/gpu: "2" + runPolicy: + suspend: false
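Once this patch is applied to the repository, the workload can be submitted and monitored along the lines sketched below. This is a sketch, not part of the patch: the pod name `mnist-training-master-0` assumes the standard Kubeflow `<job-name>-master-0` naming for `PyTorchJob` pods, and `alice`/`blue` are the example user and namespace used throughout the README.

```sh
# Submit the AppWrapper (run from the repository root)
kubectl apply --as alice -n blue -f ./setup.KubeConEU25/sample-jobs/pytorch-training.yaml

# Watch the AppWrapper until Kueue admits it and it reaches the Running phase
kubectl get appwrappers -n blue --as alice -w

# Follow the training output on the master pod; the name assumes the
# standard Kubeflow <job-name>-master-0 convention for PyTorchJob pods
kubectl logs -n blue mnist-training-master-0 --as alice -f

# Clean up after training completes
kubectl delete appwrapper pytorch-mnist-training -n blue --as alice
```

While the job runs, the embedded script prints per-epoch training loss and test-set accuracy from rank 0, so the master pod log is the place to confirm that all 4 GPU workers (2 pods x 2 processes) are participating.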