Mastering Distributed Data Parallel (DDP) for Efficient Model Training

Introduction to Distributed Data Parallel

Hello, everyone! I'm Francois, a Research Scientist at Meta. I'm excited to guide you through this tutorial, part of the Awesome AI Tutorials series. Today, we’ll clarify the concept of Distributed Data Parallel (DDP), a technique that allows the simultaneous training of models across multiple GPUs.

Reflecting on my time in engineering school, I fondly remember using Google Colab’s GPUs for training. However, in a corporate setting, the scenario shifts significantly. If you belong to a tech organization deeply engaged in AI, especially in a major tech company, you likely have access to numerous GPU clusters.

This session aims to empower you to utilize multiple GPUs for rapid and efficient model training, and the good news is, it’s more straightforward than you might think! Before we dive in, ensure you have a solid understanding of PyTorch and its essential components, such as Datasets, DataLoaders, Optimizers, CUDA, and the training loop.

Initially, I perceived DDP as a daunting tool, believing that a large team was necessary to establish the required infrastructure. However, I can assure you that DDP is not only user-friendly but also concise, requiring only a few lines of code for implementation. Let’s embark on this enlightening journey!

Understanding DDP at a Glance

Distributed Data Parallel (DDP) is a concept that becomes clear when simplified. Imagine having a cluster with four GPUs. With DDP, the same model, including its optimizer, is loaded onto each GPU. The main difference lies in how we distribute the data.

If you're familiar with deep learning, you’ll recognize the DataLoader, which partitions your dataset into separate batches. Typically, the dataset is divided into these batches, and the model is updated after processing each one.

With DDP, we enhance this process by splitting each batch into "sub-batches": every model replica processes only its share of the data, so each GPU computes gradients on different samples. For example, with a global batch of 32 samples and four GPUs, each replica works on 8 samples.

We rely on a DistributedSampler to hand each GPU its own, non-overlapping shard of the dataset, which is what splits every batch into sub-batches, as depicted in the following image:

Diagram illustrating DDP data distribution

After distributing each sub-batch to individual GPUs, every GPU computes its own gradient. Here’s where the magic of DDP happens. Before updating the model parameters, the gradients from each GPU must be combined so that every GPU reflects the average gradient calculated over the entire data batch. This averaging is accomplished by summing the gradients from all GPUs and dividing by the total number of GPUs.

For example, if you have four GPUs, the average gradient for a specific model parameter is the sum of that parameter's gradients from each of the four GPUs divided by four. Under the hood, this summation is an all-reduce operation: DDP uses the NCCL or Gloo backend (NCCL is optimized for NVIDIA GPUs, Gloo is more general) to communicate and average gradients efficiently across GPUs.
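To make the averaging step concrete, here is a minimal, purely illustrative sketch of what DDP does conceptually after the backward pass (real DDP overlaps these all-reduces with the backward pass in buckets). The average_gradients helper is a hypothetical name, and the snippet assumes a process group has already been initialized:

import torch
import torch.distributed as dist

def average_gradients(model):
    # What DDP conceptually does after loss.backward() on every replica.
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            # Sum this parameter's gradient across all GPUs, then divide by the GPU count.
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size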

Terminology: Nodes and Ranks

Before we dive into coding, let’s clarify some essential terms:

  • Node: A powerful machine equipped with multiple GPUs. When we refer to a cluster, it’s not merely a collection of GPUs; they are organized into groups or "nodes." For instance, one node could contain eight GPUs.
  • Master Node: In a multi-node setup, one node takes the lead. This "master node" manages tasks such as synchronization, initiating model copies, loading the model, and overseeing log entries. Without it, each GPU would create logs independently, resulting in disarray.
  • Local Rank: This term refers to a GPU's position or ID within its specific node. It's termed "local" because it's limited to that machine.
  • Global Rank: This broader term identifies a GPU across all nodes, providing a unique identifier regardless of the machine.
  • World Size: This represents the total number of GPUs available across all nodes, calculated as the product of the number of nodes and the GPUs per node.

When working with a single machine, the local and global ranks are simpler, as they are equivalent.
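As a quick sanity check on the terminology, here is a small, hypothetical example for a cluster of two nodes with four GPUs each (the numbers are only for illustration):

num_nodes = 2
gpus_per_node = 4
world_size = num_nodes * gpus_per_node  # 8 GPUs in total

for node_rank in range(num_nodes):
    for local_rank in range(gpus_per_node):
        global_rank = node_rank * gpus_per_node + local_rank
        print(f"node {node_rank}, local rank {local_rank} -> global rank {global_rank}")
# Global ranks run from 0 to 7; on a single node, local and global ranks coincide.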

Illustration of local vs global rank

Exploring DDP Limitations

While Distributed Data Parallel (DDP) has revolutionized numerous deep learning workflows, it's crucial to recognize its limitations. The primary constraint of DDP stems from its memory requirements. Each GPU loads a copy of the model, optimizer, and its respective data batch. GPU memory capacities generally range from a few GBs to 80GB for high-end GPUs.

For smaller models, memory usage isn’t a concern, but when dealing with Large Language Models (LLMs) or architectures similar to GPT, the limitations of a single GPU's memory can become evident.

In Computer Vision, although many lightweight models exist, challenges can arise when increasing batch sizes, especially in scenarios like 3D imaging or Object Detection tasks.

This is where Fully Sharded Data Parallel (FSDP) comes into play. FSDP not only distributes data but also disperses the model and optimizer states across GPU memories. However, while this is advantageous, it increases inter-GPU communication, which can slow down training.
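For comparison, wrapping a model with PyTorch's FSDP looks very similar to DDP from the user's side. The sketch below is illustrative only: it assumes a recent PyTorch version and an already-initialized process group, and it omits FSDP's many sharding and wrapping options; local_rank and SimpleNN stand for the GPU index and the small model used later in this tutorial.

import torch
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP

# Assumes init_process_group() has already been called and local_rank is this process's GPU.
model = SimpleNN().to(local_rank)
model = FSDP(model)  # parameters, gradients, and optimizer state are sharded across GPUs

# Create the optimizer *after* wrapping, so it references the sharded parameters.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)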

In Summary:

If your model and batch fit comfortably within a GPU's memory, DDP is the optimal choice due to its speed. For larger models requiring more memory, FSDP is a better fit, but be mindful of the trade-off: speed may be compromised for memory efficiency.

Why Choose DDP Over DP?

On PyTorch's website, you'll find options for both DP and DDP. However, I recommend using only DDP: it's faster and not restricted to a single node. DP (DataParallel) runs a single process that replicates the model across GPUs on every forward pass and is throttled by Python's GIL, whereas DDP runs one process per GPU.

Comparison from PyTorch tutorial

Code Walkthrough

Implementing distributed deep learning is more straightforward than you may think. The beauty lies in the fact that you won’t be overwhelmed with manual GPU configurations or the complexities of gradient distribution. You can find all the templates and scripts here.

Here’s a breakdown of the steps we’ll follow:

  1. Process Initialization: Designate the master node, specify the port, and set up the world size.
  2. Distributed DataLoader Setup: Crucial for partitioning each batch across the available GPUs, ensuring data is evenly distributed without overlap.
  3. Model Training/Testing: This step remains largely unchanged from single GPU training.

Training on One GPU (Baseline)

Let’s start with a basic code snippet that loads a dataset, creates a model, and trains it on a single GPU.

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np


class WineDataset(Dataset):
    def __init__(self, data, targets):
        self.data = data
        self.targets = targets

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.tensor(self.data[idx], dtype=torch.float), torch.tensor(self.targets[idx], dtype=torch.long)


class SimpleNN(torch.nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = torch.nn.Linear(13, 64)
        self.fc2 = torch.nn.Linear(64, 3)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


class Trainer():
    def __init__(self, model, train_data, optimizer, gpu_id, save_every):
        self.model = model.to(gpu_id)  # move the model to the training device
        self.train_data = train_data
        self.optimizer = optimizer
        self.gpu_id = gpu_id
        self.save_every = save_every
        self.losses = []

    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = F.cross_entropy(output, targets)
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def _run_epoch(self, epoch):
        total_loss = 0.0
        num_batches = len(self.train_data)
        for source, targets in self.train_data:
            source = source.to(self.gpu_id)
            targets = targets.to(self.gpu_id)
            loss = self._run_batch(source, targets)
            total_loss += loss
        avg_loss = total_loss / num_batches
        self.losses.append(avg_loss)
        print(f"Epoch {epoch}, Loss: {avg_loss:.4f}")

    def _save_checkpoint(self, epoch):
        checkpoint = self.model.state_dict()
        PATH = f"model_{epoch}.pt"
        torch.save(checkpoint, PATH)
        print(f"Epoch {epoch} | Model saved to {PATH}")

    def train(self, max_epochs):
        self.model.train()
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if epoch % self.save_every == 0:
                self._save_checkpoint(epoch)


def load_train_objs():
    wine_data = load_wine()
    X = wine_data.data
    y = wine_data.target

    # Normalize and split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    scaler = StandardScaler().fit(X_train)
    X_train = scaler.transform(X_train)
    X_test = scaler.transform(X_test)

    train_set = WineDataset(X_train, y_train)
    test_set = WineDataset(X_test, y_test)

    print("Sample from dataset:")
    sample_data, sample_target = train_set[0]
    print(f"Data: {sample_data}")
    print(f"Target: {sample_target}")

    model = SimpleNN()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    return train_set, model, optimizer


def prepare_dataloader(dataset, batch_size):
    return DataLoader(dataset, batch_size=batch_size, pin_memory=True, shuffle=True)


def main(device, total_epochs, save_every, batch_size):
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, device, save_every)
    trainer.train(total_epochs)


main(device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), total_epochs=100, save_every=50, batch_size=32)

Training on Multiple GPUs, One Node

Now, let’s utilize all GPUs in a single node by following these steps:

  1. Import Necessary Libraries for Distributed Training.
  2. Initialize the Distributed Environment (in particular, set MASTER_ADDR and MASTER_PORT).
  3. Wrap the Model with DDP using the DistributedDataParallel wrapper.
  4. Use Distributed Sampler to ensure that the dataset is divided across GPUs.
  5. Adjust the Main Function to spawn multiple processes for multi-GPU training.

For the necessary libraries, we’ll need the following:

import os

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

Next, we set up each process. For example, if we have eight GPUs on one node, we’ll call the following function eight times, once for each GPU with the correct local_rank:

def ddp_setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

A few explanations on the function:

  • MASTER_ADDR: The hostname of the machine where the master (or rank 0 process) is running. Here, it’s set to localhost.
  • MASTER_PORT: Specifies the port on which the master listens for connections from workers. You can choose any unused port number.
  • torch.cuda.set_device(rank): Ensures that each process uses its corresponding GPU.

Next, we slightly modify the Trainer class to wrap the model with the DDP function:

class Trainer():
    def __init__(self, model, train_data, optimizer, gpu_id, save_every):
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.gpu_id = gpu_id
        self.save_every = save_every
        self.losses = []
        self.model = DDP(self.model, device_ids=[gpu_id])
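One detail the rest of the Trainer should account for: once the model is wrapped, the underlying network lives in self.model.module, and every process would otherwise write an identical checkpoint. A common pattern, shown here as a sketch rather than the only correct way, is to save the unwrapped weights from the process on GPU 0 only:

def _save_checkpoint(self, epoch):
    if self.gpu_id != 0:
        return  # only one process writes the checkpoint to disk
    checkpoint = self.model.module.state_dict()  # unwrap DDP to get the plain model weights
    PATH = f"model_{epoch}.pt"
    torch.save(checkpoint, PATH)
    print(f"Epoch {epoch} | Model saved to {PATH}")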

We also need to adjust the DataLoader, as we must split the batch across each GPU:

def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )
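One caveat worth knowing: DistributedSampler only reshuffles the data across epochs if it is told which epoch is running. A minimal sketch of the corresponding tweak to _run_epoch (the sampler is reachable through the DataLoader's sampler attribute):

def _run_epoch(self, epoch):
    # Re-seed the DistributedSampler so each epoch sees a different shuffling of the shards.
    self.train_data.sampler.set_epoch(epoch)
    # ... the rest of the method is unchanged from the single-GPU version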

Now, let’s modify the main function that will be called for each process (so eight times in our case):

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    ddp_setup(rank, world_size)
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, rank, save_every)
    trainer.train(total_epochs)
    destroy_process_group()

Finally, when executing the script, we’ll launch the eight processes using the mp.spawn() function:

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()

    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)
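For reference, total_epochs and save_every are positional arguments, so a hypothetical launch of this script (the file name is a placeholder) for 100 epochs with a checkpoint every 50 epochs looks like this; mp.spawn then starts one process per available GPU:

python3 your_python_script.py 100 50 --batch_size 32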

Ultimate Step: Training Across Multiple Nodes

Congratulations on making it this far! The ultimate step involves utilizing all available GPUs across different nodes. Understanding what we’ve done thus far makes this process relatively simple.

The key distinction when scaling across multiple nodes is transitioning from local_rank to global_rank. This is critical because each process requires a unique identifier. For example, if you’re using two nodes with eight GPUs each, both processes 0 and 8 will have a local_rank of 0.

The formula for determining global_rank is as follows:

global_rank = node_rank * world_size_per_node + local_rank

Let’s first adjust the ddp_setup function. Besides the node rank, it now also needs the number of nodes, because init_process_group expects the total world size (number of nodes × GPUs per node), not just the per-node count:

def ddp_setup(local_rank, world_size_per_node, node_rank, num_nodes):
    os.environ["MASTER_ADDR"] = "MASTER_NODE_IP"  # <-- Replace with your master node IP
    os.environ["MASTER_PORT"] = "12355"
    global_rank = node_rank * world_size_per_node + local_rank
    # The world size passed to init_process_group is the TOTAL number of GPUs across all nodes.
    init_process_group(backend="nccl", rank=global_rank, world_size=num_nodes * world_size_per_node)
    torch.cuda.set_device(local_rank)

We also need to modify the main function, which now takes world_size_per_node, node_rank, and num_nodes as arguments:

def main(local_rank: int, world_size_per_node: int, save_every: int, total_epochs: int, batch_size: int, node_rank: int, num_nodes: int):
    ddp_setup(local_rank, world_size_per_node, node_rank, num_nodes)
    # ... (rest of the main function is unchanged)

Lastly, we’ll adjust the mp.spawn() call to pass the node rank and the number of nodes alongside world_size_per_node:

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    parser.add_argument('--node_rank', default=0, type=int, help='The rank of the node in multi-node training')
    parser.add_argument('--num_nodes', default=1, type=int, help='Total number of nodes participating in the training')
    args = parser.parse_args()

    world_size_per_node = torch.cuda.device_count()
    mp.spawn(
        main,
        args=(world_size_per_node, args.save_every, args.total_epochs, args.batch_size, args.node_rank, args.num_nodes),
        nprocs=world_size_per_node
    )
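Outside of a scheduler, you would launch the same script once per node, giving each launch its node rank. A hypothetical run on a two-node cluster (the script name is a placeholder) would look like this:

# On node 0 (the master node):
python3 your_python_script.py 100 50 --batch_size 32 --node_rank 0 --num_nodes 2

# On node 1:
python3 your_python_script.py 100 50 --batch_size 32 --node_rank 1 --num_nodes 2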

Using a Cluster (SLURM)

You are now prepared to submit the training job to the cluster. It’s quite simple: you just specify the number of nodes when you submit the job. Note that SLURM does not expand shell arguments such as $1 inside #SBATCH directives, so the node count is passed on the sbatch command line rather than inside the script. Here’s a template for the SLURM script:

#!/bin/bash
#SBATCH --job-name=DDPTraining       # Name of the job
#SBATCH --ntasks-per-node=1          # Ensure only one task (our training script) runs per node
#SBATCH --cpus-per-task=1            # Number of CPU cores per task
#SBATCH --gres=gpu:8                 # Number of GPUs per node (adjust to your hardware)
#SBATCH --time=01:00:00              # Time limit hrs:min:sec (1 hour in this example)
#SBATCH --mem=4GB                    # Memory limit per node
#SBATCH --output=training_%j.log     # Output and error log name (%j expands to jobId)
#SBATCH --partition=gpu              # Specify the partition or queue

srun python3 your_python_script.py 10 2 --batch_size 32 --node_rank $SLURM_NODEID --num_nodes $SLURM_JOB_NUM_NODES

You can now start training from the terminal with the following command:

sbatch --nodes=2 train_net.sh  # for using 2 nodes

Congratulations, you’ve successfully completed the tutorial!

Before You Go:

For more fantastic tutorials, check out my compilation of AI tutorials on GitHub.

If you'd like to receive my articles directly to your inbox, subscribe here.

For access to premium articles on Medium, a $5 monthly membership is all you need. By signing up through my link, you’ll support my work without any extra cost to you.

If you found this article helpful, please consider following me and giving a clap for more in-depth content! Your support allows me to continue creating resources that enhance our collective understanding.

References

  • PyTorch guide on DDP
  • Tutorial Series

The first video provides an introduction to the Distributed Data Parallel (DDP) tutorial series, aimed at helping you understand the basics and benefits of DDP in model training.

The second video focuses on PyTorch Distributed Data Parallel (DDP) as presented during the PyTorch Developer Day 2020, offering insights and practical tips for implementation.
