Mastering Distributed Data Parallel (DDP) for Efficient Model Training
Introduction to Distributed Data Parallel
Hello, everyone! I'm Francois, a Research Scientist at Meta. I'm excited to guide you through this tutorial, part of the Awesome AI Tutorials series. Today, we’ll clarify the concept of Distributed Data Parallel (DDP), a technique that allows the simultaneous training of models across multiple GPUs.
Reflecting on my time in engineering school, I fondly remember using Google Colab’s GPUs for training. However, in a corporate setting, the scenario shifts significantly. If you belong to a tech organization deeply engaged in AI, especially in a major tech company, you likely have access to numerous GPU clusters.
This session aims to empower you to utilize multiple GPUs for rapid and efficient model training, and the good news is, it’s more straightforward than you might think! Before we dive in, ensure you have a solid understanding of PyTorch and its essential components, such as Datasets, DataLoaders, Optimizers, CUDA, and the training loop.
Initially, I perceived DDP as a daunting tool, believing that a large team was necessary to establish the required infrastructure. However, I can assure you that DDP is not only user-friendly but also concise, requiring only a few lines of code for implementation. Let’s embark on this enlightening journey!
Understanding DDP at a Glance
Distributed Data Parallel (DDP) is a concept that becomes clear when simplified. Imagine having a cluster with four GPUs. With DDP, the same model, including its optimizer, is loaded onto each GPU. The main difference lies in how we distribute the data.
If you're familiar with deep learning, you’ll recognize the DataLoader, which partitions your dataset into separate batches. Typically, the dataset is divided into these batches, and the model is updated after processing each one.
With DDP, we enhance this process by breaking each batch into "sub-batches." Each model replica processes part of the main batch, resulting in unique gradient calculations for each GPU.
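We use a DistributedSampler to give each GPU its own share of the data. Strictly speaking, the sampler partitions the dataset indices across processes, so each GPU draws its batches from its own shard. One training step across all GPUs is therefore equivalent to one step on a larger global batch that has been split into sub-batches.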
After distributing each sub-batch to individual GPUs, every GPU computes its own gradient. Here’s where the magic of DDP happens. Before updating the model parameters, the gradients from each GPU must be combined so that every GPU reflects the average gradient calculated over the entire data batch. This averaging is accomplished by summing the gradients from all GPUs and dividing by the total number of GPUs.
For example, if you have four GPUs, the average gradient for a specific model parameter is the sum of that parameter's gradients from each of the four GPUs divided by four. DDP employs the NCCL or Gloo backend (with NCCL optimized for NVIDIA GPUs and Gloo being more general) to facilitate efficient communication and gradient averaging among GPUs.
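To make the averaging step concrete, here is a minimal sketch in plain Python (no GPUs involved). It assumes a toy one-parameter model `y = w * x` with a mean-squared-error loss and four equally sized sub-batches; the names `mse_gradient`, `local_grads`, and `averaged_grad` are illustrative and not part of PyTorch. It shows that averaging the per-GPU gradients gives the same value as the gradient over the whole batch.

```python
def mse_gradient(w, xs, ys):
    """Gradient of mean((w * x - y) ** 2) with respect to w."""
    n = len(xs)
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / n


w = 0.5
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [2.0 * x for x in xs]  # toy targets generated with a true weight of 2.0

num_gpus = 4
shard_size = len(xs) // num_gpus

# Each simulated "GPU" computes a gradient on its own sub-batch only.
local_grads = []
for rank in range(num_gpus):
    start, stop = rank * shard_size, (rank + 1) * shard_size
    local_grads.append(mse_gradient(w, xs[start:stop], ys[start:stop]))

# The all-reduce step: sum the local gradients and divide by the GPU count.
averaged_grad = sum(local_grads) / num_gpus
full_batch_grad = mse_gradient(w, xs, ys)

print(local_grads)      # [-7.5, -37.5, -91.5, -169.5]
print(averaged_grad)    # -76.5
print(full_batch_grad)  # -76.5
```

The equality holds here because every sub-batch has the same size; with uneven sub-batches a plain average would weight samples slightly differently.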
Terminology: Nodes and Ranks
Before we dive into coding, let’s clarify some essential terms:
- Node: A powerful machine equipped with multiple GPUs. When we refer to a cluster, it’s not merely a collection of GPUs; they are organized into groups or "nodes." For instance, one node could contain eight GPUs.
- Master Node: In a multi-node setup, one node takes the lead. This "master node" manages tasks such as synchronization, initiating model copies, loading the model, and overseeing log entries. Without it, each GPU would create logs independently, resulting in disarray.
- Local Rank: This term refers to a GPU's position or ID within its specific node. It's termed "local" because it's limited to that machine.
- Global Rank: This broader term identifies a GPU across all nodes, providing a unique identifier regardless of the machine.
- World Size: This represents the total number of GPUs available across all nodes, calculated as the product of the number of nodes and the GPUs per node.
When working with a single machine, the local and global ranks are simpler, as they are equivalent.
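The relationship between these terms can be sketched in a few lines of plain Python. The cluster shape (two nodes with four GPUs each) and the helper name `global_rank` are assumptions chosen for illustration.

```python
def global_rank(node_rank, gpus_per_node, local_rank):
    """Unique process ID across the whole cluster."""
    return node_rank * gpus_per_node + local_rank


num_nodes, gpus_per_node = 2, 4          # hypothetical cluster shape
world_size = num_nodes * gpus_per_node   # total number of GPUs / processes

layout = [
    (node, local, global_rank(node, gpus_per_node, local))
    for node in range(num_nodes)
    for local in range(gpus_per_node)
]

for node, local, rank in layout:
    print(f"node {node} | local rank {local} | global rank {rank}")
```

On a single node, `node_rank` is 0, so the global rank reduces to the local rank.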
Exploring DDP Limitations
While Distributed Data Parallel (DDP) has revolutionized numerous deep learning workflows, it's crucial to recognize its limitations. The primary constraint of DDP stems from its memory requirements. Each GPU loads a copy of the model, optimizer, and its respective data batch. GPU memory capacities generally range from a few GBs to 80GB for high-end GPUs.
For smaller models, memory usage isn’t a concern, but when dealing with Large Language Models (LLMs) or architectures similar to GPT, the limitations of a single GPU's memory can become evident.
In Computer Vision, although many lightweight models exist, challenges can arise when increasing batch sizes, especially in scenarios like 3D imaging or Object Detection tasks.
This is where Fully Sharded Data Parallel (FSDP) comes into play. FSDP not only distributes data but also disperses the model and optimizer states across GPU memories. However, while this is advantageous, it increases inter-GPU communication, which can slow down training.
In Summary:
If your model and batch fit comfortably within a GPU's memory, DDP is the optimal choice due to its speed. For larger models requiring more memory, FSDP is a better fit, but be mindful of the trade-off: speed may be compromised for memory efficiency.
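A rough back-of-the-envelope estimate can help with this decision. The sketch below counts only the training state (weights, gradients, optimizer buffers) and rests on several assumptions: 32-bit values, an Adam-style optimizer that keeps two buffers per parameter, activations and framework overhead ignored, and perfectly even sharding. The function names are hypothetical, and real memory usage will be higher.

```python
def training_state_bytes(num_params, bytes_per_value=4, optimizer_states_per_param=2):
    """Bytes for weights + gradients + optimizer buffers (activations not included)."""
    values_per_param = 1 + 1 + optimizer_states_per_param  # weight, gradient, optimizer
    return num_params * values_per_param * bytes_per_value


def per_gpu_state_gib(num_params, world_size, sharded):
    """Training state held by one GPU, in GiB.

    sharded=False models DDP (full replica on every GPU);
    sharded=True models an idealized even split across GPUs.
    """
    total = training_state_bytes(num_params)
    per_gpu = total / world_size if sharded else total
    return per_gpu / 2**30


one_billion = 1_000_000_000
print(f"replicated: {per_gpu_state_gib(one_billion, 8, sharded=False):.1f} GiB per GPU")
print(f"sharded:    {per_gpu_state_gib(one_billion, 8, sharded=True):.1f} GiB per GPU")
```

Under these assumptions a one-billion-parameter model needs about 14.9 GiB of state on every GPU when replicated, before any activations or data are counted.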
Why Choose DDP Over DP?
On PyTorch's website, you'll find options for both DP and DDP. However, I recommend solely using DDP; it’s faster and not restricted to a single node.
Figure: comparison of DP and DDP, from the PyTorch tutorial.
Code Walkthrough
Implementing distributed deep learning is more straightforward than you may think. The beauty lies in the fact that you won’t be overwhelmed with manual GPU configurations or the complexities of gradient distribution. You can find all the templates and scripts here.
Here’s a breakdown of the steps we’ll follow:
- Process Initialization: Designate the master node, specify the port, and set up the world size.
- Distributed DataLoader Setup: Crucial for partitioning each batch across the available GPUs, ensuring data is evenly distributed without overlap.
- Model Training/Testing: This step remains largely unchanged from single GPU training.
Training on One GPU (Baseline)
Let’s start with a basic code snippet that loads a dataset, creates a model, and trains it on a single GPU.
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np

class WineDataset(Dataset):
    def __init__(self, data, targets):
        self.data = data
        self.targets = targets

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.tensor(self.data[idx], dtype=torch.float), torch.tensor(self.targets[idx], dtype=torch.long)

class SimpleNN(torch.nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = torch.nn.Linear(13, 64)
        self.fc2 = torch.nn.Linear(64, 3)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
class Trainer():
    def __init__(self, model, train_data, optimizer, gpu_id, save_every):
        # Move the model to the same device the batches are sent to in _run_epoch
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.gpu_id = gpu_id
        self.save_every = save_every
        self.losses = []

    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = F.cross_entropy(output, targets)
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def _run_epoch(self, epoch):
        total_loss = 0.0
        num_batches = len(self.train_data)
        for source, targets in self.train_data:
            source = source.to(self.gpu_id)
            targets = targets.to(self.gpu_id)
            loss = self._run_batch(source, targets)
            total_loss += loss
        avg_loss = total_loss / num_batches
        self.losses.append(avg_loss)
        print(f"Epoch {epoch}, Loss: {avg_loss:.4f}")

    def _save_checkpoint(self, epoch):
        checkpoint = self.model.state_dict()
        PATH = f"model_{epoch}.pt"
        torch.save(checkpoint, PATH)
        print(f"Epoch {epoch} | Model saved to {PATH}")

    def train(self, max_epochs):
        self.model.train()
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if epoch % self.save_every == 0:
                self._save_checkpoint(epoch)
def load_train_objs():
    wine_data = load_wine()
    X = wine_data.data
    y = wine_data.target
    # Normalize and split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    scaler = StandardScaler().fit(X_train)
    X_train = scaler.transform(X_train)
    X_test = scaler.transform(X_test)
    train_set = WineDataset(X_train, y_train)
    test_set = WineDataset(X_test, y_test)
    print("Sample from dataset:")
    sample_data, sample_target = train_set[0]
    print(f"Data: {sample_data}")
    print(f"Target: {sample_target}")
    model = SimpleNN()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    return train_set, model, optimizer

def prepare_dataloader(dataset, batch_size):
    return DataLoader(dataset, batch_size=batch_size, pin_memory=True, shuffle=True)

def main(device, total_epochs, save_every, batch_size):
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, device, save_every)
    trainer.train(total_epochs)

main(device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), total_epochs=100, save_every=50, batch_size=32)
Training on Multiple GPUs, One Node
Now, let’s utilize all GPUs in a single node by following these steps:
- Import Necessary Libraries for Distributed Training.
- Initialize the Distributed Environment: (especially setting the MASTER_ADDR and MASTER_PORT).
- Wrap the Model with DDP using the DistributedDataParallel wrapper.
- Use Distributed Sampler to ensure that the dataset is divided across GPUs.
- Adjust the Main Function to spawn multiple processes for multi-GPU training.
For the necessary libraries, we’ll need the following:
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
Next, we set up each process. For example, if we have eight GPUs on one node, we’ll call the following function eight times, once for each GPU with the correct local_rank:
def ddp_setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
A few explanations on the function:
- MASTER_ADDR: The hostname of the machine where the master (or rank 0 process) is running. Here, it’s set to localhost.
- MASTER_PORT: Specifies the port on which the master listens for connections from workers. You can choose any unused port number.
- torch.cuda.set_device(rank): Ensures that each process uses its corresponding GPU.
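If you would rather not hard-code the port, one option is to ask the operating system for a free one. This is a minimal sketch using only the standard library; the helper name `find_free_port` is hypothetical and not part of PyTorch.

```python
import socket


def find_free_port():
    """Ask the OS for a TCP port that is currently unused on this machine."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))  # port 0 lets the OS pick any free port
        return s.getsockname()[1]


port = find_free_port()
print(f"MASTER_PORT candidate: {port}")
```

Every process must agree on the same port, so a helper like this should run once in the parent process, before the workers are spawned, with the result passed to each of them. There is also a small window in which another program could claim the port after the socket is closed, which is one reason a fixed, known-free port remains a reasonable default.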
Next, we slightly modify the Trainer class to wrap the model with the DDP function:
class Trainer():
    def __init__(self, model, train_data, optimizer, gpu_id, save_every):
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.gpu_id = gpu_id
        self.save_every = save_every
        self.losses = []
        self.model = DDP(self.model, device_ids=[gpu_id])
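Two further adjustments are common once the model is wrapped, although the code in this tutorial does not include them: calling `set_epoch` on the DistributedSampler at the start of each epoch so the shuffle order changes between epochs, and saving checkpoints from rank 0 only, using the underlying network that DDP exposes as `.module`. The helpers below are a hypothetical sketch (the function names are not part of PyTorch) written with duck typing so they can be tried without a GPU.

```python
def start_epoch(train_loader, epoch):
    """Reseed the distributed sampler so each epoch uses a different shuffle."""
    sampler = getattr(train_loader, "sampler", None)
    if hasattr(sampler, "set_epoch"):
        sampler.set_epoch(epoch)


def should_save_checkpoint(rank, epoch, save_every):
    """Only rank 0 writes checkpoints, so the GPUs do not write the same file."""
    return rank == 0 and epoch % save_every == 0


def unwrap_state_dict(model):
    """Return the weights of the underlying network.

    A DDP-wrapped model keeps the original network in `.module`;
    a plain model is used as-is.
    """
    inner = getattr(model, "module", model)
    return inner.state_dict()
```

In the Trainer, `start_epoch(self.train_data, epoch)` would go at the top of `_run_epoch`, and `_save_checkpoint` would be guarded by `should_save_checkpoint(self.gpu_id, epoch, self.save_every)` on a single node, where the GPU ID equals the global rank.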
We also need to adjust the DataLoader, as we must split the batch across each GPU:
def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )
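To see what the sampler does to the data, here is a simplified plain-Python sketch of the index partitioning, with shuffling left out. It assumes the dataset has at least as many samples as there are processes; `shard_indices` is a hypothetical helper for illustration, not the PyTorch implementation.

```python
import math


def shard_indices(dataset_len, world_size, rank):
    """Indices that one rank would iterate over (no shuffling)."""
    per_rank = math.ceil(dataset_len / world_size)
    total = per_rank * world_size
    indices = list(range(dataset_len))
    # Pad by repeating samples from the start so every rank gets the same count.
    indices += indices[: total - dataset_len]
    # Each rank takes every world_size-th index, starting at its own rank.
    return indices[rank:total:world_size]


for rank in range(4):
    print(rank, shard_indices(10, 4, rank))
# 0 [0, 4, 8]
# 1 [1, 5, 9]
# 2 [2, 6, 0]
# 3 [3, 7, 1]
```

Each rank receives the same number of samples, which keeps the processes in step. When the dataset size is not divisible by the number of GPUs, a few samples are repeated to fill the gap, as samples 0 and 1 are here.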
Now, let’s modify the main function that will be called for each process (so eight times in our case):
def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    ddp_setup(rank, world_size)
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, rank, save_every)
    trainer.train(total_epochs)
    destroy_process_group()
Finally, when executing the script, we’ll launch the eight processes using the mp.spawn() function:
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)
Ultimate Step: Training Across Multiple Nodes
Congratulations on making it this far! The ultimate step involves utilizing all available GPUs across different nodes. Understanding what we’ve done thus far makes this process relatively simple.
The key distinction when scaling across multiple nodes is transitioning from local_rank to global_rank. This is critical because each process requires a unique identifier. For example, if you’re using two nodes with eight GPUs each, both processes 0 and 8 will have a local_rank of 0.
The formula for determining global_rank is as follows:
global_rank = node_rank * world_size_per_node + local_rank
Let’s first adjust the ddp_setup function:
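def ddp_setup(local_rank, world_size_per_node, node_rank):
    os.environ["MASTER_ADDR"] = "MASTER_NODE_IP"  # <-- Replace with your master node IP
    os.environ["MASTER_PORT"] = "12355"
    global_rank = node_rank * world_size_per_node + local_rank
    # Total number of processes = number of nodes * GPUs per node.
    # Assumption: the job runs under SLURM, which exports SLURM_JOB_NUM_NODES;
    # outside SLURM, set num_nodes explicitly.
    num_nodes = int(os.environ.get("SLURM_JOB_NUM_NODES", "1"))
    init_process_group(backend="nccl", rank=global_rank, world_size=num_nodes * world_size_per_node)
    torch.cuda.set_device(local_rank)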
We also need to modify the main function, which will now take world_size_per_node as an argument:
def main(local_rank: int, world_size_per_node: int, save_every: int, total_epochs: int, batch_size: int, node_rank: int):
    ddp_setup(local_rank, world_size_per_node, node_rank)
    # ... (rest of the main function)
Lastly, we’ll adjust the mp.spawn() function to include the world_size_per_node:
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    parser.add_argument('--node_rank', default=0, type=int, help='The rank of the node in multi-node training')
    args = parser.parse_args()
    world_size_per_node = torch.cuda.device_count()
    mp.spawn(main, args=(world_size_per_node, args.save_every, args.total_epochs, args.batch_size, args.node_rank), nprocs=world_size_per_node)
Using a Cluster (SLURM)
You are now prepared to submit the training to the cluster. It’s quite simple; you just need to specify the number of nodes you wish to use. Here’s a template for the SLURM script:
#!/bin/bash
#SBATCH --job-name=DDPTraining # Name of the job
#SBATCH --ntasks-per-node=1 # One task per node; the script spawns one process per GPU itself
#SBATCH --cpus-per-task=1 # Number of CPU cores per task
#SBATCH --gres=gpu:1 # Number of GPUs per node
#SBATCH --time=01:00:00 # Time limit hrs:min:sec (1 hour in this example)
#SBATCH --mem=4GB # Memory limit per node
#SBATCH --output=training_%j.log # Output and error log name (%j expands to jobId)
#SBATCH --partition=gpu # Specify the partition or queue
# The node count is given on the sbatch command line, because #SBATCH lines do not expand shell variables such as $1.
# total_epochs and save_every are positional arguments of the script.
# The single quotes delay expansion of $SLURM_NODEID until each node runs its own task.
srun bash -c 'python3 your_python_script.py 10 2 --batch_size 32 --node_rank $SLURM_NODEID'
You can now start training from the terminal with the following command:
sbatch --nodes=2 train_net.sh # for using 2 nodes
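Inside the Python script, the node layout can also be read from the environment that SLURM provides to each task. The sketch below assumes the standard `SLURM_JOB_NUM_NODES` and `SLURM_NODEID` variables are set by the scheduler; the helper name `slurm_layout` and the returned dictionary keys are hypothetical.

```python
import os


def slurm_layout(gpus_per_node, env=None):
    """Derive node rank and world size from SLURM environment variables.

    Falls back to a single-node layout when the variables are absent,
    for example when the script is run outside SLURM.
    """
    env = os.environ if env is None else env
    num_nodes = int(env.get("SLURM_JOB_NUM_NODES", "1"))
    node_rank = int(env.get("SLURM_NODEID", "0"))
    return {
        "num_nodes": num_nodes,
        "node_rank": node_rank,
        "world_size": num_nodes * gpus_per_node,
        # Global rank of the first GPU on this node
        "first_global_rank": node_rank * gpus_per_node,
    }


# Simulated environment of the second node in a two-node job with 8 GPUs per node
print(slurm_layout(8, env={"SLURM_JOB_NUM_NODES": "2", "SLURM_NODEID": "1"}))
```

Passing `env` explicitly keeps the helper easy to test on a laptop; on the cluster it can be called with just the GPU count.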
Congratulations, you’ve successfully completed the tutorial!
References
- PyTorch guide on DDP
- Tutorial Series
The first video provides an introduction to the Distributed Data Parallel (DDP) tutorial series, aimed at helping you understand the basics and benefits of DDP in model training.
The second video focuses on PyTorch Distributed Data Parallel (DDP) as presented during the PyTorch Developer Day 2020, offering insights and practical tips for implementation.