PyTorch
PyTorch is a Python package that provides two high-level features:
- Tensor computation (like NumPy) with strong GPU acceleration
- Deep neural networks built on a tape-based autograd system
If you are porting a PyTorch program to a Compute Canada cluster, you should follow our tutorial on the subject.
Disambiguation
PyTorch has a distant connection with Torch, but for all practical purposes you can treat them as separate projects.
PyTorch developers also offer LibTorch, which allows one to implement extensions to PyTorch using C++, and to implement pure C++ machine learning applications. Models written in Python using PyTorch can be converted and used in pure C++ through TorchScript.
Installation
Latest available wheels
To see the latest version of PyTorch that we have built:
[name@server ~]$ avail_wheels "torch*"
For more information on listing wheels, see listing available wheels.
Installing Compute Canada wheel
The preferred option is to install it using the Python wheel as follows:
- 1. Load a Python module, thus module load python
- 2. Create and start a virtual environment.
- 3. Install PyTorch in the virtual environment with
pip install
.
GPU and CPU
-
(venv) [name@server ~] pip install --no-index torch
Extra
In addition to torch, you can install torchvision, torchtext and torchaudio:
(venv) [name@server ~] pip install --no-index torch torchvision torchtext torchaudio
Job submission
Here is an example of a job submission script using the python wheel, with a virtual environment inside a job:
#!/bin/bash
#SBATCH --gres=gpu:1 # Request GPU "generic resources"
#SBATCH --cpus-per-task=6 # Cores proportional to GPUs: 6 on Cedar, 16 on Graham.
#SBATCH --mem=32000M # Memory proportional to GPUs: 32000 Cedar, 64000 Graham.
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python/3.6
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch --no-index
python pytorch-test.py
The Python script pytorch-test.py
has the form
import torch
x = torch.Tensor(5, 3)
print(x)
y = torch.rand(5, 3)
print(y)
# let us run the following only if CUDA is available
if torch.cuda.is_available():
x = x.cuda()
y = y.cuda()
print(x + y)
You can then submit a PyTorch job with:
[name@server ~]$ sbatch pytorch-test.sh
High Performance with PyTorch
PyTorch with Multiple CPUs
PyTorch natively supports parallelizing work across multiple CPUs in two ways: Intra-op parallelism and Inter-op parallelism. The first refers to PyTorch's parallel implementations of operators commonly used in Deep Learning, such as matrix multiplication and convolution, using OpenMP directly or through low-level libraries like MKL and OneDNN. Whenever you run PyTorch code that performs such operations, they will automatically leverage multi-threading over as many CPU cores as are available to your job. Inter-op parallelism on the other hand refers to PyTorch's ability to execute different parts of your code concurrently. This modality of parallelism typically requires that you explicitly design your program such that different parts can run in parallel. Examples include code that leverages PyTorch's Just-In-Time compiler torch.jit
to run asynchronous tasks in a TorchScript program.
With small scale models, we strongly recommend using multiple CPUs instead of using a GPU. While training will almost certainly run faster on a GPU (except in cases where the model is very small), if your model and your dataset are not large enough, the speed up relative to CPU will likely not be very significant and your job will end up using only a small portion of the GPU's compute capabilities. This might not be an issue on your own workstation, but in a shared environment like Compute Canada's HPC clusters, this means you are unnecessarily blocking a resource that another user may need to run actual large scale computations! Furthermore, you would be unnecessarily using up your group's allocation and affecting the priority of your colleagues' jobs.
The code example below contains many opportunities for intra-op parallelism. By simply requesting more CPUs and without any code changes, we can observe the effect of PyTorch's native support for parallelism on performance:
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
echo "starting training..."
time python cifar10-cpu.py
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, cpu performance test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=args.lr)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
PyTorch with a Single GPU
There is a common misconception that you should definitely use a GPU for model training if one is available. While this may almost always hold true (training very small models is often faster on one or more CPUs) on your own local workstation equipped with a GPU, it is not the case on Compute Canada’s HPC clusters.
Simply put, you should not ask for a GPU if your code is not capable of making a reasonable use of its compute capacity. See here for more information on how to decide whether or not you should use a GPU.
GPUs draw their performance advantage in Deep Learning tasks mainly from two sources:
- Their ability to parallelize the execution of certain key numerical operations, such as multiply-accumulate, over many thousands of compute cores compared to the single-digit count of cores available in most common CPUs.
- A much higher memory bandwidth than CPUs, which allows GPUs to efficiently use their massive number of cores to process much larger amounts of data per compute cycle.
Like in the multi-cpu case, PyTorch contains parallel implementations of operators commonly used in Deep Learning, such as matrix multiplication and convolution, using GPU-specific libraries like CUDNN or MIOpen, depending on the hardware platform. This means that for a learning task to be worth running on a GPU, it must be composed of elements that scale out with massive parallelism in terms of the number of operations that can be performed in parallel, the amount of data they require, or, ideally, both. Concretely this means, for example, large models (with large numbers of units and layers), large inputs, or, ideally, both.
In the example below, we adapt the multi-cpu code from the previous section to run on one GPU and examine its performance. We can observe that two parameters play an important role: batch_size
and num_workers
. The first influences performance by increasing the size of our inputs at each iteration, thus putting more of the GPU's capacity to use. The second influences performance by streamlining the movement of our inputs from the Host's (or the CPU's) memory to the GPU's memory, thus reducing the amount of time the GPU sits idle waiting for data to process.
Two takeaways emerge from this:
- Increase your
batch_size
to as much as you can fit in the GPU's memory to optimize your compute performance. - Use a
DataLoader
with as many workers as you havecpus-per-task
to streamline feeding data to the GPU.
Of course, batch_size
is also an important parameter with respect to a model's performance on a given task (accuracy, error, etc.) and different schools of thought have different views on the impact of using large batches. This page will not go into this subject, but if you have reason to believe that a small (relative to space in GPU memory) batch size is best for your application, skip to the next section to see how to maximize GPU utilization with small inputs.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:1 # request a GPU
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
echo "starting training..."
time python cifar10-gpu.py --batch_size=512 --num_workers=0
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, single gpu performance test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net().cuda() # Load model on the GPU
criterion = nn.CrossEntropyLoss().cuda() # Load the loss function on the GPU
optimizer = optim.SGD(net.parameters(), lr=args.lr)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
Data Parallelism with a single GPU
In cases where a model is fairly small, such that it does not take up a large portion of GPU memory and it cannot use a reasonable amount of its compute capacity, it is not advisable to use a GPU. Use one or more CPUs instead. However, in a scenario where you have such a model, but have a very large dataset and wish to perform training with a small batch size, taking advantage of Data Parallelism on a GPU becomes a viable option.
Data Parallelism, in this context, refers to methods to perform training over multiple replicas of a model in parallel, where each replica receives a different chunk of training data at each iteration. Gradients are then aggregated at the end of an iteration and the parameters of all replicas are updated in a synchronous or asynchronous fashion, depending on the method. Using this approach may provide a significant speed-up by iterating through all examples in a large dataset approximately N times faster, where N is the number of model replicas. An important caveat of this approach, is that in order to get a trained model that is equivalent to the same model trained without Data Parallelism, the user must scale either the learning rate or the desired batch size in function of the number of replicas. See this discussion for more information.
PyTorch has implementations of Data Parallelism methods, with the DistributedDataParallel
class being the one recommended by PyTorch maintainers for best performance. Designed to work with multiple GPUs, it can be also be used with a single GPU.
In the example that follows, we adapt the single GPU code from the previous section to use Data Parallelism. This task is fairly small - with a batch size of 512 images, our model takes up about 1GB of GPU memory space, and it uses only about 6% of its compute capacity during training. This is a model that should not be trained on a GPU on Compute Canada's clusters. However, using Data Parallelism, we can fit up to 14 or 15 replicas of this model on a V100 GPU with 16GB memory and increase our resource usage, while getting a nice speed-up. We use Nvidia's Multi-Process Service (MPS), along with MPI to efficiently place multiple model replicas on one GPU:
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:1 # request a GPU
#SBATCH --tasks-per-node=8 # This is the number of model replicas we will place on the GPU. Change this to 10,12,14,... to see the effect on performance
#SBATCH --cpus-per-task=1 # increase this parameter and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
# Activate Nvidia MPS:
export CUDA_MPS_PIPE_DIRECTORY=/tmp/nvidia-mps
export CUDA_MPS_LOG_DIRECTORY=/tmp/nvidia-log
nvidia-cuda-mps-control -d
echo "starting training..."
time srun python cifar10-gpu-mps.py --batch_size=512 --num_workers=0
import os
import time
import datetime
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel maps test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
def main():
print("Starting...")
args = parser.parse_args()
rank = os.environ.get("SLURM_LOCALID")
current_device = 0
torch.cuda.set_device(current_device)
""" this block initializes a process group and initiate communications
between all processes that will run a model replica """
print('From Rank: {}, ==> Initializing Process Group...'.format(rank))
dist.init_process_group(backend="mpi", init_method=args.init_method) # Use backend="mpi" or "gloo". NCCL does not work on a single GPU due to a hard-coded multi-GPU topology check.
print("process group ready!")
print('From Rank: {}, ==> Making model..'.format(rank))
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
net.cuda()
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device]) # Wrap the model with DistributedDataParallel
criterion = nn.CrossEntropyLoss().cuda()
optimizer = optim.SGD(net.parameters(), lr=args.lr)
print('From Rank: {}, ==> Preparing data..'.format(rank))
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='~/data', train=True, download=False, transform=transform_train)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
PyTorch with Multiple GPUs
Ongoing issue with DistributedDataParallel
There is a known issue with our PyTorch 1.10 wheel torch-1.10.0+computecanada
. Multi-GPU code that uses DistributedDataParallel running with this PyTorch install may fail unpredictably if the backend is set to 'nccl'
or 'gloo'
. While we work on providing a stable 1.10 wheel, we recommend using earlier versions of PyTorch (1.9.x or earlier) on all GP clusters, except Narval. Earlier PyTorch versions will not work on Narval and we recommend installing the wheel from upstream with pip install torch==1.10.0+cu113 -f https://download.pytorch.org/whl/cu113/torch_stable.html
for the moment. Please note that the wheel from upstream is not optimized to run on Narval. Also note that using it may cause issues with other packages in the PyTorch ecosystem such as torchvision
.
Other alternatives are to use Horovod to run Distributed Training or set the backend to 'mpi'
when using DistributedDataParallel
.
Data Parallelism with Multiple GPUs
There are several ways to use PyTorch with multiple GPUs. This section features tutorials on three of them: using the DistributedDataParallel class, using the PyTorch Lightning package and using the Horovod package.
Using DistributedDataParallel
The DistributedDataParallel class is the way recommended by PyTorch maintainers to use multiple GPUs, whether they are all on a single node, or distributed across multiple nodes. The following is a tutorial on multiple GPUs distributed across 2 nodes:
#!/bin/bash
#SBATCH --nodes 2 # Request 2 nodes so all resources are in two nodes.
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”. You will get 2 per node.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision --no-index
export NCCL_BLOCKING_WAIT=1 #Set this environment variable if you wish to use the NCCL backend for inter-GPU communication.
export MASTER_ADDR=$(hostname) #Store the master node’s IP address in the MASTER_ADDR environment variable.
echo "r$SLURM_NODEID master: $MASTER_ADDR"
echo "r$SLURM_NODEID Launching python script"
# The SLURM_NTASKS variable tells the script how many processes are available for this execution. “srun” executes the script <tasks-per-node * nodes> times
srun python pytorch-ddp-test.py --init_method tcp://$MASTER_ADDR:3456 --world_size $SLURM_NTASKS --batch_size 256
The Python script pytorch-ddp-test.py
has the form
import os
import time
import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
parser.add_argument('--dist-backend', default='gloo', type=str, help='')
parser.add_argument('--world_size', default=1, type=int, help='')
parser.add_argument('--distributed', action='store_true', help='')
def main():
print("Starting...")
args = parser.parse_args()
ngpus_per_node = torch.cuda.device_count()
""" This next line is the key to getting DistributedDataParallel working on SLURM:
SLURM_NODEID is 0 or 1 in this example, SLURM_LOCALID is the id of the
current process inside a node and is also 0 or 1 in this example."""
local_rank = int(os.environ.get("SLURM_LOCALID"))
rank = int(os.environ.get("SLURM_NODEID"))*ngpus_per_node + local_rank
current_device = local_rank
torch.cuda.set_device(current_device)
""" this block initializes a process group and initiate communications
between all processes running on all nodes """
print('From Rank: {}, ==> Initializing Process Group...'.format(rank))
#init the process group
dist.init_process_group(backend=args.dist_backend, init_method=args.init_method, world_size=args.world_size, rank=rank)
print("process group ready!")
print('From Rank: {}, ==> Making model..'.format(rank))
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
net.cuda()
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device])
print('From Rank: {}, ==> Preparing data..'.format(rank))
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
criterion = nn.CrossEntropyLoss().cuda()
optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4)
for epoch in range(args.max_epochs):
train_sampler.set_epoch(epoch)
train(epoch, net, criterion, optimizer, train_loader, rank)
def train(epoch, net, criterion, optimizer, train_loader, train_rank):
train_loss = 0
correct = 0
total = 0
epoch_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
acc = 100 * correct / total
batch_time = time.time() - start
elapse_time = time.time() - epoch_start
elapse_time = datetime.timedelta(seconds=elapse_time)
print("From Rank: {}, Training time {}".format(train_rank, elapse_time))
if __name__=='__main__':
main()
Using PyTorch Lightning
PyTorch Lightning is a Python package that provides interfaces to PyTorch to make many common, but otherwise code-heavy tasks, more straightforward. This includes training on multiple GPUs. The following is the same tutorial from the section above, but using PyTorch Lightning instead of explicitly leveraging the DistributedDataParallel class:
#!/bin/bash
#SBATCH --nodes 2 # Request 2 node so all resources are in two nodes.
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”. You will get 2 per node.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning --no-index
export NCCL_BLOCKING_WAIT=1 #Pytorch Lightning uses the NCCL backend for inter-GPU communication by default. Set this variable to avoid timeout errors.
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python pytorch-ddp-test-pl.py --batch_size 256
import datetime
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, pytorch-lightning parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
print("Starting...")
args = parser.parse_args()
class Net(pl.LightningModule):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
def training_step(self, batch, batch_idx):
x, y = batch
y_hat = self(x)
loss = F.cross_entropy(y_hat, y)
return loss
def configure_optimizers(self):
return torch.optim.Adam(self.parameters(), lr=args.lr)
net = Net()
""" Here we initialize a Trainer() explicitly with 2 nodes and 2 GPUs per node.
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
which can cause issues due to updating logs too frequently."""
trainer = pl.Trainer(gpus=2, num_nodes=2,accelerator='ddp', max_epochs = args.max_epochs, progress_bar_refresh_rate=0)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
trainer.fit(net,train_loader)
if __name__=='__main__':
main()
Using Horovod
Horovod is a distributed deep learning training framework for TensorFlow, Keras, PyTorch, and Apache MXNet. Its API allows you to retain the level of control over your training code that DistributedDataParallel
provides, but makes writing your scripts easier by abstracting away the need to directly configure process groups and dealing with the cluster scheduler's environment variables. It also features distributed optimizers, which may increase performance in some cases. The following is the same example as above, re-implemented using Horovod:
#!/bin/bash
#SBATCH --nodes 2 # Request 2 node so all resources are in two nodes.
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”. You will get 2 per node.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision horovod --no-index
export NCCL_BLOCKING_WAIT=1 # Horovod uses the NCCL backend for inter-GPU communication by default. Set this variable to avoid timeout errors.
srun python pytorch_horovod.py --batch_size 256
import os
import time
import datetime
import numpy as np
import horovod.torch as hvd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, horovod test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--max_epochs', type=int, default=1, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
hvd.init()
print("Starting...")
local_rank = hvd.local_rank()
global_rank = hvd.rank()
torch.cuda.set_device(local_rank)
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
net.cuda()
print('From Rank: {}, ==> Preparing data..'.format(global_rank))
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train, num_replicas=hvd.size(),rank=global_rank)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
criterion = nn.CrossEntropyLoss().cuda()
optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4)
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=net.named_parameters())
hvd.broadcast_parameters(net.state_dict(), root_rank=0)
for epoch in range(args.max_epochs):
train_sampler.set_epoch(epoch)
train(args,epoch, net, criterion, optimizer, train_loader, global_rank)
def train(args,epoch, net, criterion, optimizer, train_loader, train_rank):
train_loss = 0
correct = 0
total = 0
epoch_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
acc = 100 * correct / total
batch_time = time.time() - start
elapse_time = time.time() - epoch_start
elapse_time = datetime.timedelta(seconds=elapse_time)
print("From Rank: {}, Training time {}".format(train_rank, elapse_time))
if __name__=='__main__':
main()
Using DeepSpeed
Model Parallelism with Multiple GPUs
In cases where a model is too large to fit inside a single GPU, you can split it into multiple parts and load each one onto a separate GPU. In the example below, we revisit the code example from previous sections to illustrate how this works: we will split a Convolutional Neural Network in two parts - the convolutional/pooling layers and the densely connected feedforward layers. This job will request 2 GPUs and each of the two parts of the model will be loaded on its own GPU. We will also add code to perform pipeline parallelism and minimize as much as possible the amount of time the second GPU sits idle waiting for the outputs of the first. To do this, we will create a separate nn.Module
for each part of our model, create a sequence of modules by wrapping our model parts with nn.Sequential
, then use torch.distributed.pipeline.sync.Pipe
to break each input batch into chunks and feed them in parallel to all parts of our model.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # request 2 GPUs
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:10:00
#SBATCH --output=%N-%j.out
#SBATCH --account=cc-debug
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
# This is needed to initialize pytorch's RPC module, required for the Pipe class which we'll use for Pipeline Parallelism
export MASTER_ADDR=$(hostname)
export MASTER_PORT=34567
echo "starting training..."
time python pytorch-modelpar-pipelined-rpc.py --batch_size=512 --num_workers=0
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributed.pipeline.sync import Pipe
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, single gpu performance test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
# Convolutional + pooling part of the model
class ConvPart(nn.Module):
def __init__(self):
super(ConvPart, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
return x
# Dense feedforward part of the model
class MLPPart(nn.Module):
def __init__(self):
super(MLPPart, self).__init__()
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) # initializing RPC is required by Pipe we use below
part1 = ConvPart().to('cuda:0') # Load part1 on the first GPU
part2 = MLPPart().to('cuda:1') # Load part2 on the second GPU
net = nn.Sequential(part1,part2) # Pipe requires all modules be wrapped with nn.Sequential()
net = Pipe(net, chunks=32) # Wrap with Pipe to perform Pipeline Parallelism
criterion = nn.CrossEntropyLoss().to('cuda:1') # Load the loss function on the last GPU
optimizer = optim.SGD(net.parameters(), lr=args.lr)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.to('cuda:0')
targets = targets.to('cuda:1')
# Models wrapped with Pipe() return a RRef object. Since the example is single node, all values are local to the node and we can grab them
outputs = net(inputs).local_value()
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"Loss: {loss.item()}")
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
Combining Model and Data Parallelism
Using Torch RPC
Using DeepSpeed
Creating Model Checkpoints
Whether or not you expect your code to run for long time periods, it is a good habit to create Checkpoints during training. A checkpoint is a snapshot of your model at a given point during the training process (after a certain number of iterations or after a number of epochs) that is saved to disk and can be loaded at a later time. It is a handy way of breaking up jobs that are expected to run for a very long time, into multiple shorter jobs that may get allocated on the cluster more quickly. It is also a good way of avoiding losing progress in case of unexpected errors in your code or node failures.
With PyTorch Lightning
To create a checkpoint when training with pytorch-lightning
, we recommend using the callbacks parameter of the Trainer()
class. The following example shows how to instruct PyTorch to create a checkpoint at the end of every training epoch. Make sure the path where you want to create the checkpoint exists.
callbacks = [pl.callbacks.ModelCheckpoint(dirpath="./ckpt",every_n_epochs=1)] trainer = pl.Trainer(callbacks=callbacks) trainer.fit(model)
This code snippet will also load a checkpoint from ./ckpt
, if there is one, and continue training from that point. For more information, please refer to the official PyTorch Lightning documentation.
With Custom Training Loops
Please refer to the official PyTorch documentation for examples on how to create and load checkpoints inside of a training loop.
During Distributed Training
Checkpointing can also be done while running a distributed training program. With PyTorch Lightning, no extra code is required other than using the checkpoint callback as described above. If you are using DistributedDataParallel or Horovod however, checkpointing should be done only by one process (one of the ranks) of your program, since all ranks will have the same state at the end of each iteration. The following example uses the first process (rank 0) to create a checkpoint:
if global_rank == 0: torch.save(ddp_model.state_dict(), "./checkpoint_path")
You must be careful when loading a checkpoint created in this manner. If a process tries to load a checkpoint that has not yet been saved by another, you may see errors or get wrong results. To avoid this, you can add a barrier to your code to make sure the process that will create the checkpoint finishes writing it to disk before other processes attempt to load it. Also note that torch.load
will attempt to load tensors to the GPU that saved them originally (cuda:0
in this case) by default. To avoid issues, pass map_location
to torch.load
to load tensors on the correct GPU for each rank.
torch.distributed.barrier() map_location = f"cuda:{local_rank}" ddp_model.load_state_dict( torch.load("./checkpoint_path", map_location=map_location))
Troubleshooting
Memory leak
On AVX512 hardware (Béluga, Skylake or V100 nodes), older versions of Pytorch (less than v1.0.1) using older libraries (cuDNN < v7.5 or MAGMA < v2.5) may considerably leak memory resulting in an out-of-memory exception and death of your tasks. Please upgrade to the latest torch version.
LibTorch
LibTorch allows one to implement both C++ extensions to PyTorch and pure C++ machine learning applications. It contains "all headers, libraries and CMake configuration files required to depend on PyTorch", as described in the documentation.
How to use LibTorch
Get the library
wget https://download.pytorch.org/libtorch/cu100/libtorch-shared-with-deps-latest.zip unzip libtorch-shared-with-deps-latest.zip cd libtorch export LIBTORCH_ROOT=$(pwd)
Notice that the variable LIBTORCH_ROOT
is used in the example below.
A workaround is needed for compiling on Compute Canada clusters. Apply this patch:
sed -i -e 's/\/usr\/local\/cuda\/lib64\/libculibos.a;dl;\/usr\/local\/cuda\/lib64\/libculibos.a;//g' share/cmake/Caffe2/Caffe2Targets.cmake
Compile a minimal example
Create the following two files:
#include <torch/torch.h>
#include <iostream>
int main() {
torch::Device device(torch::kCPU);
if (torch::cuda::is_available()) {
std::cout << "CUDA is available! Using GPU." << std::endl;
device = torch::Device(torch::kCUDA);
}
torch::Tensor tensor = torch::rand({2, 3}).to(device);
std::cout << tensor << std::endl;
}
cmake_minimum_required(VERSION 3.0 FATAL_ERROR)
project(example-app)
find_package(Torch REQUIRED)
add_executable(example-app example-app.cpp)
target_link_libraries(example-app "${TORCH_LIBRARIES}")
set_property(TARGET example-app PROPERTY CXX_STANDARD 14)
Load the necessary modules:
module load cmake intel/2018.3 cuda/10 cudnn
Compile the program:
mkdir build cd build cmake -DCMAKE_PREFIX_PATH="$LIBTORCH_ROOT;$EBROOTCUDA;$EBROOTCUDNN" .. make
Run the program:
./example-app
To test an application with CUDA, request an interactive job with a GPU.