A quick tutorial on distributed data parallel training in PyTorch with multiple GPUs, so that beginners can start multi-GPU training in just a few minutes.

DP vs. DDP
PyTorch itself provides two implementations of multi-GPU training.
DataParallel (DP): Parameter Server mode, with one card acting as the reducer; very simple to implement, just one line of code.
DistributedDataParallel (DDP): All-Reduce mode, intended for distributed training, but it can also be used for training on a single node with multiple cards.
DataParallel is based on the Parameter Server approach and is relatively simple to use: you only need to add one line to the original single-node, single-card code:
model = nn.DataParallel(model, device_ids=config.gpu_id)
However, its load imbalance problem is serious: when the model is large (such as bert-large), the reducer card can occupy an extra 3-4 GB of GPU memory.
It is also slower than DDP.

DistributedDataParallel
The official recommendation is to use the newer DDP, which is based on the all-reduce algorithm. It was designed mainly for multi-node multi-GPU training, but it also works for single-node multi-GPU training.
First, a few concepts need to be clarified.
rank
Multi-node/multi-GPU: the global index of the current process across all nodes
Single-node/multi-GPU: in practice, the index of a particular GPU
world_size
Multi-node/multi-GPU: the total number of processes across all nodes
Single-node/multi-GPU: the number of GPUs on the node
local_rank
Multi-node/multi-GPU: the index of a GPU (process) within its own node
Single-node/multi-GPU: the index of a GPU (process) on the node
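These values can be read inside the script once the process group has been initialized. A minimal sketch, assuming the script is launched with torchrun (which exports the LOCAL_RANK environment variable):

import os
import torch.distributed as dist

dist.init_process_group(backend="nccl")
rank = dist.get_rank()                      # global index of this process
world_size = dist.get_world_size()          # total number of processes
local_rank = int(os.environ["LOCAL_RANK"])  # GPU index on the current node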
Single-node single-GPU training
Let's start with a demo of single-node, single-GPU training code and simply walk through the data flow. The demo is small but complete: it contains the usual steps of a deep learning training run, including the definition and instantiation of the model and dataset, the loss function, the definition of the optimizer, gradient zeroing, backpropagation, the optimizer update, and the printing of the training log.
Next, we will use the DistributedDataParallel provided by PyTorch to convert this single-node, single-GPU training process into single-node, multi-GPU parallel training.
import torch
import torch.nn as nn
from torch.optim import SGD
from torch.utils.data import Dataset, DataLoader
import os
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--gpu_id', type=str, default='0,2')
parser.add_argument('--batchSize', type=int, default=32)
parser.add_argument('--epochs', type=int, default=5)
parser.add_argument('--dataset-size', type=int, default=128)
parser.add_argument('--num-classes', type=int, default=10)
config = parser.parse_args()
os.environ['CUDA_VISIBLE_DEVICES'] = config.gpu_id
# Define dataset sample
class RandomDataset(Dataset):
    def __init__(self, dataset_size, image_size=32):
        images = torch.randn(dataset_size, 3, image_size, image_size)
        labels = torch.zeros(dataset_size, dtype=int)
        self.data = list(zip(images, labels))
    def __getitem__(self, index):
        return self.data[index]
    def __len__(self):
        return len(self.data)
# Define the model
class Model(nn.Module):
    def __init__(self, num_classes):
        super(Model, self).__init__()
        self.conv2d = nn.Conv2d(3, 16, 3)
        self.fc = nn.Linear(30*30*16, num_classes)
        self.softmax = nn.Softmax(dim=1)
    def forward(self, x):
        batch_size = x.shape[0]
        x = self.conv2d(x)
        x = x.reshape(batch_size, -1)
        x = self.fc(x)
        out = self.softmax(x)
        return out
# Instantiate models, datasets, loaders, and optimizers
model = Model(config.num_classes)
dataset = RandomDataset(config.dataset_size)
loader = DataLoader(dataset, batch_size=config.batchSize, shuffle=True)
loss_func = nn.CrossEntropyLoss()
if torch.cuda.is_available():
    model.cuda()
optimizer = SGD(model.parameters(), lr=0.1, momentum=0.9)
# If using DP, only one line
# if torch.cuda.device_count() > 1: model = nn.DataParallel(model)
# Instead of DP, we will use DDP
# Start training
for epoch in range(config.epochs):
    for step, (images, labels) in enumerate(loader):
        if torch.cuda.is_available():
            images = images.cuda()
            labels = labels.cuda()
        preds = model(images)
        loss = loss_func(preds, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f'Step: {step}, Loss: {loss.item()}')
    print(f'Epoch {epoch} Finished !')
The training log output is:
Step: 0, Loss: 1.4611507654190063
Step: 1, Loss: 1.4611507654190063
...
Step: 7, Loss: 1.4611507654190063
Epoch 0 Finished !
...
Modify code
Turning single-node, single-GPU training into single-node, multi-GPU parallel training with PyTorch's DistributedDataParallel requires the following steps:
1. Initialization
torch.distributed.init_process_group(backend="nccl")
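# on a single node, the global rank returned by get_rank() equals the GPU's local rank, so it is used as local_rank here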
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)
2. Set up model parallelism
model = torch.nn.parallel.DistributedDataParallel(model)
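Assuming the model has already been moved to the GPU selected by torch.cuda.set_device(local_rank) (as in the full code below), DDP can infer the device on its own; passing it explicitly is also common. A minimal variant:

model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)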
3. Set up data parallelism
from torch.utils.data.distributed import DistributedSampler
# This sampler will automatically allocate data to each gpu
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)
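One detail worth knowing: DistributedSampler shuffles with a seed derived from the current epoch, so to get a different shuffling order every epoch you should call set_epoch at the start of each epoch. A minimal sketch:

for epoch in range(config.epochs):
    sampler.set_epoch(epoch)  # different shuffling each epoch, consistent across processes
    for step, (images, labels) in enumerate(loader):
        ...  # the rest of the training step stays unchanged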
Launching DDP multi-GPU training
It should also be noted that the launch command for single-node multi-GPU training is different from the usual one:
python -m torch.distributed.launch --nproc_per_node 2 ddp_demo.py --batchSize 64 --epochs 10 --gpu_id 1,2
# or torchrun --nproc_per_node=2 ddp_demo.py --batchSize 64 --epochs 10
Here --nproc_per_node is the number of GPUs we want to use, and the arguments parsed by argparse follow the script name.
Note that --local_rank is not specified by us manually; the launcher provides it to each process automatically.
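How the script actually obtains this value depends on the launcher; a minimal sketch, reusing the argparse parser from the demo:

# torch.distributed.launch injects the value as a command-line argument (older behavior):
parser.add_argument('--local_rank', type=int, default=0)
# torchrun exports it as an environment variable instead:
local_rank = int(os.environ.get('LOCAL_RANK', 0))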
Output of DDP multi-GPU training
Also, since DDP training runs one process per GPU, each process prints its own log output, so duplicated lines like these will appear:
Step: 0, Loss: 1.4611507654190063
Step: 0, Loss: 1.4611507654190063
Step: 1, Loss: 1.4611507654190063
Step: 1, Loss: 1.4611507654190063
...
Step: 7, Loss: 1.4611507654190063
Step: 7, Loss: 1.4611507654190063
Epoch 0 Finished !
Epoch 0 Finished !
...
To avoid messy output, here are some tips:
After the launcher starts the Python script, it passes the index of the current process to that process, and we can read it, for example, via the command-line argument --local_rank (there are other ways to obtain it as well). This tells us which GPU the current process is using and is used to select a different device in each process. A process can be understood simply as one running copy of the code: distributed training uses a multi-process, multi-GPU approach, so each GPU is assigned its own process, each running a separate copy of the training code. Usually it is not necessary for every process to print logs or save other artifacts (model weights, etc.), so we can use --local_rank to designate a single process that does so, as shown in the sketch below.
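For example, a minimal sketch that logs and saves a checkpoint only from the process with local_rank 0 (the checkpoint filename is just illustrative):

if local_rank == 0:
    print(f'Step: {step}, Loss: {loss.item()}')
    # under DDP the original model is wrapped, so its weights are accessed via model.module
    torch.save(model.module.state_dict(), 'checkpoint.pt')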
View current device
We can add a line to the training loop to see on which GPU the current process is being computed:
print(f"data: {images.device}, model: {next(model.parameters()).device}")
Note that model.parameters() is a Python generator, so we use next() to fetch one parameter and check which device it lives on.
Partial output:
Epoch 0 Finished !
data: cuda:0, model: cuda:0
data: cuda:1, model: cuda:1
...
data: cuda:0, model: cuda:0
data: cuda:1, model: cuda:1
data: cuda:0, model: cuda:0
data: cuda:1, model: cuda:1
Epoch 1 Finished !
...
We can see that in DDP training the data and the model replica of each process are placed on that process's own device, as the printed lines show.
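Also note that batchSize here is the per-process batch size: DistributedSampler splits the dataset among the processes and each DataLoader draws batchSize samples per step, so the effective global batch size is the per-process batch size times the number of processes.

effective_batch_size = config.batchSize * torch.distributed.get_world_size()  # e.g. 64 * 2 = 128 on two GPUs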
Appendix: Complete DDP Training Code
# ddp_demo.py
import torch
import torch.nn as nn
from torch.optim import SGD
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
import os
import argparse
# Define dataset sample
class RandomDataset(Dataset):
    def __init__(self, dataset_size, image_size=32):
        images = torch.randn(dataset_size, 3, image_size, image_size)
        labels = torch.zeros(dataset_size, dtype=int)
        self.data = list(zip(images, labels))
    def __getitem__(self, index):
        return self.data[index]
    def __len__(self):
        return len(self.data)
# Define the model
class Model(nn.Module):
    def __init__(self, num_classes):
        super(Model, self).__init__()
        self.conv2d = nn.Conv2d(3, 16, 3)
        self.fc = nn.Linear(30*30*16, num_classes)
        self.softmax = nn.Softmax(dim=1)
    def forward(self, x):
        batch_size = x.shape[0]
        x = self.conv2d(x)
        x = x.reshape(batch_size, -1)
        x = self.fc(x)
        out = self.softmax(x)
        return out
parser = argparse.ArgumentParser()
parser.add_argument('--gpu_id', type=str, default='0,2')
parser.add_argument('--batchSize', type=int, default=64)
parser.add_argument('--epochs', type=int, default=5)
parser.add_argument('--dataset-size', type=int, default=1024)
parser.add_argument('--num-classes', type=int, default=10)
parser.add_argument('--local_rank', type=int, default=0)  # only needed when launching with (older) torch.distributed.launch, which injects this argument; torchrun sets the LOCAL_RANK environment variable instead
config = parser.parse_args()
os.environ['CUDA_VISIBLE_DEVICES'] = config.gpu_id
torch.distributed.init_process_group(backend="nccl")
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)
# Instantiate models, datasets, loaders, and optimizers
model = Model(config.num_classes)
dataset = RandomDataset(config.dataset_size)
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, batch_size=config.batchSize, sampler=sampler)
# loader = DataLoader(dataset, batch_size=config.batchSize, shuffle=True)
loss_func = nn.CrossEntropyLoss()
if torch.cuda.is_available():
    model.cuda()
model = torch.nn.parallel.DistributedDataParallel(model)
optimizer = SGD(model.parameters(), lr=0.1, momentum=0.9)
# Start training
for epoch in range(config.epochs):
    sampler.set_epoch(epoch)  # make shuffling differ between epochs while staying consistent across processes
    for step, (images, labels) in enumerate(loader):
        if torch.cuda.is_available():
            images = images.cuda()
            labels = labels.cuda()
        preds = model(images)
        # print(f"data: {images.device}, model: {next(model.parameters()).device}")
        loss = loss_func(preds, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f'Step: {step}, Loss: {loss.item()}')
    print(f'Epoch {epoch} Finished !')
Start training (on two GPUs):
torchrun --nproc_per_node=2 ddp_demo.py --batchSize 64 --epochs 10
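Optionally, once training has finished, the process group can be torn down explicitly at the end of the script:

torch.distributed.destroy_process_group()  # release the process group's resources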