
Quick Tutorial on Distributed Data Parallel Training on PyTorch with Multiple GPUs

A quick tutorial on distributed data parallel training in PyTorch with multiple GPUs, so beginners can start training in just a few minutes.



DP vs. DDP


PyTorch provides two built-in implementations for multi-GPU training:


  • DataParallel (DP): Parameter Server mode, with one GPU acting as the reducer; extremely simple to use, just one line of code.

  • DistributedDataParallel (DDP): All-Reduce mode, designed for distributed training, but also usable for training on a single node with multiple GPUs.


DataParallel is based on the Parameter Server algorithm and is simple to enable: just add one line to the original single-node, single-GPU code:


model = nn.DataParallel(model, device_ids=[0, 1])  # device_ids is a list of GPU indices

However, its load imbalance problem can be serious: with a larger model (such as bert-large), the reducer GPU may occupy an extra 3-4 GB of memory.


It is also slower than DDP.



DistributedDataParallel


The official recommendation is to use the newer DDP with the all-reduce algorithm, which was designed mainly for multi-node multi-GPU training but also works for single-node multi-GPU training.


First, a few concepts need to be clarified.


  • rank

Multi-node/multi-GPU: the global index of the current process across all nodes (one process per GPU)

Single-node/multi-GPU: the index of the current GPU

  • world_size

Multi-node/multi-GPU: the total number of processes across all nodes

Single-node/multi-GPU: the number of GPUs on the node

  • local_rank

Multi-node/multi-GPU: the index of a GPU within its own node

Single-node/multi-GPU: the index of a GPU (same as rank)
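
For reference, here is a minimal sketch of how these values can be queried at runtime (assuming the script was started by a launcher such as torchrun, which sets the LOCAL_RANK environment variable):

import os
import torch.distributed as dist

dist.init_process_group(backend="nccl")
rank = dist.get_rank()                      # global index of this process
world_size = dist.get_world_size()          # total number of processes
local_rank = int(os.environ["LOCAL_RANK"])  # GPU index on this node, set by the launcher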


Single-node single-GPU training


Let's start with a demo of single-node, single-GPU training code and walk through the data flow. The demo is small but complete: it contains all the steps of a typical deep learning training process, including the definition and instantiation of the model and dataset, the loss function, the definition of the optimizer, gradient zeroing, gradient backpropagation, the optimizer update, and the printing of the training log.


Next, we will use the DistributedDataParallel provided by PyTorch to convert this single-node, single-GPU training process to single-node, multi-GPU parallel training.


import torch
import torch.nn as nn
from torch.optim import SGD
from torch.utils.data import Dataset, DataLoader
import os
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--gpu_id', type=str, default='0,2')
parser.add_argument('--batchSize', type=int, default=32)
parser.add_argument('--epochs', type=int, default=5)
parser.add_argument('--dataset-size', type=int, default=128)
parser.add_argument('--num-classes', type=int, default=10)
config = parser.parse_args()

os.environ['CUDA_VISIBLE_DEVICES'] = config.gpu_id

# Define dataset sample 
class RandomDataset(Dataset):
    def __init__(self, dataset_size, image_size=32):
        images = torch.randn(dataset_size, 3, image_size, image_size)
        labels = torch.zeros(dataset_size, dtype=torch.long)
        self.data = list(zip(images, labels))

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return len(self.data)

# Define the model
class Model(nn.Module):
    def __init__(self, num_classes):
        super(Model, self).__init__()
        self.conv2d = nn.Conv2d(3, 16, 3)
        self.fc = nn.Linear(30*30*16, num_classes)
        # note: CrossEntropyLoss already applies log-softmax internally,
        # so this explicit Softmax is redundant
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        batch_size = x.shape[0]
        x = self.conv2d(x)
        x = x.reshape(batch_size, -1)
        x = self.fc(x)
        out = self.softmax(x)
        return out

# Instantiate models, datasets, loaders, and optimizers
model = Model(config.num_classes)
dataset = RandomDataset(config.dataset_size)
loader = DataLoader(dataset, batch_size=config.batchSize, shuffle=True)
loss_func = nn.CrossEntropyLoss()

if torch.cuda.is_available():
    model.cuda()
optimizer = SGD(model.parameters(), lr=0.1, momentum=0.9)

# If using DP, only one line
# if torch.cuda.device_count() > 1: model = nn.DataParallel(model)
# Instead of DP, we will use DDP

# Start training
for epoch in range(config.epochs):
    for step, (images, labels) in enumerate(loader):
        if torch.cuda.is_available(): 
            images = images.cuda()
            labels = labels.cuda()
        preds = model(images)
        loss = loss_func(preds, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        print(f'Step: {step}, Loss: {loss.item()}')

    print(f'Epoch {epoch} Finished !')
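
This single-GPU script needs no special launcher; assuming it is saved as demo.py (a hypothetical filename), it runs as a plain Python program:

python demo.py --batchSize 32 --epochs 5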
       

The training log output is:


Step: 0, Loss: 1.4611507654190063
Step: 1, Loss: 1.4611507654190063
...
Step: 7, Loss: 1.4611507654190063
Epoch 0 Finished !
...


Modify code


Turning single-node, single-GPU training into single-node, multi-GPU parallel training with PyTorch's DistributedDataParallel requires the following steps:


1. Initialization


torch.distributed.init_process_group(backend="nccl")
local_rank = torch.distributed.get_rank()  # equals the GPU index in the single-node case
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)
 

2. Wrap the model with DDP


model = torch.nn.parallel.DistributedDataParallel(model)
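
Because torch.cuda.set_device() has already bound each process to its GPU, DDP can infer the device from the model's parameters. An equivalent, more explicit form you will often see passes the device index directly:

model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])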

3. Set up data parallelism


from torch.utils.data.distributed import DistributedSampler
# The sampler automatically partitions the data across the GPUs (one shard per process)
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)
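
One caveat: DistributedSampler seeds its shuffle with the epoch number, so to get a different shuffling order each epoch, call set_epoch() at the start of every epoch, roughly like this:

for epoch in range(config.epochs):
    sampler.set_epoch(epoch)  # re-seeds the shuffle for this epoch
    for step, (images, labels) in enumerate(loader):
        ...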

Launching DDP multi-GPU training


Note that the launch command for single-node multi-GPU training also differs from the usual one:


python -m torch.distributed.launch --nproc_per_node 2 ddp_demo.py --batchSize 64 --epochs 10 --gpu_id 1,2
# or: torchrun --nproc_per_node=2 ddp_demo.py --batchSize 64 --epochs 10

where --nproc_per_node is the number of GPUs we want to use; our own argparse arguments follow the script name.


Note that --local_rank is not specified by us manually; the launcher supplies it.
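
How the process index reaches the script depends on the launcher; a minimal sketch of both conventions:

# torch.distributed.launch injects a --local_rank command-line argument:
parser.add_argument('--local_rank', type=int, default=0)
# torchrun instead sets the LOCAL_RANK environment variable:
local_rank = int(os.environ.get('LOCAL_RANK', 0))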


Output of DDP multi-GPU training


Also, since DDP training runs in multiple processes, each process prints its own log output, so lines appear duplicated like this:


Step: 0, Loss: 1.4611507654190063
Step: 0, Loss: 1.4611507654190063
Step: 1, Loss: 1.4611507654190063
Step: 1, Loss: 1.4611507654190063
...
Step: 7, Loss: 1.4611507654190063
Step: 7, Loss: 1.4611507654190063
Epoch 0 Finished !
Epoch 0 Finished !
...

To avoid cluttering the output, here is a tip:


After the launcher starts the Python script, it passes the index of the current process to the script as an argument, and we can obtain that index in the script: the --local_rank argument tells us which GPU the current process is using and is used to select a different device in each process (there are also other ways to get it). A process can be understood simply as one running copy of the code: distributed training uses a multi-process, one-process-per-GPU approach, so each process runs a separate copy of the training script. Usually it is unnecessary for every process to print logs or save other outputs (such as model weights), so we can use local_rank to designate a single process to do so.
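
For example, a minimal sketch that restricts logging to the process on GPU 0:

# only the process with local_rank 0 prints, so each line appears once
if local_rank == 0:
    print(f'Step: {step}, Loss: {loss.item()}')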


View current device


We can add a line to the training loop to see on which GPU the current process is being computed:


print(f"data: {images.device}, model: {next(model.parameters()).device}")

Note that model.parameters() is actually a Python generator, so we use next() to grab one parameter and check which device it lives on.


Partial output:


Epoch 0 Finished !
data: cuda:0, model: cuda:0
data: cuda:1, model: cuda:1
...
data: cuda:0, model: cuda:0
data: cuda:1, model: cuda:1
data: cuda:0, model: cuda:0
data: cuda:1, model: cuda:1
Epoch 1 Finished !
...

As the printout shows, in our DDP training both the training data and the model replicas are placed on both devices.


Appendix: Complete DDP Training Code


# ddp_demo.py
import torch
import torch.nn as nn
from torch.optim import SGD
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
import os
import argparse

# Define dataset sample
class RandomDataset(Dataset):
    def __init__(self, dataset_size, image_size=32):
        images = torch.randn(dataset_size, 3, image_size, image_size)
        labels = torch.zeros(dataset_size, dtype=torch.long)
        self.data = list(zip(images, labels))

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return len(self.data)

# Define the model
class Model(nn.Module):
    def __init__(self, num_classes):
        super(Model, self).__init__()
        self.conv2d = nn.Conv2d(3, 16, 3)
        self.fc = nn.Linear(30*30*16, num_classes)
        # note: CrossEntropyLoss already applies log-softmax internally,
        # so this explicit Softmax is redundant
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        batch_size = x.shape[0]
        x = self.conv2d(x)
        x = x.reshape(batch_size, -1)
        x = self.fc(x)
        out = self.softmax(x)
        return out


parser = argparse.ArgumentParser()
parser.add_argument('--gpu_id', type=str, default='0,2')
parser.add_argument('--batchSize', type=int, default=64)
parser.add_argument('--epochs', type=int, default=5)
parser.add_argument('--dataset-size', type=int, default=1024)
parser.add_argument('--num-classes', type=int, default=10)
config = parser.parse_args()

os.environ['CUDA_VISIBLE_DEVICES'] = config.gpu_id
torch.distributed.init_process_group(backend="nccl")

local_rank = torch.distributed.get_rank()  # equals the GPU index in the single-node case
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)

# Instantiate models, datasets, loaders, and optimizers
model = Model(config.num_classes)

dataset = RandomDataset(config.dataset_size)
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, batch_size=config.batchSize, sampler=sampler)

# loader = DataLoader(dataset, batch_size=config.batchSize, shuffle=True)
loss_func = nn.CrossEntropyLoss()

if torch.cuda.is_available():
    model.cuda()
model = torch.nn.parallel.DistributedDataParallel(model)
optimizer = SGD(model.parameters(), lr=0.1, momentum=0.9)


# Start training
for epoch in range(config.epochs):
    sampler.set_epoch(epoch)  # re-seed the sampler so each epoch shuffles differently
    for step, (images, labels) in enumerate(loader):
        if torch.cuda.is_available(): 
            images = images.cuda()
            labels = labels.cuda()
        preds = model(images)
        # print(f"data: {images.device}, model: next(model.parameters()).device}")
        loss = loss_func(preds, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        print(f'Step: {step}, Loss: {loss.item()}')

    print(f'Epoch {epoch} Finished !')

Launch training (for two GPUs):


torchrun --nproc_per_node=2 ddp_demo.py --batchSize 64 --epochs 10
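
The same script scales out to multiple nodes. As a rough sketch (the address and port are placeholders for your own cluster), a two-node, two-GPU-per-node launch with torchrun would look like:

# on node 0 (rendezvous host)
torchrun --nnodes=2 --node_rank=0 --nproc_per_node=2 --master_addr=10.0.0.1 --master_port=29500 ddp_demo.py --batchSize 64 --epochs 10
# on node 1
torchrun --nnodes=2 --node_rank=1 --nproc_per_node=2 --master_addr=10.0.0.1 --master_port=29500 ddp_demo.py --batchSize 64 --epochs 10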

About Cloudam


Cloudam is a one-stop cloud-HPC platform with 300+ pre-installed applications ready to deploy immediately. The system smartly schedules compute nodes and dynamically manages software licenses, optimizing workflows and boosting efficiency for engineers and researchers.


Partnered with AWS, Azure, Google Cloud, Oracle Cloud, etc., Cloudam powers your R&D with massive cloud resources without queuing.


You can submit jobs with intuitive templates, SLURM, and Windows/Linux workstations. Whether you are a beginner or a professional, you can always find it handy to run and manage your jobs.


There is a $30 Free Trial for every new user. Why not register and boost your R&D NOW?



