PyTorch
PyTorch est un paquet Python qui offre deux fonctionnalités de haut niveau :
- le calcul tensoriel (semblable à celui effectué par NumPy) avec forte accélération de GPU,
- des réseaux de neurones d’apprentissage profond dans un système de gradients conçu sur le modèle d’un magnétophone.
Si vous voulez porter un programme PyTorch sur un de nos superordinateurs, il serait bon de prendre connaissance de ce tutoriel.
Clarification
Il y a une certaine ressemblance entre PyTorch et Torch, mais pour des raisons pratiques vous pouvez considérer que ce sont des projets différents.
Les développeurs PyTorch offrent aussi LibTorch qui permet d'implémenter des extensions à PyTorch à l'aide de C++ et d'implémenter des applications d'apprentissage machine en C++ pur. Les modèles Python écrits avec PyTorch peuvent être convertis et utilisés en C++ avec TorchScript.
Installation
Wheels récemment ajoutés
Pour connaître la dernière version de PyTorch, utilisez
[name@server ~]$ avail_wheels "torch*"
Pour plus d'information, voyez Wheels disponibles.
Installation du wheel de Calcul Canada
La meilleure option est d'installer avec Python wheels comme suit :
- 1. Chargez un module Python avec module load python.
- 2. Créez et démarrez un environnement virtuel.
- 3. Installez PyTorch dans l'environnement virtuel avec
pip install
.
GPU et CPU
-
(venv) [name@server ~] pip install --no-index torch
Remarque :PyTorch 1.10 cause des problèmes connus sur nos grappes (à l'exception de Narval). Si l'entraînement distribué produit des erreurs ou si vous obtenez une erreur qui inclut c10::Error
, nous vous recommandons d'installer PyTorch 1.9.1 avec pip install --no-index torch==1.9.1
.
En supplément
En plus de torch, vous pouvez aussi installer torchvision, torchtext et torchaudio.
(venv) [name@server ~] pip install --no-index torch torchvision torchtext torchaudio
Soumettre une tâche
Le script suivant est un exemple de soumission d'une tâche utilisant le wheel Python avec un environnement virtuel dans une tâche.
#!/bin/bash
#SBATCH --gres=gpu:1 # Request GPU "generic resources"
#SBATCH --cpus-per-task=6 # Cores proportional to GPUs: 6 on Cedar, 16 on Graham.
#SBATCH --mem=32000M # Memory proportional to GPUs: 32000 Cedar, 64000 Graham.
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python/3.6
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch --no-index
python pytorch-test.py
Le script Python pytorch-ddp-test.py
a la forme suivante :
import torch
x = torch.Tensor(5, 3)
print(x)
y = torch.rand(5, 3)
print(y)
# let us run the following only if CUDA is available
if torch.cuda.is_available():
x = x.cuda()
y = y.cuda()
print(x + y)
Vous pouvez alors soumettre une tâche PyTorch avec
[name@server ~]$ sbatch pytorch-test.sh
Haute performance
TF32: Performance vs Numerical Accuracy
On version 1.7.0 PyTorch has introduced support for Nvidia's TensorFloat-32 (TF32) Mode, which in turn is available only on Ampere and later Nvidia GPU architectures. This mode of executing tensor operations has been shown to yield up to 20x speed-ups compared to equivalent single precision (FP32) operations and is enabled by default in PyTorch versions 1.7.x up to 1.11.x. However, such gains in performance come at the cost of potentially decreased accuracy in the results of operations, which may become problematic in cases such as when dealing with ill conditioned matrices, or when performing long sequences of tensor operations as is common in Deep Learning models. Following calls from its user community, TF32 is now disabled by default for matrix multiplications, but still enabled by default for convolutions starting on PyTorch version 1.12.0.
At the time of this writing, our only cluster equipped with Ampere GPUs is Narval. When using PyTorch on Narval, users should be cognizant of the following:
- You may notice a significant slow down when running the exact same GPU-enabled code with
torch < 1.12.0
andtorch >= 1.12.0
. - You may get different results when running the exact same GPU-enabled code with
torch < 1.12.0
andtorch >= 1.12.0
.
To enable or disable TF32 on torch >= 1.12.0
set the following flags to True
or False
accordingly:
torch.backends.cuda.matmul.allow_tf32 = False # Enable/disable TF32 for matrix multiplications torch.backends.cudnn.allow_tf32 = False # Enable/disable TF32 for convolutions
For more information, see PyTorch's official documentation
Travailler avec plusieurs CPU
Par défaut, PyTorch permet le parallélisme avec plusieurs CPU de deux façons :
- Intra-op, par l’implémentation parallèle d’opérateurs souvent utilisés en apprentissage profond comme le produit matriciel ou le produit de convolution, en utilisant OpenMP directement ou avec des bibliothèques de bas niveau comme MKL et OneDNN. Quand du code PyTorch doit effectuer de telles opérations, elles utilisent automatiquement de multiples fils avec tous les cœurs CPU disponibles.
- Inter-op, par la capacité d’exécuter différentes parties de code de manière concurrente. Ce mode de parallélisme nécessite habituellement que le programme soit conçu de manière à exécuter plusieurs parties en parallèle, par exemple en faisant usage du compilateur en temps réel torch.jit pour exécuter des tâches asynchrones dans un programme TorchScript.
Pour les petits modèles, nous recommandons fortement d’utiliser plusieurs CPU plutôt qu’un GPU. L’entraînement sera certainement plus rapide avec un GPU (sauf dans les cas de très petits modèles), mais si le modèle et le jeu de données ne sont pas assez grands, la vitesse gagnée avec le GPU ne sera probablement pas très importante et la tâche n’utilisera qu’une petite part de la capacité de calcul. Ce n’est peut-être pas grave sur votre propre ordinateur, mais dans un environnement partagé comme le nôtre, vous bloqueriez une ressource qui pourrait servir à effectuer de calculs de grande échelle par un autre projet. De plus, l’utilisation d’un GPU contribuerait à la diminution de l’allocation de votre groupe et aurait une incidence sur la priorité accordée aux tâches de vos collègues.
Dans le code suivant, il y a plusieurs occasions d’utiliser le parallélisme intra-op. En demandant plus de CPU et sans changer le code, on peut constater l’effet sur la performance.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
echo "starting training..."
time python cifar10-cpu.py
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, cpu performance test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=args.lr)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
Travailler avec un seul GPU
On entend souvent dire qu’il faut absolument entraîner un modèle avec un GPU s’il y en a un à notre disposition. Ceci est presque toujours vrai (l'entraînement de très petits modèles est souvent plus rapide avec un ou plusieurs CPU) sur un poste de travail local, mais ce n’est pas le cas sur nos grappes.
Autrement dit, vous ne devriez pas demander un GPU si votre code ne peut pas faire un usage raisonnable de sa capacité de calcul.
La performance avantageuse des GPU pour les tâches d’apprentissage profond provient de deux sources :
- La capacité de paralléliser l’exécution de certaines opérations clés, par exemple le multiplieur-accumulateur, sur plusieurs milliers de cœurs de calcul, en comparaison du très petit nombre de cœurs disponibles avec la plupart des CPU.
- Une bande passante de mémoire beaucoup plus grande que pour un CPU, ce qui permet aux GPU d’utiliser efficacement leur très grand nombre de cœurs pour traiter une plus grande quantité de données par cycle de calcul.
Comme c’est le cas avec plusieurs CPU, PyTorch offre des implémentations parallèles d’opérateurs souvent utilisés en apprentissage profond, comme le produit matriciel et le produit de convolution et utilise des bibliothèques spécialisées pour les GPU comme CUDNN ou MIOpen, selon la plateforme matérielle. Ceci signifie que pour qu’il vaille la peine d’utiliser un GPU pour une tâche d’apprentissage, elle doit être composée d’éléments qui peuvent être élargis à une application massive du parallélisme de par le nombre d’opérations pouvant être parallélisées, de par la quantité des données à traiter ou idéalement de par les deux. Un exemple concret serait un grand modèle qui a un grand nombre d’unités et de couches ou qui a beaucoup de données en entrée, et idéalement qui présente ces deux caractéristiques.
Dans l’exemple ci-dessous, nous adaptons le code de la section précédente pour utiliser un GPU et nous examinons la performance. Nous observons que deux paramètres jouent un rôle important : batch_size
et num_workers
. Le premier paramètre améliore la performance en augmentant la taille des entrées à chaque itération et en utilisant mieux la capacité du GPU. Dans le cas du second paramètre, la performance est améliorée en facilitant le mouvement des données entre la mémoire de l’hôte (le CPU) et la mémoire du GPU, ce qui réduit la durée d’inactivité du GPU en attente de données à traiter.
Nous pouvons tirer deux conclusions :
- Augmenter la valeur de
batch_size
au maximum qu’il est possible pour la mémoire du GPU optimise la performance. - Utiliser un
DataLoader
avec autant de workers quecpus-per-task
facilite l’apport de données au GPU.
Bien entendu, le paramètre batch_size
a aussi un impact sur la performance d’un modèle dans une tâche(c.à.d. l’exactitude, l’erreur, etc.) et il existe différentes écoles de pensée sur l’utilisation de grands lots. Nous n’abordons pas le sujet ici, mais si vous croyez qu’un petit lot conviendrait mieux à votre application, allez à la section Travailler avec un seul GPU pour savoir comment maximiser l’utilisation du GPU avec de petites entrées de données.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:1 # request a GPU
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
echo "starting training..."
time python cifar10-gpu.py --batch_size=512 --num_workers=0
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, single gpu performance test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net().cuda() # Load model on the GPU
criterion = nn.CrossEntropyLoss().cuda() # Load the loss function on the GPU
optimizer = optim.SGD(net.parameters(), lr=args.lr)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
Parallélisme des données avec un seul GPU
Il n’est pas conseillé d’utiliser un GPU avec un modèle de taille relativement petite qui n’utilise pas une grande part de la mémoire du GPU et une part raisonnable de sa capacité de calcul; utilisez plutôt un ou plusieurs CPU. Par contre, profiter du parallélisme du GPU devient une bonne option si vous avez un tel modèle avec un très grand jeu de données et que vous voulez effectuer l’entraînement avec des lots de petite taille.
Dans ce contexte, la parallélisation des données réfère à des méthodes pour entraîner en parallèle plusieurs copies d’un modèle où chaque copie reçoit un morceau des données d’entraînement à chaque itération. À la fin d’une itération, les gradients sont agrégés et les paramètres de chaque copie sont mis à jour de façon synchrone ou asynchrone, dépendant de la méthode. Cette approche peut augmenter la vitesse d’exécution de façon importante avec une itération qui se fait environ N fois plus rapidement avec un grand jeu de données, N étant le nombre de copies du modèle. Pour utiliser cette approche, un avertissement s’impose : pour que le modèle entraîné soit équivalent au même modèle entraîné sans parallélisme, vous devez adapter le taux d’apprentissage ou la taille du lot désirée en fonction du nombre de copies. Pour plus d’information, voyez ces échanges.
PyTorch offre des implémentations de méthodes de parallélisme des données, la classe DistributedDataParallel
étant celle recommandée par les développeurs de PyTorch pour donner la meilleure performance. Conçue pour le travail avec plusieurs GPU, elle peut aussi être employée avec un seul GPU.
Dans l’exemple ci-dessous, nous adaptons le code pour un seul GPU pour utiliser le parallélisme des données. La tâche est relativement petite; la taille du lot est de 512 images, le modèle occupe environ 1Go de la mémoire du GPU et l’entraînement n’utilise qu’environ 6 % de sa capacité de calcul. Ce modèle ne devrait pas être entraîné sur un GPU sur nos grappes. Cependant, en parallélisant les données, un GPU V100 avec 16Go de mémoire peut contenir 14 ou 15 copies du modèle et augmenter l'utilisation de la ressource en plus d’obtenir une bonne augmentation de vitesse. Nous utilisons Multi-Process Service (MPS) de NVIDIA avec MPI pour placer plusieurs copies du modèle sur un GPU de façon efficace.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:1 # request a GPU
#SBATCH --tasks-per-node=8 # This is the number of model replicas we will place on the GPU. Change this to 10,12,14,... to see the effect on performance
#SBATCH --cpus-per-task=1 # increase this parameter and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
# Activate Nvidia MPS:
export CUDA_MPS_PIPE_DIRECTORY=/tmp/nvidia-mps
export CUDA_MPS_LOG_DIRECTORY=/tmp/nvidia-log
nvidia-cuda-mps-control -d
echo "starting training..."
time srun python cifar10-gpu-mps.py --batch_size=512 --num_workers=0
import os
import time
import datetime
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel maps test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
def main():
print("Starting...")
args = parser.parse_args()
rank = os.environ.get("SLURM_LOCALID")
current_device = 0
torch.cuda.set_device(current_device)
""" this block initializes a process group and initiate communications
between all processes that will run a model replica """
print('From Rank: {}, ==> Initializing Process Group...'.format(rank))
dist.init_process_group(backend="mpi", init_method=args.init_method) # Use backend="mpi" or "gloo". NCCL does not work on a single GPU due to a hard-coded multi-GPU topology check.
print("process group ready!")
print('From Rank: {}, ==> Making model..'.format(rank))
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
net.cuda()
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device]) # Wrap the model with DistributedDataParallel
criterion = nn.CrossEntropyLoss().cuda()
optimizer = optim.SGD(net.parameters(), lr=args.lr)
print('From Rank: {}, ==> Preparing data..'.format(rank))
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='~/data', train=True, download=False, transform=transform_train)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
Travailler avec plusieurs GPU
Problème avec DistributedDataParallel et PyTorch 1.10
Avec notre wheel PyTorch 1.10 torch-1.10.0+computecanada
, le code pour travailler avec plusieurs GPU et qui utilise
DistributedDataParallel pourrait échouer de façon imprévisible si le backend est défini comme étant 'nccl'
ou 'gloo'
. Nous vous recommandons d'utiliser la version la plus récente de PyTorch plutôt que la version 1.10 sur toutes les grappes d'usage général.
Paralléliser les données avec plusieurs GPU
Dans ce contexte, la parallélisation des données réfère à des méthodes pour entraîner en parallèle plusieurs copies d’un modèle où chaque copie reçoit une portion des données d’entraînement à chaque itération. À la fin d’une itération, les gradients sont agrégés et les paramètres de chaque copie sont mis à jour de façon synchrone ou asynchrone, dépendant de la méthode. Cette approche peut augmenter la vitesse d’exécution de façon importante avec une itération qui se fait environ N fois plus rapidement avec un grand jeu de données, N étant le nombre de copies du modèle. Pour utiliser cette approche, un avertissement s’impose : pour que le modèle entraîné soit équivalent au même modèle entraîné sans parallélisme, vous devez adapter le taux d’apprentissage ou la taille du lot désirée en fonction du nombre de copies. Pour plus d’information, voyez ces échanges. Quand plusieurs GPU sont utilisés, chacun reçoit une copie du modèle; il doit donc être assez petit pour être contenu dans la mémoire d’un GPU. Pour entraîner un modèle qui dépasse la quantité de mémoire d’un GPU, voyez la section Paralléliser un modèle avec plusieurs GPU.
Il existe plusieurs manières de paralléliser les données avec PyTorch. Nous présentons ici des tutoriels avec la classe DistributedDataParallel, avec le paquet PyTorch Lightning et avec le paquet Horovod.
DistributedDataParallel
Avec plusieurs GPU, la classe DistributedDataParallel est recommandée par les développeurs PyTorch, que ce soit avec un nœud unique ou avec plusieurs nœuds. Dans le cas qui suit, plusieurs GPU sont distribués sur deux nœuds.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision --no-index
export NCCL_BLOCKING_WAIT=1 #Set this environment variable if you wish to use the NCCL backend for inter-GPU communication.
export MASTER_ADDR=$(hostname) #Store the master node’s IP address in the MASTER_ADDR environment variable.
echo "r$SLURM_NODEID master: $MASTER_ADDR"
echo "r$SLURM_NODEID Launching python script"
# The SLURM_NTASKS variable tells the script how many processes are available for this execution. “srun” executes the script <tasks-per-node * nodes> times
srun python pytorch-ddp-test.py --init_method tcp://$MASTER_ADDR:3456 --world_size $SLURM_NTASKS --batch_size 256
Le script Python pytorch-ddp-test.py
a la forme suivante :
import os
import time
import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
parser.add_argument('--dist-backend', default='gloo', type=str, help='')
parser.add_argument('--world_size', default=1, type=int, help='')
parser.add_argument('--distributed', action='store_true', help='')
def main():
print("Starting...")
args = parser.parse_args()
ngpus_per_node = torch.cuda.device_count()
""" This next line is the key to getting DistributedDataParallel working on SLURM:
SLURM_NODEID is 0 or 1 in this example, SLURM_LOCALID is the id of the
current process inside a node and is also 0 or 1 in this example."""
local_rank = int(os.environ.get("SLURM_LOCALID"))
rank = int(os.environ.get("SLURM_NODEID"))*ngpus_per_node + local_rank
current_device = local_rank
torch.cuda.set_device(current_device)
""" this block initializes a process group and initiate communications
between all processes running on all nodes """
print('From Rank: {}, ==> Initializing Process Group...'.format(rank))
#init the process group
dist.init_process_group(backend=args.dist_backend, init_method=args.init_method, world_size=args.world_size, rank=rank)
print("process group ready!")
print('From Rank: {}, ==> Making model..'.format(rank))
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
net.cuda()
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device])
print('From Rank: {}, ==> Preparing data..'.format(rank))
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
criterion = nn.CrossEntropyLoss().cuda()
optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4)
for epoch in range(args.max_epochs):
train_sampler.set_epoch(epoch)
train(epoch, net, criterion, optimizer, train_loader, rank)
def train(epoch, net, criterion, optimizer, train_loader, train_rank):
train_loss = 0
correct = 0
total = 0
epoch_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
acc = 100 * correct / total
batch_time = time.time() - start
elapse_time = time.time() - epoch_start
elapse_time = datetime.timedelta(seconds=elapse_time)
print("From Rank: {}, Training time {}".format(train_rank, elapse_time))
if __name__=='__main__':
main()
PyTorch Lightning
Ce paquet fournit des interfaces à PyTorch afin de simplifier plusieurs tâches communes exigeant beaucoup de code; ceci inclut les tâches d'entraînement de modèles avec plusieurs GPU. Dans le tutoriel suivant pour PyTorch Lightning, nous reprenons le même exemple que ci-dessus, mais sans avoir explicitement recours à la classe DistributedDataParallel.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning --no-index
export NCCL_BLOCKING_WAIT=1 #Pytorch Lightning uses the NCCL backend for inter-GPU communication by default. Set this variable to avoid timeout errors.
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python pytorch-ddp-test-pl.py --batch_size 256
import datetime
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, pytorch-lightning parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
print("Starting...")
args = parser.parse_args()
class Net(pl.LightningModule):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
def training_step(self, batch, batch_idx):
x, y = batch
y_hat = self(x)
loss = F.cross_entropy(y_hat, y)
return loss
def configure_optimizers(self):
return torch.optim.Adam(self.parameters(), lr=args.lr)
net = Net()
""" Here we initialize a Trainer() explicitly with 1 node and 2 GPUs per node.
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
which can cause issues due to updating logs too frequently."""
trainer = pl.Trainer(gpus=2, num_nodes=1,accelerator='ddp', max_epochs = args.max_epochs, progress_bar_refresh_rate=0)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
trainer.fit(net,train_loader)
if __name__=='__main__':
main()
Horovod
Horovod est une plateforme d'entraînement distribué pour l'apprentissage profond, compatible avec TensorFlow, Keras, PyTorch et Apache MXNet. Son API vous permet d'avoir le même niveau de contrôle sur votre code d'entraînement qu'avec DistributedDataParallel
, mais simplifie l'écriture de vos scripts en éliminant le besoin de configurer directement les groupes de processus et en prenant en charge les variables d'environnement de l'ordonnanceur. Horovod offre aussi des optimiseurs distribués qui dans certains cas améliorent la performance. L'exemple suivant est le même que le précédent, cette fois avec Horovod.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision horovod --no-index
export NCCL_BLOCKING_WAIT=1 # Horovod uses the NCCL backend for inter-GPU communication by default. Set this variable to avoid timeout errors.
srun python pytorch_horovod.py --batch_size 256
import os
import time
import datetime
import numpy as np
import horovod.torch as hvd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, horovod test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--max_epochs', type=int, default=1, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
hvd.init()
print("Starting...")
local_rank = hvd.local_rank()
global_rank = hvd.rank()
torch.cuda.set_device(local_rank)
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
net.cuda()
print('From Rank: {}, ==> Preparing data..'.format(global_rank))
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train, num_replicas=hvd.size(),rank=global_rank)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
criterion = nn.CrossEntropyLoss().cuda()
optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4)
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=net.named_parameters())
hvd.broadcast_parameters(net.state_dict(), root_rank=0)
for epoch in range(args.max_epochs):
train_sampler.set_epoch(epoch)
train(args,epoch, net, criterion, optimizer, train_loader, global_rank)
def train(args,epoch, net, criterion, optimizer, train_loader, train_rank):
train_loss = 0
correct = 0
total = 0
epoch_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
acc = 100 * correct / total
batch_time = time.time() - start
elapse_time = time.time() - epoch_start
elapse_time = datetime.timedelta(seconds=elapse_time)
print("From Rank: {}, Training time {}".format(train_rank, elapse_time))
if __name__=='__main__':
main()
Paralléliser un modèle avec plusieurs GPU
Quand un modèle est trop grand pour être contenu dans un seul GPU, vous pouvez le diviser en portions et charger chacune sur des GPU distincts. Dans l’exemple suivant, nous reprenons le code des sections précédentes pour illustrer le fonctionnement. Nous divisons un réseau de neurones convolutifs en deux : d’une part les couches convolutionnelles/de regroupement et d'autre part les couches acycliques entièrement connectées. La tâche demande deux GPU, chacun d’eux étant chargé avec une des parts. De plus, nous ajoutons du code pour paralléliser les pipelines et ainsi réduire au maximum le temps pendant lequel le deuxième GPU est inactif dans l’attente des résultats du premier. Pour ce faire, nous créons un nn.Module
distinct pour chacune des portions du modèle, puis créons une séquence de modules en enveloppant les portions avec nn.Sequential
, et ensuite utilisons torch.distributed.pipeline.sync.Pipe
pour morceler chacun des lots en entrée et les passer en parallèle aux deux portions du modèle.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # request 2 GPUs
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:10:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
# This is needed to initialize pytorch's RPC module, required for the Pipe class which we'll use for Pipeline Parallelism
export MASTER_ADDR=$(hostname)
export MASTER_PORT=34567
echo "starting training..."
time python pytorch-modelpar-pipelined-rpc.py --batch_size=512 --num_workers=0
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributed.pipeline.sync import Pipe
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, single node model parallelism test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
# Convolutional + pooling part of the model
class ConvPart(nn.Module):
def __init__(self):
super(ConvPart, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
return x
# Dense feedforward part of the model
class MLPPart(nn.Module):
def __init__(self):
super(MLPPart, self).__init__()
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) # initializing RPC is required by Pipe we use below
part1 = ConvPart().to('cuda:0') # Load part1 on the first GPU
part2 = MLPPart().to('cuda:1') # Load part2 on the second GPU
net = nn.Sequential(part1,part2) # Pipe requires all modules be wrapped with nn.Sequential()
net = Pipe(net, chunks=32) # Wrap with Pipe to perform Pipeline Parallelism
criterion = nn.CrossEntropyLoss().to('cuda:1') # Load the loss function on the last GPU
optimizer = optim.SGD(net.parameters(), lr=args.lr)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.to('cuda:0')
targets = targets.to('cuda:1')
# Models wrapped with Pipe() return a RRef object. Since the example is single node, all values are local to the node and we can grab them
outputs = net(inputs).local_value()
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"Loss: {loss.item()}")
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
Paralléliser modèle et données avec plusieurs GPU
Quand un modèle est trop grand pour être contenu dans un seul GPU et que son entraînement doit se faire avec un très grand ensemble de données, le fait de combiner le parallélisme du modèle et celui des données permet d’obtenir une bonne performance. Le principe est simple : le modèle est divisé en portions chacune attribuée à un GPU; le parallélisme des pipelines est fait avec les résultats; puis des copies du processus sont faites et les copies du modèle sont entraînées en parallèle avec des sous-ensembles distincts de l’ensemble de données d’entraînement. Comme décrit ci-dessus, les gradients sont calculés indépendamment dans chacune des copies et agrégés pour modifier toutes les copies de façon synchrone ou asynchrone, dépendant de la méthode. La différence principale ici est que chaque copie du modèle se trouve sur plus d’un GPU.
Utiliser Torch RPC et DDP
Toujours avec le même exemple, nous combinons maintenant Torch RPC et DistributedDataParallel pour séparer le modèle en deux portions et entraîner quatre copies du modèle distribuées en parallèle sur deux nœuds. Autrement dit, nous avons deux copies sur deux GPU de chaque nœud. Cependant, un avertissement s’impose : à ce jour, Torch RPC prend en charge la division d’un modèle dans un seul nœud. Pour entraîner un modèle qui dépasse la quantité de mémoire de tous les GPU dans un nœud de calcul, voyez la section DeepSpeed.
#!/bin/bash
#SBATCH --nodes 2
#SBATCH --gres=gpu:4 # Request 4 GPUs per node
#SBATCH --tasks-per-node=2 # Request one task per MODEL per node
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=16G
#SBATCH --time=0:10:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
export MAIN_NODE=$(hostname)
echo "starting training..."
srun python pytorch-model-data-par.py --init_method tcp://$MAIN_NODE:3456 --world_size $SLURM_NTASKS --batch_size 512
import time
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributed.pipeline.sync import Pipe
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data & model parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
parser.add_argument('--dist-backend', default='mpi', type=str, help='')
parser.add_argument('--world_size', default=1, type=int, help='')
parser.add_argument('--distributed', action='store_true', help='')
def main():
args = parser.parse_args()
# Convolutional + pooling part of the model
class ConvPart(nn.Module):
def __init__(self):
super(ConvPart, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
return x
# Dense feedforward part of the model
class MLPPart(nn.Module):
def __init__(self):
super(MLPPart, self).__init__()
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
ngpus_per_node = torch.cuda.device_count()
local_rank = int(os.environ.get("SLURM_LOCALID"))
rank = int(os.environ.get("SLURM_NODEID"))*(ngpus_per_node//2) + local_rank # Divide ngpus_per_node by the number of model parts
os.environ['MASTER_ADDR'] = '127.0.0.1' # Each model replica will run its own RPC server to run pipeline parallelism
os.environ['MASTER_PORT'] = str(34567 + local_rank) # Make sure each RPC server starts on a different port
torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) # Different replicas won't communicate through RPC, but through DDP
dist.init_process_group(backend=args.dist_backend, init_method=args.init_method, world_size=args.world_size, rank=rank) # Initialize Data Parallelism communications
part1 = ConvPart().cuda(local_rank) # First part of the model goes on the first GPU of each process
part2 = MLPPart().cuda(local_rank + 1) # Second part goes on the second GPU of each process
net = nn.Sequential(part1,part2)
net = Pipe(net, chunks=32, checkpoint="never")
net = torch.nn.parallel.DistributedDataParallel(net)
criterion = nn.CrossEntropyLoss().cuda(local_rank + 1) # Loss function goes on the second GPU of each process
optimizer = optim.SGD(net.parameters(), lr=args.lr)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
for epoch in range(args.max_epochs):
train_sampler.set_epoch(epoch)
train(epoch, net, criterion, optimizer, train_loader, rank, local_rank)
def train(epoch, net, criterion, optimizer, train_loader, train_rank, model_rank):
train_loss = 0
correct = 0
total = 0
epoch_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda(model_rank)
targets = targets.cuda(model_rank + 1)
outputs = net(inputs).local_value()
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"From Rank {train_rank} - Loss: {loss.item()}")
batch_time = time.time() - start
if __name__=='__main__':
main()
DeepSpeed
DeepSpeed est une bibliothèque qui permet d'optimiser l’entraînement de l’apprentissage de modèles ayant des milliards de paramètres à l’échelle. Parfaitement compatible avec PyTorch, DeepSpeed offre des implémentations de nouvelles méthodes d’entraînement distribué qui font un usage efficace de la mémoire en appliquant le concept de Zero Redundancy Optimizer (ZeRO). Avec ZeRO, DeepSpeed peut distribuer le stockage et le traitement des divers éléments d’une tâche d’entraînement (états de l’optimiseur, poids du modèle, gradients et activations) sur plusieurs dispositifs comme les GPU, les CPU, les disques durs locaux et/ou les combinaisons de ces dispositifs. Cette mise en commun des ressources, surtout les ressources de stockage, permet l’entraînement efficace sur plusieurs nœuds de modèles ayant d’énormes quantités de paramètres sans avoir besoin d’écrire le code pour paralléliser le modèle, les pipelines ou les données. Les exemples qui suivent montrent comment profiter de DeepSpeed et des différentes implémentations de ZeRO en utilisant l’interface simple de PyTorch Lightning.
ZeRO avec GPU
Dans l’exemple ci-dessous, nous utilisons ZeRO stage 3 pour entraîner un modèle qui utilise un groupe de 4 GPU. Le stage 3 répartit sur les 4 GPU les trois caractéristiques, soit les états de l’optimiseur, les paramètres du modèle et les gradients du modèle. Ceci est plus efficace que le parallélisme pur des données où une copie complète du modèle est chargée sur chacun des GPU. Avec l’optimiseur DeepSpeed FusedAdam
plutôt qu’un optimiseur natif de PyTorch, la performance se compare au parallélisme pur des données. Les optimiseurs DeepSpeed sont compilés en temps réel à l’exécution et vous devez charger un module cuda/<version>
où version correspond à la version utilisée pour construire le paquet PyTorch que vous utilisez.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:4 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python cuda # CUDA must be loaded if using a DeepSpeed optimizer
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index
export NCCL_BLOCKING_WAIT=1 #Pytorch Lightning uses the NCCL backend for inter-GPU communication by default. Set this variable to avoid timeout errors.
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python deepspeed-stage3.py --batch_size 256
import datetime
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from deepspeed.ops.adam import FusedAdam
from pytorch_lightning.plugins import DeepSpeedPlugin
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, deepspeed offload to cpu test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
print("Starting...")
args = parser.parse_args()
class ConvPart(nn.Module):
def __init__(self):
super(ConvPart, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
return x
# Dense feedforward part of the model
class MLPPart(nn.Module):
def __init__(self):
super(MLPPart, self).__init__()
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
class Net(pl.LightningModule):
def __init__(self):
super(Net, self).__init__()
self.conv_part = ConvPart()
self.mlp_part = MLPPart()
def configure_sharded_model(self):
self.block = nn.Sequential(self.conv_part, self.mlp_part)
def forward(self, x):
x = self.block(x)
return x
def training_step(self, batch, batch_idx):
x, y = batch
y_hat = self(x)
loss = F.cross_entropy(y_hat, y)
return loss
def configure_optimizers(self):
return FusedAdam(self.parameters())
net = Net()
""" Here we initialize a Trainer() explicitly with 2 nodes and 2 GPUs per node.
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
which can cause issues due to updating logs too frequently."""
trainer = pl.Trainer(gpus=2, num_nodes=2,strategy="deepspeed_stage_3", max_epochs = args.max_epochs,progress_bar_refresh_rate=0)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
trainer.fit(net,train_loader)
if __name__=='__main__':
main()
ZeRO avec CPU
Dans cet autre exemple, nous utilisons aussi ZeRO stage 3, mais cette fois-ci nous utilisons le CPU pour les états de l’optimiseur et les paramètres du modèle. Ceci signifie que la mémoire du nœud de calcul sera disponible pour stocker ces tenseurs quand ils ne sont pas requis par les calculs effectués par le GPU et de plus, les pas de l’optimiseur seront calculés sur le CPU. Pour des raisons pratiques, ce serait comme ajouter 32Go de mémoire additionnelle à la mémoire du GPU. Comme la mémoire du GPU est moins sollicitée, vous pouvez par exemple augmenter la taille de vos lots ou de votre modèle. Avec l’optimiseur DeepSpeed DeepSpeedCPUAdam
plutôt qu’un optimiseur natif de PyTorch, la performance se compare au parallélisme pur des données. Les optimiseurs DeepSpeed sont compilés en temps réel à l’exécution et vous devez charger un module cuda/<version>
où version correspond à la version utilisée pour construire le paquet PyTorch que vous utilisez.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:4 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python cuda # CUDA must be loaded if using ZeRO offloading to CPU or NVMe. Version must be the same used to compile PyTorch.
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index
export NCCL_BLOCKING_WAIT=1 #Pytorch Lightning uses the NCCL backend for inter-GPU communication by default. Set this variable to avoid timeout errors.
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python deepspeed-stage3-offload-cpu.py --batch_size 256
import datetime
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from deepspeed.ops.adam import DeepSpeedCPUAdam
from pytorch_lightning.plugins import DeepSpeedPlugin
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, deepspeed offload to cpu test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
print("Starting...")
args = parser.parse_args()
class ConvPart(nn.Module):
def __init__(self):
super(ConvPart, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
return x
# Dense feedforward part of the model
class MLPPart(nn.Module):
def __init__(self):
super(MLPPart, self).__init__()
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
class Net(pl.LightningModule):
def __init__(self):
super(Net, self).__init__()
self.conv_part = ConvPart()
self.mlp_part = MLPPart()
def configure_sharded_model(self):
self.block = nn.Sequential(self.conv_part, self.mlp_part)
def forward(self, x):
x = self.block(x)
return x
def training_step(self, batch, batch_idx):
x, y = batch
y_hat = self(x)
loss = F.cross_entropy(y_hat, y)
return loss
def configure_optimizers(self):
return DeepSpeedCPUAdam(self.parameters())
net = Net()
""" Here we initialize a Trainer() explicitly with 2 nodes and 2 GPUs per node.
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
which can cause issues due to updating logs too frequently."""
trainer = pl.Trainer(gpus=2, num_nodes=2,strategy=DeepSpeedPlugin(
stage=3,
offload_optimizer=True,
offload_parameters=True,
), max_epochs = args.max_epochs,progress_bar_refresh_rate=0)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
trainer.fit(net,train_loader)
if __name__=='__main__':
main()
ZeRO avec utilisation de disques NVMe
ZeRO stage 3 nous sert encore, cette fois-ci en utilisant le CPU pour les états de l’optimiseur et les paramètres du modèle. Ceci signifie que l’espace de stockage local du nœud de calcul sera disponible pour stocker ces tenseurs quand ils ne sont pas requis par les calculs effectués par le GPU. Ici encore, les pas de l’optimiseur seront calculés sur le CPU. De même, pour des raisons pratiques, ce serait comme ajouter à la mémoire du GPU autant d’espace de stockage que sur le disque local, mais par contre nous aurons une forte perte de performance. Cette approche peut être utilisée avec tous les types de stockage, mais elle est à privilégier avec les disques NVMe qui sont plus rapides et ont des temps de réponse plus courts, ce qui compense pour la baisse de performance.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:4 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python cuda # CUDA must be loaded if using ZeRO offloading to CPU or NVMe. Version must be the same used to compile PyTorch.
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index
export NCCL_BLOCKING_WAIT=1 #Pytorch Lightning uses the NCCL backend for inter-GPU communication by default. Set this variable to avoid timeout errors.
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python deepspeed-stage3-offload-nvme.py --batch_size 256
import datetime
import os
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from deepspeed.ops.adam import DeepSpeedCPUAdam,FusedAdam
from pytorch_lightning.plugins import DeepSpeedPlugin
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, deepspeed offload to disk test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
print("Starting...")
args = parser.parse_args()
class ConvPart(nn.Module):
def __init__(self):
super(ConvPart, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
return x
# Dense feedforward part of the model
class MLPPart(nn.Module):
def __init__(self):
super(MLPPart, self).__init__()
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
class Net(pl.LightningModule):
def __init__(self):
super(Net, self).__init__()
self.conv_part = ConvPart()
self.mlp_part = MLPPart()
def configure_sharded_model(self):
self.block = nn.Sequential(self.conv_part, self.mlp_part)
def forward(self, x):
x = self.block(x)
return x
def training_step(self, batch, batch_idx):
x, y = batch
y_hat = self(x)
loss = F.cross_entropy(y_hat, y)
return loss
def configure_optimizers(self):
return DeepSpeedCPUAdam(self.parameters())
net = Net()
""" Here we initialize a Trainer() explicitly with 2 nodes and 2 GPUs per node.
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
which can cause issues due to updating logs too frequently."""
local_scratch = os.environ['SLURM_TMPDIR'] # Get path where local storage is mounted
print(f'Offloading to: {local_scratch}')
trainer = pl.Trainer(gpus=2, num_nodes=2,strategy=DeepSpeedPlugin(
stage=3,
offload_optimizer=True,
offload_parameters=True,
remote_device="nvme",
offload_params_device="nvme",
offload_optimizer_device="nvme",
nvme_path=local_scratch,
),checkpoint_callback=False, max_epochs = args.max_epochs) # Disable PyTorch Lightning checkpointing when offloading to disk
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
trainer.fit(net,train_loader)
if __name__=='__main__':
main()
Créer des points de contrôle
Peu importe si vous pensez que la durée d'exécution de votre code sera longue ou non, il est bon de prendre l'habitude de créer des points de contrôle pendant l'entraînement. Un point de contrôle est un portrait de votre modèle à un moment précis du processus d'entraînement (après un certain nombre d'itérations ou un certain nombre d'époques) que vous pouvez sauvegarder sur disque et utiliser plus tard. C'est un moyen pratique de diviser les tâches qui devraient être de longue durée en de multiples petites tâches auxquelles l'ordonnanceur peut allouer des ressources plus rapidement. C'est aussi une bonne façon de ne pas perdre le progrès réalisé au cas où des erreurs de code inattendues surviendraient ou que les nœuds ne soient pas disponibles pour quelconque raison.
Avec PyTorch Lightning
Nous recommandons d'utiliser le paramètre de rappels (callbacks parameter) de la classe Trainer()
. Dans l'exemple suivant, on demande à PyTorch de créer un point de contrôle à la fin de chacune des époques d'entraînement. Vérifiez que le chemin où créer le point de contrôle existe.
callbacks = [pl.callbacks.ModelCheckpoint(dirpath="./ckpt",every_n_epochs=1)] trainer = pl.Trainer(callbacks=callbacks) trainer.fit(model)
Ce bout de code chargera un point de contrôle de ./ckpt
(s'il en existe) et poursuivra l'entraînement à partir de ce point. Pour plus d'information, consultez la documentation PyTorch Lightning.
Avec des boucles d'entraînement personnalisées
Pour des exemples, consultez la documentation PyTorch.
Pendant l’entraînement distribué
Les points de contrôle peuvent être utilisés pendant l’exécution d’un programme d’entraînement distribué. Avec PyTorch Lightning, aucun code supplémentaire n’est requis, autre que d’insérer le paramètre de rappels (callbacks parameter) mentionné ci-dessus. Cependant, si vous utilisez DistributedDataParallel ou Horovod, les points de contrôle devront être créés par un seul processus (rank) de votre programme puisque tous les processus auront le même état après chaque itération. Dans cet exemple, le premier processus (rank 0) crée un point de contrôle.
if global_rank == 0: torch.save(ddp_model.state_dict(), "./checkpoint_path")
Faites attention aux points de contrôle ainsi créés. Si un processus tente de charger un point de contrôle qui n’a pas encore été sauvegardé par un autre, des erreurs peuvent survenir ou de mauvais résultats peuvent être produits. Pour éviter ceci, vous pouvez ajouter une barrière à votre code pour faire en sorte que le processus qui crée le point de contrôle a terminé son écriture sur le disque avant que d’autres processus tentent de le charger. Remarquez aussi que torch.load
essaiera par défaut de charger les tenseurs sur le GPU sur lequel ils étaient initialement sauvegardés, dans notre cas cuda:0
. Pour éviter les problèmes, passez map_location
à torch.load
pour charger les tenseurs sur le GPU identifié par chaque processus.
torch.distributed.barrier() map_location = f"cuda:{local_rank}" ddp_model.load_state_dict( torch.load("./checkpoint_path", map_location=map_location))
Dépannage
Fuites de mémoire
Sur le matériel AVX512 (nœuds V100, Skylake ou Béluga), les versions PyTorch antérieures à v1.0.1 qui utilisent des bibliothèques moins récentes (cuDNN < v7.5 ou MAGMA < v2.5) peuvent avoir des fuites de mémoire importantes et créer des exceptions de mémoire insuffisante et terminer vos tâches. Pour contrer ceci, utilisez la plus récente version de torch.
c10::Error
Dans certains cas, vous pouvez obtenir un erreur comme
terminate called after throwing an instance of 'c10::Error' what(): Given groups=1, weight of size [256, 1, 3, 3], expected input[16, 10, 16, 16] to have 1 channels, but got 10 channels instead Exception raised from check_shape_forward at /tmp/coulombc/pytorch_build_2021-11-09_14-57-01/avx2/python3.8/pytorch/aten/src/ATen/native/Convolution.cpp:496 (most recent call first): ...
Une exception C++ est émise plutôt qu'une exception Python. Ceci peut se produire quand vous programmez en C++ avec libtorch, mais ne devrait pas se produire quand vous programmez en Python. Il n'est pas possible de suivre la trace des appels (traceback)du programme Python, ce qui ne permet pas d'identifier facilement la cause de l'erreur dans le script Python. Nous avons constaté que le fait d'utiliser PyTorch 1.9.1 plutôt que 1.10.x permet le traceback du programme Python.
LibTorch
LibTorch permet d'implémenter des extensions à PyTorch à l'aide de C++ et d'implémenter des applications d'apprentissage machine en C++ pur. La distribution LibTorch possède les en-têtes, bibliothèques et fichiers de configuration CMake nécessaires pour travailler avec PyTorch, tel que décrit dans la documentation.
Utiliser LibTorch
Obtenir la bibliothèque
wget https://download.pytorch.org/libtorch/cu100/libtorch-shared-with-deps-latest.zip unzip libtorch-shared-with-deps-latest.zip cd libtorch export LIBTORCH_ROOT=$(pwd)
Remarquez que l'exemple suivant utilise la variable LIBTORCH_ROOT
.
Pour compiler sur les grappes de Calcul Canada, appliquez le correctif
sed -i -e 's/\/usr\/local\/cuda\/lib64\/libculibos.a;dl;\/usr\/local\/cuda\/lib64\/libculibos.a;//g' share/cmake/Caffe2/Caffe2Targets.cmake
Compiler un exemple simple
Créez les deux fichiers suivants :
#include <torch/torch.h>
#include <iostream>
int main() {
torch::Device device(torch::kCPU);
if (torch::cuda::is_available()) {
std::cout << "CUDA is available! Using GPU." << std::endl;
device = torch::Device(torch::kCUDA);
}
torch::Tensor tensor = torch::rand({2, 3}).to(device);
std::cout << tensor << std::endl;
}
cmake_minimum_required(VERSION 3.0 FATAL_ERROR)
project(example-app)
find_package(Torch REQUIRED)
add_executable(example-app example-app.cpp)
target_link_libraries(example-app "${TORCH_LIBRARIES}")
set_property(TARGET example-app PROPERTY CXX_STANDARD 14)
Chargez les modules avec
module load cmake intel/2018.3 cuda/10 cudnn
Compilez le programme avec
mkdir build cd build cmake -DCMAKE_PREFIX_PATH="$LIBTORCH_ROOT;$EBROOTCUDA;$EBROOTCUDNN" .. make
Lancez le programme avec
./example-app
Pour tester une application avec CUDA, demandez une tâche interactive avec GPU.