PyTorch/fr: Difference between revisions
(Created page with "Compilez le programme.") |
No edit summary Tags: Mobile edit Mobile web edit |
||
(274 intermediate revisions by 4 users not shown) | |||
Line 1: | Line 1: | ||
<languages /> | <languages /> | ||
[[Category:Software]] | [[Category:Software]][[Category:AI and Machine Learning]] | ||
[http://pytorch.org/ PyTorch] est un paquet Python qui offre deux fonctionnalités de haut niveau : | [http://pytorch.org/ PyTorch] est un paquet Python qui offre deux fonctionnalités de haut niveau : | ||
*le calcul tensoriel (semblable à celui effectué par NumPy) avec | *le calcul tensoriel (semblable à celui effectué par NumPy) avec forte accélération de GPU, | ||
*des réseaux de neurones d’apprentissage profond dans un système de gradients conçu sur le modèle d’un magnétophone. | *des réseaux de neurones d’apprentissage profond dans un système de gradients conçu sur le modèle d’un magnétophone. | ||
Si vous voulez porter un programme PyTorch sur une de nos grappes, il serait bon de prendre connaissance [[Tutoriel Apprentissage machine| de ce tutoriel]]. | |||
= Clarification = | |||
Il y a une certaine ressemblance entre PyTorch et [[Torch/fr|Torch]], mais pour des raisons pratiques vous pouvez considérer que ce sont des projets différents. | Il y a une certaine ressemblance entre PyTorch et [[Torch/fr|Torch]], mais pour des raisons pratiques vous pouvez considérer que ce sont des projets différents. | ||
PyTorch | Les développeurs PyTorch offrent aussi [[PyTorch/fr#LibTorch|LibTorch]] qui permet d'implémenter des extensions à PyTorch à l'aide de C++ et d'implémenter des applications d'apprentissage machine en C++ pur. Les modèles Python écrits avec PyTorch peuvent être convertis et utilisés en C++ avec [https://pytorch.org/tutorials/advanced/cpp_export.html TorchScript]. | ||
= Installation = | = Installation = | ||
== | ==Wheels récemment ajoutés== | ||
Pour connaître la dernière version de PyTorch, utilisez | Pour connaître la dernière version de PyTorch, utilisez | ||
{{Command|avail_wheels "torch*"}} | {{Command|avail_wheels "torch*"}} | ||
Pour plus d'information, voyez [[Python/fr#Wheels_disponibles|Wheels disponibles]]. | |||
==Installation | ==Installation du wheel == | ||
La meilleure option est d'installer avec [https://pythonwheels.com/ Python wheels] comme suit : | La meilleure option est d'installer avec [https://pythonwheels.com/ Python wheels] comme suit : | ||
::1. [[Utiliser_des_modules#Sous-commande_load|Chargez un module]] Python | ::1. [[Utiliser_des_modules#Sous-commande_load|Chargez un module]] Python avec <code>module load python</code>. | ||
::2. Créez et démarrez un [[Python/fr#Créer_et_utiliser_un_environnement_virtuel|environnement virtuel]]. | ::2. Créez et démarrez un [[Python/fr#Créer_et_utiliser_un_environnement_virtuel|environnement virtuel]]. | ||
::3. Installez PyTorch dans l'environnement virtuel avec <code>pip install</code>. | ::3. Installez PyTorch dans l'environnement virtuel avec <code>pip install</code>. | ||
==== GPU et CPU ==== | ==== GPU et CPU ==== | ||
:{{Command|prompt=(venv) [name@server ~]|pip install | :{{Command|prompt=(venv) [name@server ~]|pip install --no-index torch }} | ||
<b>Remarque : </b>PyTorch 1.10 cause des problèmes connus sur nos grappes (à l'exception de Narval). Si l'entraînement distribué produit des erreurs ou si vous obtenez une erreur qui inclut <code>c10::Error</code>, nous vous recommandons d'installer PyTorch 1.9.1 avec <code>pip install --no-index torch==1.9.1</code>. | |||
====En supplément==== | ====En supplément==== | ||
En plus de < | En plus de <code>torch</code>, vous pouvez aussi installer <code>torchvision</code>, <code>torchtext</code> et <code>torchaudio</code>. | ||
{{Command|prompt=(venv) [name@server ~]|pip install | {{Command|prompt=(venv) [name@server ~]|pip install --no-index torch torchvision torchtext torchaudio }} | ||
=Soumettre une tâche= | =Soumettre une tâche= | ||
Le script suivant est un exemple de soumission d'une tâche utilisant le | Le script suivant est un exemple de soumission d'une tâche utilisant le wheel Python avec un environnement virtuel. | ||
{{File | {{File | ||
|name=pytorch-test.sh | |name=pytorch-test.sh | ||
Line 45: | Line 52: | ||
#SBATCH --output=%N-%j.out | #SBATCH --output=%N-%j.out | ||
module load python/ | module load python/<select version> # Make sure to choose a version that suits your application | ||
virtualenv --no-download $SLURM_TMPDIR/env | virtualenv --no-download $SLURM_TMPDIR/env | ||
source $SLURM_TMPDIR/env/bin/activate | source $SLURM_TMPDIR/env/bin/activate | ||
Line 53: | Line 60: | ||
}} | }} | ||
Le script Python <code>pytorch-test.py</code> | Le script Python <code>pytorch-ddp-test.py</code> a la forme suivante : | ||
{{File | {{File | ||
|name=pytorch-test.py | |name=pytorch-test.py | ||
Line 72: | Line 80: | ||
Vous pouvez alors soumettre une tâche PyTorch avec | Vous pouvez alors soumettre une tâche PyTorch avec | ||
{{Command|sbatch pytorch-test.sh}} | {{Command|sbatch pytorch-test.sh}} | ||
= Haute performance= | |||
== TF32 : Performance vs précision == | |||
Avec sa version 1.7.0, PyTorch a ajouté le support pour le [https://blogs.nvidia.com/blog/2020/05/14/tensorfloat-32-precision-format/ mode TensorFloat-32 (TF32) de Nvidia] et est seulement disponible pour les architectures GPU d'Ampere et de Nvidia. Avec ce mode qui est offert par défaut dans les versions 1.7.x à 1.11.x, les opérations tensorielles se font jusqu'à 20x plus rapidement que les opérations équivalentes en simple précision (FP32). Cependant, ce gain en performance peut engendrer une baisse dans la précision du résultat des opérations, ce qui pose problème avec les modèles d'apprentissage profond qui utilisent à l'occasion des matrices mal conditionnées ou qui effectuent de longues séquences d'opérations tensorielles. Suite aux commentaires de la communauté des utilisateurs, TF32 est <b>désactivé par défaut pour les multiplications matricielle et activé par défaut pour les convolutions</b> à partir de la version 1.12.0. | |||
En date d'octobre 2022, notre seule grappe qui offre des GPU Ampere est [[Narval]]. Quand vous utilisez PyTorch sur Narval, | |||
* Vous pourriez remarquer un fort ralentissement dans l'exécution sur GPU du même code avec <code>torch < 1.12.0</code> et <code>torch >= 1.12.0</code>. | |||
* Vous pourriez obtenir des résultats différents dans l'exécution sur GPU du même code avec <code>torch < 1.12.0</code> et <code>torch >= 1.12.0</code>. | |||
Pour activer ou désactiver TF32 pour <code>torch >= 1.12.0</code>, donnez la valeur <code>True</code> ou <code>False</code> aux indicateurs suivants : | |||
torch.backends.cuda.matmul.allow_tf32 = False # Enable/disable TF32 for matrix multiplications | |||
torch.backends.cudnn.allow_tf32 = False # Enable/disable TF32 for convolutions | |||
Pour plus d'information, consultez [https://pytorch.org/docs/stable/notes/cuda.html#tf32-on-ampere ce paragraphe de la documentation PyTorch]. | |||
== Travailler avec plusieurs CPU == | |||
Par défaut, PyTorch permet le parallélisme avec plusieurs CPU de deux façons : | |||
* <b>intra-op</b>, par l’implémentation parallèle d’opérateurs souvent utilisés en apprentissage profond comme le produit matriciel ou le produit de convolution, en utilisant [https://www.openmp.org OpenMP] directement ou avec des bibliothèques de bas niveau comme [https://en.wikipedia.org/wiki/Math_Kernel_Library MKL] et [https://www.intel.com/content/www/us/en/develop/documentation/oneapi-programming-guide/top/api-based-programming/intel-oneapi-deep-neural-network-library-onednn.html OneDNN]. Quand du code PyTorch doit effectuer de telles opérations, elles utilisent automatiquement de multiples fils avec tous les cœurs CPU disponibles. | |||
* <b>inter-op</b>, par la capacité d’exécuter différentes parties de code de manière concurrente. Ce mode de parallélisme nécessite habituellement que le programme soit conçu de manière à exécuter plusieurs parties en parallèle, par exemple en faisant usage du compilateur en temps réel <tt>torch.jit</tt> pour exécuter des tâches asynchrones dans un programme [https://pytorch.org/docs/stable/jit.html#built-in-functions-and-modules TorchScript]. | |||
Pour les petits modèles, nous recommandons fortement <b>d’utiliser plusieurs CPU plutôt qu’un GPU</b>. L’entraînement sera certainement plus rapide avec un GPU (sauf dans les cas de très petits modèles), mais si le modèle et le jeu de données ne sont pas assez grands, la vitesse gagnée avec le GPU ne sera probablement pas très importante et la tâche n’utilisera qu’une petite part de la capacité de calcul. Ce n’est peut-être pas grave sur votre propre ordinateur, mais dans un environnement partagé comme sur nos grappes, vous bloqueriez une ressource qui pourrait servir à effectuer de calculs de grande échelle par un autre projet. De plus, l’utilisation d’un GPU contribuerait à la diminution de l’allocation de votre groupe et aurait une incidence sur la priorité accordée aux tâches de vos collègues. | |||
Dans le code suivant, il y a plusieurs occasions d’utiliser le parallélisme intra-op. En demandant plus de CPU et sans changer le code, on peut constater l’effet sur la performance. | |||
{{File | |||
|name=pytorch-multi-cpu.sh | |||
|lang="bash" | |||
|contents= | |||
#!/bin/bash | |||
#SBATCH --nodes 1 | |||
#SBATCH --tasks-per-node=1 | |||
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... to see the effect on performance | |||
#SBATCH --mem=8G | |||
#SBATCH --time=0:05:00 | |||
#SBATCH --output=%N-%j.out | |||
#SBATCH --account=<your account> | |||
module load python # Using Default Python version - Make sure to choose a version that suits your application | |||
virtualenv --no-download $SLURM_TMPDIR/env | |||
source $SLURM_TMPDIR/env/bin/activate | |||
pip install torch torchvision --no-index | |||
echo "starting training..." | |||
time python cifar10-cpu.py | |||
}} | |||
{{File | |||
|name=cifar10-cpu.py | |||
|lang="python" | |||
|contents= | |||
import numpy as np | |||
import time | |||
import torch | |||
import torch.nn as nn | |||
import torch.nn.functional as F | |||
import torch.optim as optim | |||
import torchvision | |||
import torchvision.transforms as transforms | |||
from torchvision.datasets import CIFAR10 | |||
from torch.utils.data import DataLoader | |||
import argparse | |||
import os | |||
parser = argparse.ArgumentParser(description='cifar10 classification models, cpu performance test') | |||
parser.add_argument('--lr', default=0.1, help='') | |||
parser.add_argument('--batch_size', type=int, default=512, help='') | |||
parser.add_argument('--num_workers', type=int, default=0, help='') | |||
def main(): | |||
args = parser.parse_args() | |||
torch.set_num_threads(int(os.environ['SLURM_CPUS_PER_TASK'])) | |||
class Net(nn.Module): | |||
def __init__(self): | |||
super(Net, self).__init__() | |||
self.conv1 = nn.Conv2d(3, 6, 5) | |||
self.pool = nn.MaxPool2d(2, 2) | |||
self.conv2 = nn.Conv2d(6, 16, 5) | |||
self.fc1 = nn.Linear(16 * 5 * 5, 120) | |||
self.fc2 = nn.Linear(120, 84) | |||
self.fc3 = nn.Linear(84, 10) | |||
def forward(self, x): | |||
x = self.pool(F.relu(self.conv1(x))) | |||
x = self.pool(F.relu(self.conv2(x))) | |||
x = x.view(-1, 16 * 5 * 5) | |||
x = F.relu(self.fc1(x)) | |||
x = F.relu(self.fc2(x)) | |||
x = self.fc3(x) | |||
return x | |||
net = Net() | |||
criterion = nn.CrossEntropyLoss() | |||
optimizer = optim.SGD(net.parameters(), lr=args.lr) | |||
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) | |||
### This next line will attempt to download the CIFAR10 dataset from the internet if you don't already have it stored in ./data | |||
### Run this line on a login node with "download=True" prior to submitting your job, or manually download the data from | |||
### https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz and place it under ./data | |||
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train) | |||
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers) | |||
perf = [] | |||
total_start = time.time() | |||
for batch_idx, (inputs, targets) in enumerate(train_loader): | |||
start = time.time() | |||
outputs = net(inputs) | |||
loss = criterion(outputs, targets) | |||
optimizer.zero_grad() | |||
loss.backward() | |||
optimizer.step() | |||
batch_time = time.time() - start | |||
images_per_sec = args.batch_size/batch_time | |||
perf.append(images_per_sec) | |||
total_time = time.time() - total_start | |||
if __name__=='__main__': | |||
main() | |||
}} | |||
== Travailler avec un seul GPU == | |||
On entend souvent dire qu’il faut absolument entraîner un modèle avec un GPU s’il y en a un à notre disposition. Ceci est ''presque toujours'' vrai (l'entraînement de très petits modèles est souvent plus rapide avec un ou plusieurs CPU) sur un poste de travail local, mais ce n’est pas le cas sur nos grappes. | |||
Autrement dit, vous '''ne devriez pas demander un GPU''' si votre code ne peut pas faire un usage raisonnable de sa capacité de calcul. | |||
La performance avantageuse des GPU pour les tâches d’apprentissage profond provient de deux sources : | |||
# La capacité de paralléliser l’exécution de certaines opérations clés, par exemple le [https://fr.wikipedia.org/wiki/Multiplieur-accumulateur multiplieur-accumulateur], sur plusieurs milliers de cœurs de calcul, en comparaison du très petit nombre de cœurs disponibles avec la plupart des CPU. | |||
# Une bande passante de mémoire beaucoup plus grande que pour un CPU, ce qui permet aux GPU d’utiliser efficacement leur très grand nombre de cœurs pour traiter une plus grande quantité de données par cycle de calcul. | |||
Comme c’est le cas avec plusieurs CPU, PyTorch offre des implémentations parallèles d’opérateurs souvent utilisés en apprentissage profond, comme le produit matriciel et le produit de convolution et utilise des bibliothèques spécialisées pour les GPU comme [https://developer.nvidia.com/cudnn CUDNN] ou [https://github.com/ROCmSoftwarePlatform/MIOpen MIOpen], selon la plateforme matérielle. Ceci signifie que pour qu’il vaille la peine d’utiliser un GPU pour une tâche d’apprentissage, elle doit être composée d’éléments qui peuvent être élargis à une application massive du parallélisme de par le nombre d’opérations pouvant être parallélisées, de par la quantité des données à traiter ou idéalement de par les deux. Un exemple concret serait un grand modèle qui a un grand nombre d’unités et de couches ou qui a beaucoup de données en entrée, et idéalement qui présente ces deux caractéristiques. | |||
Dans l’exemple ci-dessous, nous adaptons le code de la section précédente pour utiliser un GPU et nous examinons la performance. Nous observons que deux paramètres jouent un rôle important : <code>batch_size</code> et <code>num_workers</code>. Le premier paramètre améliore la performance en augmentant la taille des entrées à chaque itération et en utilisant mieux la capacité du GPU. Dans le cas du second paramètre, la performance est améliorée en facilitant le mouvement des données entre la mémoire de l’hôte (le CPU) et la mémoire du GPU, ce qui réduit la durée d’inactivité du GPU en attente de données à traiter. | |||
Nous pouvons tirer deux conclusions : | |||
# Augmenter la valeur de <code>batch_size</code> au maximum qu’il est possible pour la mémoire du GPU optimise la performance. | |||
# Utiliser un <code>DataLoader</code> avec autant de workers que <code>cpus-per-task</code> facilite l’apport de données au GPU. | |||
Bien entendu, le paramètre <code>batch_size</code> a aussi un impact sur la performance d’un modèle dans une tâche(c.à.d. l’exactitude, l’erreur, etc.) et il existe différentes écoles de pensée sur l’utilisation de grands lots. Nous n’abordons pas le sujet ici, mais si vous croyez qu’un petit lot conviendrait mieux à votre application, allez à la section [[PyTorch/fr#Travailler_avec_un_seul_GPU|Travailler avec un seul GPU]] pour savoir comment maximiser l’utilisation du GPU avec de petites entrées de données. | |||
{{File | |||
|name=pytorch-single-gpu.sh | |||
|lang="bash" | |||
|contents= | |||
#!/bin/bash | |||
#SBATCH --nodes 1 | |||
#SBATCH --gres=gpu:1 # request a GPU | |||
#SBATCH --tasks-per-node=1 | |||
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance | |||
#SBATCH --mem=8G | |||
#SBATCH --time=0:05:00 | |||
#SBATCH --output=%N-%j.out | |||
#SBATCH --account=<your account> | |||
module load python # Using Default Python version - Make sure to choose a version that suits your application | |||
virtualenv --no-download $SLURM_TMPDIR/env | |||
source $SLURM_TMPDIR/env/bin/activate | |||
pip install torch torchvision --no-index | |||
echo "starting training..." | |||
time python cifar10-gpu.py --batch_size=512 --num_workers=0 | |||
}} | |||
{{File | |||
|name=cifar10-gpu.py | |||
|lang="python" | |||
|contents= | |||
import numpy as np | |||
import time | |||
import torch | |||
import torch.nn as nn | |||
import torch.nn.functional as F | |||
import torch.optim as optim | |||
import torchvision | |||
import torchvision.transforms as transforms | |||
from torchvision.datasets import CIFAR10 | |||
from torch.utils.data import DataLoader | |||
import argparse | |||
parser = argparse.ArgumentParser(description='cifar10 classification models, single gpu performance test') | |||
parser.add_argument('--lr', default=0.1, help='') | |||
parser.add_argument('--batch_size', type=int, default=512, help='') | |||
parser.add_argument('--num_workers', type=int, default=0, help='') | |||
def main(): | |||
args = parser.parse_args() | |||
class Net(nn.Module): | |||
def __init__(self): | |||
super(Net, self).__init__() | |||
self.conv1 = nn.Conv2d(3, 6, 5) | |||
self.pool = nn.MaxPool2d(2, 2) | |||
self.conv2 = nn.Conv2d(6, 16, 5) | |||
self.fc1 = nn.Linear(16 * 5 * 5, 120) | |||
self.fc2 = nn.Linear(120, 84) | |||
self.fc3 = nn.Linear(84, 10) | |||
def forward(self, x): | |||
x = self.pool(F.relu(self.conv1(x))) | |||
x = self.pool(F.relu(self.conv2(x))) | |||
x = x.view(-1, 16 * 5 * 5) | |||
x = F.relu(self.fc1(x)) | |||
x = F.relu(self.fc2(x)) | |||
x = self.fc3(x) | |||
return x | |||
net = Net().cuda() # Load model on the GPU | |||
criterion = nn.CrossEntropyLoss().cuda() # Load the loss function on the GPU | |||
optimizer = optim.SGD(net.parameters(), lr=args.lr) | |||
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) | |||
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train) | |||
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers) | |||
perf = [] | |||
total_start = time.time() | |||
for batch_idx, (inputs, targets) in enumerate(train_loader): | |||
start = time.time() | |||
inputs = inputs.cuda() | |||
targets = targets.cuda() | |||
outputs = net(inputs) | |||
loss = criterion(outputs, targets) | |||
optimizer.zero_grad() | |||
loss.backward() | |||
optimizer.step() | |||
batch_time = time.time() - start | |||
images_per_sec = args.batch_size/batch_time | |||
perf.append(images_per_sec) | |||
total_time = time.time() - total_start | |||
if __name__=='__main__': | |||
main() | |||
}} | |||
=== Parallélisme des données avec un seul GPU === | |||
Il <b>n’est pas conseillé d’utiliser un GPU</b> avec un modèle de taille relativement petite qui n’utilise pas une grande part de la mémoire du GPU et une part raisonnable de sa capacité de calcul; utilisez plutôt [[PyTorch/fr#Travailler_avec_plusieurs_CPU|un ou plusieurs CPU]]. Par contre, profiter du parallélisme du GPU devient une bonne option si vous avez un tel modèle avec un très grand jeu de données et que vous voulez effectuer l’entraînement avec des lots de petite taille. | |||
Dans ce contexte, la parallélisation des données réfère à des méthodes pour entraîner en parallèle plusieurs copies d’un modèle où chaque copie reçoit un morceau des données d’entraînement à chaque itération. À la fin d’une itération, les gradients sont agrégés et les paramètres de chaque copie sont mis à jour de façon synchrone ou asynchrone, dépendant de la méthode. Cette approche peut augmenter la vitesse d’exécution de façon importante avec une itération qui se fait environ N fois plus rapidement avec un grand jeu de données, N étant le nombre de copies du modèle. Pour utiliser cette approche, <b>un avertissement s’impose</b> : pour que le modèle entraîné soit équivalent au même modèle entraîné sans parallélisme, vous devez adapter le taux d’apprentissage ou la taille du lot désirée en fonction du nombre de copies. Pour plus d’information, voyez [https://discuss.pytorch.org/t/should-we-split-batch-size-according-to-ngpu-per-node-when-distributeddataparallel/72769/13 ces échanges]. | |||
PyTorch offre des implémentations de méthodes de parallélisme des données, la classe <code>DistributedDataParallel</code> étant celle [https://pytorch.org/tutorials/intermediate/ddp_tutorial.html#comparison-between-dataparallel-and-distributeddataparallel recommandée par les développeurs de PyTorch] pour donner la meilleure performance. Conçue pour le [[PyTorch/fr#Travailler_avec_plusieurs_GPU|travail avec plusieurs GPU]], elle peut aussi être employée avec un seul GPU. | |||
Dans l’exemple ci-dessous, nous adaptons le code pour un seul GPU pour utiliser le parallélisme des données. La tâche est relativement petite; la taille du lot est de 512 images, le modèle occupe environ 1Go de la mémoire du GPU et l’entraînement n’utilise qu’environ 6 % de sa capacité de calcul. Ce modèle '''ne devrait pas''' être entraîné sur un GPU sur nos grappes. Cependant, en parallélisant les données, un GPU V100 avec 16Go de mémoire peut contenir 14 ou 15 copies du modèle et augmenter l'utilisation de la ressource en plus d’obtenir une bonne augmentation de vitesse. Nous utilisons [https://docs.nvidia.com/deploy/mps/index.html Multi-Process Service (MPS) de NVIDIA] avec [[MPI/fr|MPI]] pour placer plusieurs copies du modèle sur un GPU de façon efficace. | |||
{{File | |||
|name=pytorch-gpu-mps.sh | |||
|lang="bash" | |||
|contents= | |||
#!/bin/bash | |||
#SBATCH --nodes 1 | |||
#SBATCH --gres=gpu:1 # request a GPU | |||
#SBATCH --tasks-per-node=8 # This is the number of model replicas we will place on the GPU. Change this to 10,12,14,... to see the effect on performance | |||
#SBATCH --cpus-per-task=1 # increase this parameter and increase "--num_workers" accordingly to see the effect on performance | |||
#SBATCH --mem=8G | |||
#SBATCH --time=0:05:00 | |||
#SBATCH --output=%N-%j.out | |||
#SBATCH --account=<your account> | |||
module load python # Using Default Python version - Make sure to choose a version that suits your application | |||
virtualenv --no-download $SLURM_TMPDIR/env | |||
source $SLURM_TMPDIR/env/bin/activate | |||
pip install torch torchvision --no-index | |||
# Activate Nvidia MPS: | |||
export CUDA_MPS_PIPE_DIRECTORY=/tmp/nvidia-mps | |||
export CUDA_MPS_LOG_DIRECTORY=/tmp/nvidia-log | |||
nvidia-cuda-mps-control -d | |||
echo "starting training..." | |||
time srun --cpus-per-task=$SLURM_CPUS_PER_TASK python cifar10-gpu-mps.py --batch_size=512 --num_workers=0 | |||
}} | |||
{{File | |||
|name=cifar10-gpu-mps.py | |||
|lang="python" | |||
|contents= | |||
import os | |||
import time | |||
import datetime | |||
import numpy as np | |||
import torch | |||
import torch.nn as nn | |||
import torch.nn.functional as F | |||
import torch.optim as optim | |||
import torchvision | |||
import torchvision.transforms as transforms | |||
from torchvision.datasets import CIFAR10 | |||
from torch.utils.data import DataLoader | |||
import torch.distributed as dist | |||
import torch.utils.data.distributed | |||
import argparse | |||
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel maps test') | |||
parser.add_argument('--lr', default=0.1, help='') | |||
parser.add_argument('--batch_size', type=int, default=512, help='') | |||
parser.add_argument('--num_workers', type=int, default=0, help='') | |||
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='') | |||
def main(): | |||
print("Starting...") | |||
args = parser.parse_args() | |||
rank = os.environ.get("SLURM_LOCALID") | |||
current_device = 0 | |||
torch.cuda.set_device(current_device) | |||
""" this block initializes a process group and initiate communications | |||
between all processes that will run a model replica """ | |||
print('From Rank: {}, ==> Initializing Process Group...'.format(rank)) | |||
dist.init_process_group(backend="mpi", init_method=args.init_method) # Use backend="mpi" or "gloo". NCCL does not work on a single GPU due to a hard-coded multi-GPU topology check. | |||
print("process group ready!") | |||
print('From Rank: {}, ==> Making model..'.format(rank)) | |||
class Net(nn.Module): | |||
def __init__(self): | |||
super(Net, self).__init__() | |||
self.conv1 = nn.Conv2d(3, 6, 5) | |||
self.pool = nn.MaxPool2d(2, 2) | |||
self.conv2 = nn.Conv2d(6, 16, 5) | |||
self.fc1 = nn.Linear(16 * 5 * 5, 120) | |||
self.fc2 = nn.Linear(120, 84) | |||
self.fc3 = nn.Linear(84, 10) | |||
def forward(self, x): | |||
x = self.pool(F.relu(self.conv1(x))) | |||
x = self.pool(F.relu(self.conv2(x))) | |||
x = x.view(-1, 16 * 5 * 5) | |||
x = F.relu(self.fc1(x)) | |||
x = F.relu(self.fc2(x)) | |||
x = self.fc3(x) | |||
return x | |||
net = Net() | |||
net.cuda() | |||
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device]) # Wrap the model with DistributedDataParallel | |||
criterion = nn.CrossEntropyLoss().cuda() | |||
optimizer = optim.SGD(net.parameters(), lr=args.lr) | |||
print('From Rank: {}, ==> Preparing data..'.format(rank)) | |||
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) | |||
dataset_train = CIFAR10(root='~/data', train=True, download=False, transform=transform_train) | |||
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train) | |||
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler) | |||
perf = [] | |||
total_start = time.time() | |||
for batch_idx, (inputs, targets) in enumerate(train_loader): | |||
start = time.time() | |||
inputs = inputs.cuda() | |||
targets = targets.cuda() | |||
outputs = net(inputs) | |||
loss = criterion(outputs, targets) | |||
optimizer.zero_grad() | |||
loss.backward() | |||
optimizer.step() | |||
batch_time = time.time() - start | |||
images_per_sec = args.batch_size/batch_time | |||
perf.append(images_per_sec) | |||
total_time = time.time() - total_start | |||
if __name__=='__main__': | |||
main() | |||
}} | |||
== Travailler avec plusieurs GPU == | |||
=== Problème avec DistributedDataParallel et PyTorch 1.10 === | |||
Avec notre wheel PyTorch 1.10 <code>torch-1.10.0+computecanada</code>, le code pour travailler avec plusieurs GPU et qui utilise | |||
[[PyTorch/fr#DistributedDataParallel|DistributedDataParallel]] pourrait échouer de façon imprévisible si le backend est défini comme étant <code>'nccl'</code> ou <code>'gloo'</code>. Nous vous recommandons d'utiliser la version la plus récente de PyTorch plutôt que la version 1.10 sur toutes les grappes d'usage général. | |||
===Paralléliser les données avec plusieurs GPU=== | |||
Dans ce contexte, la parallélisation des données réfère à des méthodes pour entraîner en parallèle plusieurs copies d’un modèle où chaque copie reçoit une portion des données d’entraînement à chaque itération. À la fin d’une itération, les gradients sont agrégés et les paramètres de chaque copie sont mis à jour de façon synchrone ou asynchrone, dépendant de la méthode. Cette approche peut augmenter la vitesse d’exécution de façon importante avec une itération qui se fait environ N fois plus rapidement avec un grand jeu de données, N étant le nombre de copies du modèle. Pour utiliser cette approche, un avertissement s’impose : pour que le modèle entraîné soit équivalent au même modèle entraîné sans parallélisme, vous devez adapter le taux d’apprentissage ou la taille du lot désirée en fonction du nombre de copies. Pour plus d’information, voyez [https://discuss.pytorch.org/t/should-we-split-batch-size-according-to-ngpu-per-node-when-distributeddataparallel/72769/13 ces échanges]. | |||
Quand plusieurs GPU sont utilisés, chacun reçoit une copie du modèle; il doit donc être assez petit pour être contenu dans la mémoire d’un GPU. Pour entraîner un modèle qui dépasse la quantité de mémoire d’un GPU, voyez la section [[PyTorch/fr#Paralléliser_un_modèle_avec_plusieurs_GPU|Paralléliser un modèle avec plusieurs GPU]]. | |||
Il existe plusieurs manières de paralléliser les données avec PyTorch. Nous présentons ici des tutoriels avec la classe '''DistributedDataParallel''', avec le paquet '''PyTorch Lightning''' et avec le paquet '''Horovod'''. | |||
====DistributedDataParallel==== | |||
Avec plusieurs GPU, la classe '''DistributedDataParallel''' est [https://pytorch.org/tutorials/intermediate/ddp_tutorial.html#comparison-between-dataparallel-and-distributeddataparallel recommandée par les développeurs PyTorch], que ce soit avec un nœud unique ou avec plusieurs nœuds. Dans le cas qui suit, plusieurs GPU sont distribués sur deux nœuds. | |||
{{File | |||
|name=pytorch-ddp-test.sh | |||
|lang="bash" | |||
|contents= | |||
#!/bin/bash | |||
#SBATCH --nodes 1 | |||
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”. | |||
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel. | |||
#SBATCH --mem=8G | |||
#SBATCH --time=0-03:00 | |||
#SBATCH --output=%N-%j.out | |||
module load python # Using Default Python version - Make sure to choose a version that suits your application | |||
srun -N $SLURM_NNODES -n $SLURM_NNODES bash << EOF | |||
virtualenv --no-download $SLURM_TMPDIR/env | |||
source $SLURM_TMPDIR/env/bin/activate | |||
pip install torchvision --no-index | |||
EOF | |||
export TORCH_NCCL_ASYNC_HANDLING=1 | |||
export MASTER_ADDR=$(hostname) #Store the master node’s IP address in the MASTER_ADDR environment variable. | |||
echo "r$SLURM_NODEID master: $MASTER_ADDR" | |||
echo "r$SLURM_NODEID Launching python script" | |||
# The $((SLURM_NTASKS_PER_NODE * SLURM_JOB_NUM_NODES)) variable tells the script how many processes are available for this execution. “srun” executes the script <tasks-per-node * nodes> times | |||
source $SLURM_TMPDIR/env/bin/activate | |||
srun python pytorch-ddp-test.py --init_method tcp://$MASTER_ADDR:3456 --world_size $((SLURM_NTASKS_PER_NODE * SLURM_JOB_NUM_NODES)) --batch_size 256 | |||
}} | |||
Le script Python <code>pytorch-ddp-test.py</code> a la forme suivante : | |||
{{File | |||
|name=pytorch-ddp-test.py | |||
|lang="python" | |||
|contents= | |||
import os | |||
import time | |||
import datetime | |||
import torch | |||
import torch.nn as nn | |||
import torch.nn.functional as F | |||
import torch.optim as optim | |||
import torch.backends.cudnn as cudnn | |||
import torchvision | |||
import torchvision.transforms as transforms | |||
from torchvision.datasets import CIFAR10 | |||
from torch.utils.data import DataLoader | |||
import torch.distributed as dist | |||
import torch.utils.data.distributed | |||
import argparse | |||
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel test') | |||
parser.add_argument('--lr', default=0.1, help='') | |||
parser.add_argument('--batch_size', type=int, default=768, help='') | |||
parser.add_argument('--max_epochs', type=int, default=4, help='') | |||
parser.add_argument('--num_workers', type=int, default=0, help='') | |||
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='') | |||
parser.add_argument('--dist-backend', default='gloo', type=str, help='') | |||
parser.add_argument('--world_size', default=1, type=int, help='') | |||
parser.add_argument('--distributed', action='store_true', help='') | |||
def main(): | |||
print("Starting...") | |||
args = parser.parse_args() | |||
ngpus_per_node = torch.cuda.device_count() | |||
""" This next line is the key to getting DistributedDataParallel working on SLURM: | |||
SLURM_NODEID is 0 or 1 in this example, SLURM_LOCALID is the id of the | |||
current process inside a node and is also 0 or 1 in this example.""" | |||
local_rank = int(os.environ.get("SLURM_LOCALID")) | |||
rank = int(os.environ.get("SLURM_NODEID"))*ngpus_per_node + local_rank | |||
current_device = local_rank | |||
torch.cuda.set_device(current_device) | |||
""" this block initializes a process group and initiate communications | |||
between all processes running on all nodes """ | |||
print('From Rank: {}, ==> Initializing Process Group...'.format(rank)) | |||
#init the process group | |||
dist.init_process_group(backend=args.dist_backend, init_method=args.init_method, world_size=args.world_size, rank=rank) | |||
print("process group ready!") | |||
print('From Rank: {}, ==> Making model..'.format(rank)) | |||
class Net(nn.Module): | |||
def __init__(self): | |||
super(Net, self).__init__() | |||
self.conv1 = nn.Conv2d(3, 6, 5) | |||
self.pool = nn.MaxPool2d(2, 2) | |||
self.conv2 = nn.Conv2d(6, 16, 5) | |||
self.fc1 = nn.Linear(16 * 5 * 5, 120) | |||
self.fc2 = nn.Linear(120, 84) | |||
self.fc3 = nn.Linear(84, 10) | |||
def forward(self, x): | |||
x = self.pool(F.relu(self.conv1(x))) | |||
x = self.pool(F.relu(self.conv2(x))) | |||
x = x.view(-1, 16 * 5 * 5) | |||
x = F.relu(self.fc1(x)) | |||
x = F.relu(self.fc2(x)) | |||
x = self.fc3(x) | |||
return x | |||
net = Net() | |||
net.cuda() | |||
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device]) | |||
print('From Rank: {}, ==> Preparing data..'.format(rank)) | |||
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) | |||
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train) | |||
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train) | |||
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler) | |||
criterion = nn.CrossEntropyLoss().cuda() | |||
optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4) | |||
for epoch in range(args.max_epochs): | |||
train_sampler.set_epoch(epoch) | |||
train(epoch, net, criterion, optimizer, train_loader, rank) | |||
def train(epoch, net, criterion, optimizer, train_loader, train_rank): | |||
train_loss = 0 | |||
correct = 0 | |||
total = 0 | |||
epoch_start = time.time() | |||
for batch_idx, (inputs, targets) in enumerate(train_loader): | |||
start = time.time() | |||
inputs = inputs.cuda() | |||
targets = targets.cuda() | |||
outputs = net(inputs) | |||
loss = criterion(outputs, targets) | |||
optimizer.zero_grad() | |||
loss.backward() | |||
optimizer.step() | |||
train_loss += loss.item() | |||
_, predicted = outputs.max(1) | |||
total += targets.size(0) | |||
correct += predicted.eq(targets).sum().item() | |||
acc = 100 * correct / total | |||
batch_time = time.time() - start | |||
elapse_time = time.time() - epoch_start | |||
elapse_time = datetime.timedelta(seconds=elapse_time) | |||
print("From Rank: {}, Training time {}".format(train_rank, elapse_time)) | |||
if __name__=='__main__': | |||
main() | |||
}} | |||
====PyTorch Lightning==== | |||
Ce paquet fournit des interfaces à PyTorch afin de simplifier plusieurs tâches communes exigeant beaucoup de code; ceci inclut les tâches d'entraînement de modèles avec plusieurs GPU. Dans le tutoriel suivant pour PyTorch Lightning, nous reprenons le même exemple que ci-dessus, mais sans avoir explicitement recours à la classe DistributedDataParallel. | |||
{{File | |||
|name=pytorch-ddp-test-pl.sh | |||
|lang="bash" | |||
|contents= | |||
#!/bin/bash | |||
#SBATCH --nodes 1 | |||
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”. | |||
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel. | |||
#SBATCH --mem=8G | |||
#SBATCH --time=0-03:00 | |||
#SBATCH --output=%N-%j.out | |||
module load python # Using Default Python version - Make sure to choose a version that suits your application | |||
virtualenv --no-download $SLURM_TMPDIR/env | |||
source $SLURM_TMPDIR/env/bin/activate | |||
pip install torchvision pytorch-lightning --no-index | |||
export TORCH_NCCL_ASYNC_HANDLING=1 | |||
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job | |||
# If it is, it expects the user to have requested one task per GPU. | |||
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail! | |||
srun python pytorch-ddp-test-pl.py --batch_size 256 | |||
}} | |||
{{File | |||
|name=pytorch-ddp-test-pl.py | |||
|lang="python" | |||
|contents= | |||
import datetime | |||
import torch | |||
from torch import nn | |||
import torch.nn.functional as F | |||
import pytorch_lightning as pl | |||
import torchvision | |||
import torchvision.transforms as transforms | |||
from torchvision.datasets import CIFAR10 | |||
from torch.utils.data import DataLoader | |||
import argparse | |||
parser = argparse.ArgumentParser(description='cifar10 classification models, pytorch-lightning parallel test') | |||
parser.add_argument('--lr', default=0.1, help='') | |||
parser.add_argument('--max_epochs', type=int, default=4, help='') | |||
parser.add_argument('--batch_size', type=int, default=768, help='') | |||
parser.add_argument('--num_workers', type=int, default=0, help='') | |||
def main(): | |||
print("Starting...") | |||
args = parser.parse_args() | |||
class Net(pl.LightningModule): | |||
def __init__(self): | |||
super(Net, self).__init__() | |||
self.conv1 = nn.Conv2d(3, 6, 5) | |||
self.pool = nn.MaxPool2d(2, 2) | |||
self.conv2 = nn.Conv2d(6, 16, 5) | |||
self.fc1 = nn.Linear(16 * 5 * 5, 120) | |||
self.fc2 = nn.Linear(120, 84) | |||
self.fc3 = nn.Linear(84, 10) | |||
def forward(self, x): | |||
x = self.pool(F.relu(self.conv1(x))) | |||
x = self.pool(F.relu(self.conv2(x))) | |||
x = x.view(-1, 16 * 5 * 5) | |||
x = F.relu(self.fc1(x)) | |||
x = F.relu(self.fc2(x)) | |||
x = self.fc3(x) | |||
return x | |||
def training_step(self, batch, batch_idx): | |||
x, y = batch | |||
y_hat = self(x) | |||
loss = F.cross_entropy(y_hat, y) | |||
return loss | |||
def configure_optimizers(self): | |||
return torch.optim.Adam(self.parameters(), lr=args.lr) | |||
net = Net() | |||
""" Here we initialize a Trainer() explicitly with 1 node and 2 GPUs per node. | |||
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs | |||
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes. | |||
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs, | |||
which can cause issues due to updating logs too frequently.""" | |||
trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy='ddp', max_epochs = args.max_epochs, enable_progress_bar=False) | |||
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) | |||
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train) | |||
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers) | |||
trainer.fit(net,train_loader) | |||
if __name__=='__main__': | |||
main() | |||
}} | |||
====Horovod==== | |||
[https://horovod.readthedocs.io/en/latest/summary_include.html Horovod] est une plateforme d'entraînement distribué pour l'apprentissage profond, compatible avec TensorFlow, Keras, PyTorch et Apache MXNet. Son API vous permet d'avoir le même niveau de contrôle sur votre code d'entraînement qu'avec <code>DistributedDataParallel</code>, mais simplifie l'écriture de vos scripts en éliminant le besoin de configurer directement les groupes de processus et en prenant en charge les variables d'environnement de l'ordonnanceur. Horovod offre aussi des optimiseurs distribués qui dans certains cas améliorent la performance. L'exemple suivant est le même que le précédent, cette fois avec Horovod. | |||
{{File | |||
|name=pytorch_horovod.sh | |||
|lang="bash" | |||
|contents= | |||
#!/bin/bash | |||
#SBATCH --nodes 1 | |||
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”. | |||
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel. | |||
#SBATCH --mem=8G | |||
#SBATCH --time=0-03:00 | |||
#SBATCH --output=%N-%j.out | |||
module load python # Using Default Python version - Make sure to choose a version that suits your application | |||
virtualenv --no-download $SLURM_TMPDIR/env | |||
source $SLURM_TMPDIR/env/bin/activate | |||
pip install torch torchvision horovod --no-index | |||
export TORCH_NCCL_ASYNC_HANDLING=1 | |||
srun python pytorch_horovod.py --batch_size 256 | |||
}} | |||
{{File | |||
|name=pytorch_horovod.py | |||
|lang="python" | |||
|contents= | |||
import os | |||
import time | |||
import datetime | |||
import numpy as np | |||
import horovod.torch as hvd | |||
import torch | |||
import torch.nn as nn | |||
import torch.nn.functional as F | |||
import torch.optim as optim | |||
import torchvision | |||
import torchvision.transforms as transforms | |||
from torchvision.datasets import CIFAR10 | |||
from torch.utils.data import DataLoader | |||
import torch.distributed as dist | |||
import torch.utils.data.distributed | |||
import argparse | |||
parser = argparse.ArgumentParser(description='cifar10 classification models, horovod test') | |||
parser.add_argument('--lr', default=0.1, help='') | |||
parser.add_argument('--batch_size', type=int, default=512, help='') | |||
parser.add_argument('--max_epochs', type=int, default=1, help='') | |||
parser.add_argument('--num_workers', type=int, default=0, help='') | |||
def main(): | |||
args = parser.parse_args() | |||
hvd.init() | |||
print("Starting...") | |||
local_rank = hvd.local_rank() | |||
global_rank = hvd.rank() | |||
torch.cuda.set_device(local_rank) | |||
class Net(nn.Module): | |||
def __init__(self): | |||
super(Net, self).__init__() | |||
self.conv1 = nn.Conv2d(3, 6, 5) | |||
self.pool = nn.MaxPool2d(2, 2) | |||
self.conv2 = nn.Conv2d(6, 16, 5) | |||
self.fc1 = nn.Linear(16 * 5 * 5, 120) | |||
self.fc2 = nn.Linear(120, 84) | |||
self.fc3 = nn.Linear(84, 10) | |||
def forward(self, x): | |||
x = self.pool(F.relu(self.conv1(x))) | |||
x = self.pool(F.relu(self.conv2(x))) | |||
x = x.view(-1, 16 * 5 * 5) | |||
x = F.relu(self.fc1(x)) | |||
x = F.relu(self.fc2(x)) | |||
x = self.fc3(x) | |||
return x | |||
net = Net() | |||
net.cuda() | |||
print('From Rank: {}, ==> Preparing data..'.format(global_rank)) | |||
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) | |||
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train) | |||
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train, num_replicas=hvd.size(),rank=global_rank) | |||
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler) | |||
criterion = nn.CrossEntropyLoss().cuda() | |||
optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4) | |||
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=net.named_parameters()) | |||
hvd.broadcast_parameters(net.state_dict(), root_rank=0) | |||
for epoch in range(args.max_epochs): | |||
train_sampler.set_epoch(epoch) | |||
train(args,epoch, net, criterion, optimizer, train_loader, global_rank) | |||
def train(args,epoch, net, criterion, optimizer, train_loader, train_rank): | |||
train_loss = 0 | |||
correct = 0 | |||
total = 0 | |||
epoch_start = time.time() | |||
for batch_idx, (inputs, targets) in enumerate(train_loader): | |||
start = time.time() | |||
inputs = inputs.cuda() | |||
targets = targets.cuda() | |||
outputs = net(inputs) | |||
loss = criterion(outputs, targets) | |||
optimizer.zero_grad() | |||
loss.backward() | |||
optimizer.step() | |||
train_loss += loss.item() | |||
_, predicted = outputs.max(1) | |||
total += targets.size(0) | |||
correct += predicted.eq(targets).sum().item() | |||
acc = 100 * correct / total | |||
batch_time = time.time() - start | |||
elapse_time = time.time() - epoch_start | |||
elapse_time = datetime.timedelta(seconds=elapse_time) | |||
print("From Rank: {}, Training time {}".format(train_rank, elapse_time)) | |||
if __name__=='__main__': | |||
main() | |||
}} | |||
===Paralléliser un modèle avec plusieurs GPU=== | |||
Quand un modèle est trop grand pour être contenu dans [[PyTorch/fr#Travailler_avec_un_seul_GPU|un seul GPU]], vous pouvez le diviser en portions et charger chacune sur des GPU distincts. Dans l’exemple suivant, nous reprenons le code des sections précédentes pour illustrer le fonctionnement. Nous divisons un réseau de neurones convolutifs en deux : d’une part les couches convolutionnelles/de regroupement et d'autre part les couches acycliques entièrement connectées. La tâche demande deux GPU, chacun d’eux étant chargé avec une des parts. De plus, nous ajoutons du code pour [https://pytorch.org/docs/stable/pipeline.html?highlight=pipeline paralléliser les pipelines] et ainsi réduire au maximum le temps pendant lequel le deuxième GPU est inactif dans l’attente des résultats du premier. Pour ce faire, nous créons un <code>nn.Module</code> distinct pour chacune des portions du modèle, puis créons une séquence de modules en enveloppant les portions avec <code>nn.Sequential</code>, et ensuite utilisons <code>torch.distributed.pipeline.sync.Pipe</code> pour morceler chacun des lots en entrée et les passer en parallèle aux deux portions du modèle. | |||
{{File | |||
|name=pytorch-modelpar-pipelined-rpc.sh | |||
|lang="bash" | |||
|contents= | |||
#!/bin/bash | |||
#SBATCH --nodes 1 | |||
#SBATCH --gres=gpu:2 # request 2 GPUs | |||
#SBATCH --tasks-per-node=1 | |||
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance | |||
#SBATCH --mem=8G | |||
#SBATCH --time=0:10:00 | |||
#SBATCH --output=%N-%j.out | |||
#SBATCH --account=<your account> | |||
module load python # Using Default Python version - Make sure to choose a version that suits your application | |||
virtualenv --no-download $SLURM_TMPDIR/env | |||
source $SLURM_TMPDIR/env/bin/activate | |||
pip install torch torchvision --no-index | |||
# This is needed to initialize pytorch's RPC module, required for the Pipe class which we'll use for Pipeline Parallelism | |||
export MASTER_ADDR=$(hostname) | |||
export MASTER_PORT=34567 | |||
echo "starting training..." | |||
time python pytorch-modelpar-pipelined-rpc.py --batch_size=512 --num_workers=0 | |||
}} | |||
{{File | |||
|name=pytorch-modelpar-pipelined-rpc.py | |||
|lang="python" | |||
|contents= | |||
import time | |||
import torch | |||
import torch.nn as nn | |||
import torch.optim as optim | |||
from torch.distributed.pipeline.sync import Pipe | |||
import torchvision | |||
import torchvision.transforms as transforms | |||
from torchvision.datasets import CIFAR10 | |||
from torch.utils.data import DataLoader | |||
import argparse | |||
parser = argparse.ArgumentParser(description='cifar10 classification models, single node model parallelism test') | |||
parser.add_argument('--lr', default=0.1, help='') | |||
parser.add_argument('--batch_size', type=int, default=512, help='') | |||
parser.add_argument('--num_workers', type=int, default=0, help='') | |||
def main(): | |||
args = parser.parse_args() | |||
# Convolutional + pooling part of the model | |||
class ConvPart(nn.Module): | |||
def __init__(self): | |||
super(ConvPart, self).__init__() | |||
self.conv1 = nn.Conv2d(3, 6, 5) | |||
self.pool = nn.MaxPool2d(2, 2) | |||
self.conv2 = nn.Conv2d(6, 16, 5) | |||
self.relu = nn.ReLU() | |||
def forward(self, x): | |||
x = self.pool(self.relu(self.conv1(x))) | |||
x = self.pool(self.relu(self.conv2(x))) | |||
x = x.view(-1, 16 * 5 * 5) | |||
return x | |||
# Dense feedforward part of the model | |||
class MLPPart(nn.Module): | |||
def __init__(self): | |||
super(MLPPart, self).__init__() | |||
self.fc1 = nn.Linear(16 * 5 * 5, 120) | |||
self.fc2 = nn.Linear(120, 84) | |||
self.fc3 = nn.Linear(84, 10) | |||
self.relu = nn.ReLU() | |||
def forward(self, x): | |||
x = self.relu(self.fc1(x)) | |||
x = self.relu(self.fc2(x)) | |||
x = self.fc3(x) | |||
return x | |||
torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) # initializing RPC is required by Pipe we use below | |||
part1 = ConvPart().to('cuda:0') # Load part1 on the first GPU | |||
part2 = MLPPart().to('cuda:1') # Load part2 on the second GPU | |||
net = nn.Sequential(part1,part2) # Pipe requires all modules be wrapped with nn.Sequential() | |||
net = Pipe(net, chunks=32) # Wrap with Pipe to perform Pipeline Parallelism | |||
criterion = nn.CrossEntropyLoss().to('cuda:1') # Load the loss function on the last GPU | |||
optimizer = optim.SGD(net.parameters(), lr=args.lr) | |||
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) | |||
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train) | |||
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers) | |||
perf = [] | |||
total_start = time.time() | |||
for batch_idx, (inputs, targets) in enumerate(train_loader): | |||
start = time.time() | |||
inputs = inputs.to('cuda:0') | |||
targets = targets.to('cuda:1') | |||
# Models wrapped with Pipe() return a RRef object. Since the example is single node, all values are local to the node and we can grab them | |||
outputs = net(inputs).local_value() | |||
loss = criterion(outputs, targets) | |||
optimizer.zero_grad() | |||
loss.backward() | |||
optimizer.step() | |||
print(f"Loss: {loss.item()}") | |||
batch_time = time.time() - start | |||
images_per_sec = args.batch_size/batch_time | |||
perf.append(images_per_sec) | |||
total_time = time.time() - total_start | |||
if __name__=='__main__': | |||
main() | |||
}} | |||
===Paralléliser modèle et données avec plusieurs GPU=== | |||
Quand un modèle est trop grand pour être contenu dans un seul GPU et que son entraînement doit se faire avec un très grand ensemble de données, le fait de combiner le parallélisme du modèle et celui des données permet d’obtenir une bonne performance. Le principe est simple : le modèle est divisé en portions chacune attribuée à un GPU; le parallélisme des pipelines est fait avec les résultats; puis des copies du processus sont faites et les copies du modèle sont entraînées en parallèle avec des sous-ensembles distincts de l’ensemble de données d’entraînement. | |||
Comme décrit [[PyTorch/fr#Paralléliser_les_données_avec plusieurs_GPU|ci-dessus]], les gradients sont calculés indépendamment dans chacune des copies et agrégés pour modifier toutes les copies de façon synchrone ou asynchrone, dépendant de la méthode. La différence principale ici est que chaque copie du modèle se trouve sur plus d’un GPU. | |||
==== Utiliser Torch RPC et DDP ==== | |||
Toujours avec le même exemple, nous combinons maintenant Torch RPC et DistributedDataParallel pour séparer le modèle en deux portions et entraîner quatre copies du modèle distribuées en parallèle sur deux nœuds. Autrement dit, nous avons deux copies sur deux GPU de chaque nœud. Cependant, un avertissement s’impose : à ce jour, Torch RPC prend en charge la division d’un modèle dans un seul nœud. Pour entraîner un modèle qui dépasse la quantité de mémoire de tous les GPU dans un nœud de calcul, voyez [[PyTorch/fr#DeepSpeed|la section DeepSpeed]]. | |||
{{File | |||
|name=pytorch-model-data-par.sh | |||
|lang="bash" | |||
|contents= | |||
#!/bin/bash | |||
#SBATCH --nodes 2 | |||
#SBATCH --gres=gpu:4 # Request 4 GPUs per node | |||
#SBATCH --tasks-per-node=2 # Request one task per MODEL per node | |||
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance | |||
#SBATCH --mem=16G | |||
#SBATCH --time=0:10:00 | |||
#SBATCH --output=%N-%j.out | |||
#SBATCH --account=<your account> | |||
module load StdEnv/2020 gcc/11.3.0 | |||
module load python # Using Default Python version - Make sure to choose a version that suits your application, python/3.10.2 works with this demo | |||
module load cuda/11.8.0 | |||
virtualenv --no-download $SLURM_TMPDIR/env | |||
source $SLURM_TMPDIR/env/bin/activate | |||
pip install torch torchvision --no-index | |||
export MAIN_NODE=$(hostname) | |||
echo "starting training..." | |||
srun python pytorch-model-data-par.py --init_method tcp://$MAIN_NODE:3456 --world_size $SLURM_NTASKS --batch_size 512 | |||
}} | |||
{{File | |||
|name=pytorch-model-data-par.py | |||
|lang="python" | |||
|contents= | |||
import time | |||
import os | |||
import torch | |||
import torch.nn as nn | |||
import torch.optim as optim | |||
from torch.distributed.pipeline.sync import Pipe | |||
import torchvision | |||
import torchvision.transforms as transforms | |||
from torchvision.datasets import CIFAR10 | |||
from torch.utils.data import DataLoader | |||
import torch.distributed as dist | |||
import torch.utils.data.distributed | |||
import argparse | |||
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data & model parallel test') | |||
parser.add_argument('--lr', default=0.1, help='') | |||
parser.add_argument('--batch_size', type=int, default=768, help='') | |||
parser.add_argument('--max_epochs', type=int, default=4, help='') | |||
parser.add_argument('--num_workers', type=int, default=0, help='') | |||
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='') | |||
parser.add_argument('--dist-backend', default='mpi', type=str, help='') | |||
parser.add_argument('--world_size', default=1, type=int, help='') | |||
parser.add_argument('--distributed', action='store_true', help='') | |||
def main(): | |||
args = parser.parse_args() | |||
# Convolutional + pooling part of the model | |||
class ConvPart(nn.Module): | |||
def __init__(self): | |||
super(ConvPart, self).__init__() | |||
self.conv1 = nn.Conv2d(3, 6, 5) | |||
self.pool = nn.MaxPool2d(2, 2) | |||
self.conv2 = nn.Conv2d(6, 16, 5) | |||
self.relu = nn.ReLU() | |||
def forward(self, x): | |||
x = self.pool(self.relu(self.conv1(x))) | |||
x = self.pool(self.relu(self.conv2(x))) | |||
x = x.view(-1, 16 * 5 * 5) | |||
return x | |||
# Dense feedforward part of the model | |||
class MLPPart(nn.Module): | |||
def __init__(self): | |||
super(MLPPart, self).__init__() | |||
self.fc1 = nn.Linear(16 * 5 * 5, 120) | |||
self.fc2 = nn.Linear(120, 84) | |||
self.fc3 = nn.Linear(84, 10) | |||
self.relu = nn.ReLU() | |||
def forward(self, x): | |||
x = self.relu(self.fc1(x)) | |||
x = self.relu(self.fc2(x)) | |||
x = self.fc3(x) | |||
return x | |||
ngpus_per_node = torch.cuda.device_count() | |||
local_rank = int(os.environ.get("SLURM_LOCALID")) | |||
rank = int(os.environ.get("SLURM_NODEID"))*(ngpus_per_node//2) + local_rank # Divide ngpus_per_node by the number of model parts | |||
os.environ['MASTER_ADDR'] = '127.0.0.1' # Each model replica will run its own RPC server to run pipeline parallelism | |||
os.environ['MASTER_PORT'] = str(34567 + local_rank) # Make sure each RPC server starts on a different port | |||
torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) # Different replicas won't communicate through RPC, but through DDP | |||
dist.init_process_group(backend=args.dist_backend, init_method=args.init_method, world_size=args.world_size, rank=rank) # Initialize Data Parallelism communications | |||
part1 = ConvPart().cuda(local_rank) # First part of the model goes on the first GPU of each process | |||
part2 = MLPPart().cuda(local_rank + 1) # Second part goes on the second GPU of each process | |||
net = nn.Sequential(part1,part2) | |||
net = Pipe(net, chunks=32, checkpoint="never") | |||
net = torch.nn.parallel.DistributedDataParallel(net) | |||
criterion = nn.CrossEntropyLoss().cuda(local_rank + 1) # Loss function goes on the second GPU of each process | |||
optimizer = optim.SGD(net.parameters(), lr=args.lr) | |||
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) | |||
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train) | |||
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train) | |||
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler) | |||
for epoch in range(args.max_epochs): | |||
train_sampler.set_epoch(epoch) | |||
train(epoch, net, criterion, optimizer, train_loader, rank, local_rank) | |||
def train(epoch, net, criterion, optimizer, train_loader, train_rank, model_rank): | |||
train_loss = 0 | |||
correct = 0 | |||
total = 0 | |||
epoch_start = time.time() | |||
for batch_idx, (inputs, targets) in enumerate(train_loader): | |||
start = time.time() | |||
inputs = inputs.cuda(model_rank) | |||
targets = targets.cuda(model_rank + 1) | |||
outputs = net(inputs).local_value() | |||
loss = criterion(outputs, targets) | |||
optimizer.zero_grad() | |||
loss.backward() | |||
optimizer.step() | |||
print(f"From Rank {train_rank} - Loss: {loss.item()}") | |||
batch_time = time.time() - start | |||
if __name__=='__main__': | |||
main() | |||
}} | |||
===DeepSpeed=== | |||
DeepSpeed est une bibliothèque qui permet d'optimiser l’entraînement de l’apprentissage de modèles ayant des milliards de paramètres à l’échelle. Parfaitement compatible avec PyTorch, DeepSpeed offre des implémentations de nouvelles méthodes d’entraînement distribué qui font un usage efficace de la mémoire en appliquant le concept de [https://arxiv.org/abs/1910.02054 Zero Redundancy Optimizer (ZeRO)]. Avec ZeRO, DeepSpeed peut distribuer le stockage et le traitement des divers éléments d’une tâche d’entraînement (états de l’optimiseur, poids du modèle, gradients et activations) sur plusieurs dispositifs comme les GPU, les CPU, les disques durs locaux et/ou les combinaisons de ces dispositifs. Cette mise en commun des ressources, surtout les ressources de stockage, permet l’entraînement efficace sur plusieurs nœuds de modèles ayant d’énormes quantités de paramètres sans avoir besoin d’écrire le code pour paralléliser le modèle, les pipelines ou les données. Les exemples qui suivent montrent comment profiter de DeepSpeed et des différentes implémentations de ZeRO en utilisant l’interface simple de PyTorch Lightning. | |||
==== ZeRO avec GPU ==== | |||
Dans l’exemple ci-dessous, nous utilisons ZeRO stage 3 pour entraîner un modèle qui utilise un groupe de 4 GPU. Le stage 3 répartit sur les 4 GPU les trois caractéristiques, soit les états de l’optimiseur, les paramètres du modèle et les gradients du modèle. Ceci est plus efficace que le parallélisme pur des données où une copie complète du modèle est chargée sur chacun des GPU. Avec l’optimiseur DeepSpeed <code>FusedAdam</code> plutôt qu’un optimiseur natif de PyTorch, la performance se compare au parallélisme pur des données. Les optimiseurs DeepSpeed sont compilés en temps réel à l’exécution et vous devez charger un module <code>cuda/<version></code> où ''version'' correspond à la version utilisée pour construire le paquet PyTorch que vous utilisez. | |||
{{File | |||
|name=deepspeed-stage3.sh | |||
|lang="bash" | |||
|contents= | |||
#!/bin/bash | |||
#SBATCH --nodes 1 | |||
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”. | |||
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel. | |||
#SBATCH --mem=32G | |||
#SBATCH --time=0-00:20 | |||
#SBATCH --output=%N-%j.out | |||
#SBATCH --account=<your account> | |||
module load python cuda # CUDA must be loaded if using a DeepSpeed optimizer | |||
virtualenv --no-download $SLURM_TMPDIR/env | |||
source $SLURM_TMPDIR/env/bin/activate | |||
pip install torchvision pytorch-lightning deepspeed --no-index | |||
export TORCH_NCCL_ASYNC_HANDLING=1 | |||
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job | |||
# If it is, it expects the user to have requested one task per GPU. | |||
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail! | |||
srun python deepspeed-stage3.py --batch_size 256 | |||
}} | |||
{{File | |||
|name=deepspeed-stage3.py | |||
|lang="python" | |||
|contents= | |||
import torch | |||
from torch import nn | |||
import torch.nn.functional as F | |||
import pytorch_lightning as pl | |||
import torchvision | |||
import torchvision.transforms as transforms | |||
from torchvision.datasets import CIFAR10 | |||
from torch.utils.data import DataLoader | |||
from deepspeed.ops.adam import FusedAdam | |||
from pytorch_lightning.strategies import DeepSpeedStrategy | |||
import argparse | |||
parser = argparse.ArgumentParser(description='cifar10 classification models deep seed stage 3 test') | |||
parser.add_argument('--lr', default=0.1, help='') | |||
parser.add_argument('--max_epochs', type=int, default=2, help='') | |||
parser.add_argument('--batch_size', type=int, default=768, help='') | |||
parser.add_argument('--num_workers', type=int, default=0, help='') | |||
def main(): | |||
print("Starting...") | |||
args = parser.parse_args() | |||
class ConvPart(nn.Module): | |||
def __init__(self): | |||
super(ConvPart, self).__init__() | |||
self.conv1 = nn.Conv2d(3, 6, 5) | |||
self.pool = nn.MaxPool2d(2, 2) | |||
self.conv2 = nn.Conv2d(6, 16, 5) | |||
self.relu = nn.ReLU() | |||
def forward(self, x): | |||
x = self.pool(self.relu(self.conv1(x))) | |||
x = self.pool(self.relu(self.conv2(x))) | |||
x = x.view(-1, 16 * 5 * 5) | |||
return x | |||
# Dense feedforward part of the model | |||
class MLPPart(nn.Module): | |||
def __init__(self): | |||
super(MLPPart, self).__init__() | |||
self.fc1 = nn.Linear(16 * 5 * 5, 120) | |||
self.fc2 = nn.Linear(120, 84) | |||
self.fc3 = nn.Linear(84, 10) | |||
self.relu = nn.ReLU() | |||
def forward(self, x): | |||
x = self.relu(self.fc1(x)) | |||
x = self.relu(self.fc2(x)) | |||
x = self.fc3(x) | |||
return x | |||
class Net(pl.LightningModule): | |||
def __init__(self): | |||
super(Net, self).__init__() | |||
self.conv_part = ConvPart() | |||
self.mlp_part = MLPPart() | |||
def configure_sharded_model(self): | |||
self.block = nn.Sequential(self.conv_part, self.mlp_part) | |||
def forward(self, x): | |||
x = self.block(x) | |||
return x | |||
def training_step(self, batch, batch_idx): | |||
x, y = batch | |||
y_hat = self(x) | |||
loss = F.cross_entropy(y_hat, y) | |||
return loss | |||
def configure_optimizers(self): | |||
return FusedAdam(self.parameters()) | |||
net = Net() | |||
""" Here we initialize a Trainer() explicitly with 1 node and 2 GPU. | |||
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs | |||
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes. | |||
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs, | |||
which can cause issues due to updating logs too frequently.""" | |||
trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy="deepspeed_stage_3", max_epochs = args.max_epochs) | |||
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) | |||
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train) | |||
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers) | |||
trainer.fit(net,train_loader) | |||
if __name__=='__main__': | |||
main() | |||
}} | |||
==== ZeRO avec CPU ==== | |||
Dans cet autre exemple, nous utilisons aussi ZeRO stage 3, mais cette fois-ci nous utilisons le CPU pour les états de l’optimiseur et les paramètres du modèle. Ceci signifie que la mémoire du nœud de calcul sera disponible pour stocker ces tenseurs quand ils ne sont pas requis par les calculs effectués par le GPU et de plus, les pas de l’optimiseur seront calculés sur le CPU. Pour des raisons pratiques, ce serait comme ajouter 32Go de mémoire additionnelle à la mémoire du GPU. Comme la mémoire du GPU est moins sollicitée, vous pouvez par exemple augmenter la taille de vos lots ou de votre modèle. Avec l’optimiseur DeepSpeed <code>DeepSpeedCPUAdam</code> plutôt qu’un optimiseur natif de PyTorch, la performance se compare au parallélisme pur des données. Les optimiseurs DeepSpeed sont compilés en temps réel à l’exécution et vous devez charger un module <code>cuda/<version></code> où ''version'' correspond à la version utilisée pour construire le paquet PyTorch que vous utilisez. | |||
{{File | |||
|name=deepspeed-stage3-offload-cpu.sh | |||
|lang="bash" | |||
|contents= | |||
#!/bin/bash | |||
#SBATCH --nodes 1 | |||
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”. | |||
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel. | |||
#SBATCH --mem=32G | |||
#SBATCH --time=0-00:20 | |||
#SBATCH --output=%N-%j.out | |||
#SBATCH --account=<your account> | |||
module load python cuda # CUDA must be loaded if using ZeRO offloading to CPU or NVMe. Version must be the same used to compile PyTorch. | |||
virtualenv --no-download $SLURM_TMPDIR/env | |||
source $SLURM_TMPDIR/env/bin/activate | |||
pip install torchvision pytorch-lightning deepspeed --no-index | |||
export TORCH_NCCL_ASYNC_HANDLING=1 | |||
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job | |||
# If it is, it expects the user to have requested one task per GPU. | |||
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail! | |||
srun python deepspeed-stage3-offload-cpu.py --batch_size 256 | |||
}} | |||
{{File | |||
|name=deepspeed-stage3-offload-cpu.py | |||
|lang="python" | |||
|contents= | |||
import torch | |||
from torch import nn | |||
import torch.nn.functional as F | |||
import pytorch_lightning as pl | |||
import torchvision | |||
import torchvision.transforms as transforms | |||
from torchvision.datasets import CIFAR10 | |||
from torch.utils.data import DataLoader | |||
from deepspeed.ops.adam import DeepSpeedCPUAdam | |||
from pytorch_lightning.strategies import DeepSpeedStrategy | |||
import argparse | |||
parser = argparse.ArgumentParser(description='cifar10 classification models, deepspeed offload to cpu test') | |||
parser.add_argument('--lr', default=0.1, help='') | |||
parser.add_argument('--max_epochs', type=int, default=2, help='') | |||
parser.add_argument('--batch_size', type=int, default=768, help='') | |||
parser.add_argument('--num_workers', type=int, default=0, help='') | |||
def main(): | |||
print("Starting...") | |||
args = parser.parse_args() | |||
class ConvPart(nn.Module): | |||
def __init__(self): | |||
super(ConvPart, self).__init__() | |||
self.conv1 = nn.Conv2d(3, 6, 5) | |||
self.pool = nn.MaxPool2d(2, 2) | |||
self.conv2 = nn.Conv2d(6, 16, 5) | |||
self.relu = nn.ReLU() | |||
def forward(self, x): | |||
x = self.pool(self.relu(self.conv1(x))) | |||
x = self.pool(self.relu(self.conv2(x))) | |||
x = x.view(-1, 16 * 5 * 5) | |||
return x | |||
# Dense feedforward part of the model | |||
class MLPPart(nn.Module): | |||
def __init__(self): | |||
super(MLPPart, self).__init__() | |||
self.fc1 = nn.Linear(16 * 5 * 5, 120) | |||
self.fc2 = nn.Linear(120, 84) | |||
self.fc3 = nn.Linear(84, 10) | |||
self.relu = nn.ReLU() | |||
def forward(self, x): | |||
x = self.relu(self.fc1(x)) | |||
x = self.relu(self.fc2(x)) | |||
x = self.fc3(x) | |||
return x | |||
class Net(pl.LightningModule): | |||
def __init__(self): | |||
super(Net, self).__init__() | |||
self.conv_part = ConvPart() | |||
self.mlp_part = MLPPart() | |||
def configure_sharded_model(self): | |||
self.block = nn.Sequential(self.conv_part, self.mlp_part) | |||
def forward(self, x): | |||
x = self.block(x) | |||
return x | |||
def training_step(self, batch, batch_idx): | |||
x, y = batch | |||
y_hat = self(x) | |||
loss = F.cross_entropy(y_hat, y) | |||
return loss | |||
def configure_optimizers(self): | |||
return DeepSpeedCPUAdam(self.parameters()) | |||
net = Net() | |||
""" Here we initialize a Trainer() explicitly with 1 node and 2 GPU. | |||
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs | |||
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes. | |||
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs, | |||
which can cause issues due to updating logs too frequently.""" | |||
trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy=DeepSpeedStrategy( | |||
stage=3, | |||
offload_optimizer=True, | |||
offload_parameters=True, | |||
), max_epochs = args.max_epochs) | |||
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) | |||
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train) | |||
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers) | |||
trainer.fit(net,train_loader) | |||
if __name__=='__main__': | |||
main() | |||
}} | |||
====ZeRO avec utilisation de disques NVMe==== | |||
ZeRO stage 3 nous sert encore, cette fois-ci en utilisant le CPU pour les états de l’optimiseur et les paramètres du modèle. Ceci signifie que l’espace de stockage local du nœud de calcul sera disponible pour stocker ces tenseurs quand ils ne sont pas requis par les calculs effectués par le GPU. Ici encore, les pas de l’optimiseur seront calculés sur le CPU. De même, pour des raisons pratiques, ce serait comme ajouter à la mémoire du GPU autant d’espace de stockage que sur le disque local, mais par contre nous aurons une forte perte de performance. Cette approche peut être utilisée avec tous les types de stockage, mais elle est à privilégier avec les disques NVMe qui sont plus rapides et ont des temps de réponse plus courts, ce qui compense pour la baisse de performance. | |||
{{File | |||
|name=deepspeed-stage3-offload-nvme.sh | |||
|lang="bash" | |||
|contents= | |||
#!/bin/bash | |||
#SBATCH --nodes 1 | |||
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”. | |||
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel. | |||
#SBATCH --mem=32G | |||
#SBATCH --time=0-00:20 | |||
#SBATCH --output=%N-%j.out | |||
#SBATCH --account=<your account> | |||
module load python cuda # CUDA must be loaded if using ZeRO offloading to CPU or NVMe. Version must be the same used to compile PyTorch. | |||
virtualenv --no-download $SLURM_TMPDIR/env | |||
source $SLURM_TMPDIR/env/bin/activate | |||
pip install torchvision pytorch-lightning deepspeed --no-index | |||
export TORCH_NCCL_ASYNC_HANDLING=1 | |||
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job | |||
# If it is, it expects the user to have requested one task per GPU. | |||
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail! | |||
srun python deepspeed-stage3-offload-nvme.py --batch_size 256 | |||
}} | |||
{{File | |||
|name=deepspeed-stage3-offload-nvme.py | |||
|lang="python" | |||
|contents= | |||
import os | |||
import torch | |||
from torch import nn | |||
import torch.nn.functional as F | |||
import pytorch_lightning as pl | |||
import torchvision | |||
import torchvision.transforms as transforms | |||
from torchvision.datasets import CIFAR10 | |||
from torch.utils.data import DataLoader | |||
from deepspeed.ops.adam import DeepSpeedCPUAdam | |||
from pytorch_lightning.strategies import DeepSpeedStrategy | |||
import argparse | |||
parser = argparse.ArgumentParser(description='cifar10 classification models, deepspeed offload to nvme test') | |||
parser.add_argument('--lr', default=0.1, help='') | |||
parser.add_argument('--max_epochs', type=int, default=2, help='') | |||
parser.add_argument('--batch_size', type=int, default=768, help='') | |||
parser.add_argument('--num_workers', type=int, default=0, help='') | |||
def main(): | |||
print("Starting...") | |||
args = parser.parse_args() | |||
class ConvPart(nn.Module): | |||
def __init__(self): | |||
super(ConvPart, self).__init__() | |||
self.conv1 = nn.Conv2d(3, 6, 5) | |||
self.pool = nn.MaxPool2d(2, 2) | |||
self.conv2 = nn.Conv2d(6, 16, 5) | |||
self.relu = nn.ReLU() | |||
def forward(self, x): | |||
x = self.pool(self.relu(self.conv1(x))) | |||
x = self.pool(self.relu(self.conv2(x))) | |||
x = x.view(-1, 16 * 5 * 5) | |||
return x | |||
# Dense feedforward part of the model | |||
class MLPPart(nn.Module): | |||
def __init__(self): | |||
super(MLPPart, self).__init__() | |||
self.fc1 = nn.Linear(16 * 5 * 5, 120) | |||
self.fc2 = nn.Linear(120, 84) | |||
self.fc3 = nn.Linear(84, 10) | |||
self.relu = nn.ReLU() | |||
def forward(self, x): | |||
x = self.relu(self.fc1(x)) | |||
x = self.relu(self.fc2(x)) | |||
x = self.fc3(x) | |||
return x | |||
class Net(pl.LightningModule): | |||
def __init__(self): | |||
super(Net, self).__init__() | |||
self.conv_part = ConvPart() | |||
self.mlp_part = MLPPart() | |||
def configure_sharded_model(self): | |||
self.block = nn.Sequential(self.conv_part, self.mlp_part) | |||
def forward(self, x): | |||
x = self.block(x) | |||
return x | |||
def training_step(self, batch, batch_idx): | |||
x, y = batch | |||
y_hat = self(x) | |||
loss = F.cross_entropy(y_hat, y) | |||
return loss | |||
def configure_optimizers(self): | |||
return DeepSpeedCPUAdam(self.parameters()) | |||
net = Net() | |||
""" Here we initialize a Trainer() explicitly with 1 node and 2 GPU. | |||
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs | |||
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes. | |||
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs, | |||
which can cause issues due to updating logs too frequently.""" | |||
local_scratch = os.environ['SLURM_TMPDIR'] # Get path where local storage is mounted | |||
print(f'Offloading to: {local_scratch}') | |||
trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy=DeepSpeedStrategy( | |||
stage=3, | |||
offload_optimizer=True, | |||
offload_parameters=True, | |||
remote_device="nvme", | |||
offload_params_device="nvme", | |||
offload_optimizer_device="nvme", | |||
nvme_path="local_scratch", | |||
), max_epochs = args.max_epochs) | |||
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) | |||
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train) | |||
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers) | |||
trainer.fit(net,train_loader) | |||
if __name__=='__main__': | |||
main() | |||
}} | |||
=Créer des points de contrôle= | |||
Peu importe si vous pensez que la durée d'exécution de votre code sera longue ou non, il est bon de prendre l'habitude de créer des points de contrôle pendant l'entraînement. Un point de contrôle est un portrait de votre modèle à un moment précis du processus d'entraînement (après un certain nombre d'itérations ou un certain nombre d'époques) que vous pouvez sauvegarder sur disque et utiliser plus tard. C'est un moyen pratique de diviser les tâches qui devraient être de longue durée en de multiples petites tâches auxquelles l'ordonnanceur peut allouer des ressources plus rapidement. C'est aussi une bonne façon de ne pas perdre le progrès réalisé au cas où des erreurs de code inattendues surviendraient ou que les nœuds ne soient pas disponibles pour quelconque raison. | |||
==Avec PyTorch Lightning== | |||
Nous recommandons d'utiliser le paramètre de rappels (''callbacks parameter'') de la classe <code>Trainer()</code>. Dans l'exemple suivant, on demande à PyTorch de créer un point de contrôle à la fin de chacune des époques d'entraînement. Vérifiez que le chemin où créer le point de contrôle existe. | |||
callbacks = [pl.callbacks.ModelCheckpoint(dirpath="./ckpt",every_n_epochs=1)] | |||
trainer = pl.Trainer(callbacks=callbacks) | |||
trainer.fit(model) | |||
Ce bout de code chargera un point de contrôle de <code>./ckpt</code> (s'il en existe) et poursuivra l'entraînement à partir de ce point. Pour plus d'information, consultez la [https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.callbacks.model_checkpoint.html documentation PyTorch Lightning]. | |||
==Avec des boucles d'entraînement personnalisées== | |||
Pour des exemples, consultez [https://pytorch.org/tutorials/recipes/recipes/saving_and_loading_a_general_checkpoint.html la documentation PyTorch]. | |||
== Pendant l’entraînement distribué == | |||
Les points de contrôle peuvent être utilisés pendant l’exécution d’un programme d’entraînement distribué. Avec PyTorch Lightning, aucun code supplémentaire n’est requis, autre que d’insérer le paramètre de rappels (''callbacks parameter'') mentionné ci-dessus. Cependant, si vous utilisez DistributedDataParallel ou Horovod, les points de contrôle devront être créés par un seul processus (''rank'') de votre programme puisque tous les processus auront le même état après chaque itération. Dans cet exemple, le premier processus (''rank 0'') crée un point de contrôle. | |||
if global_rank == 0: | |||
torch.save(ddp_model.state_dict(), "./checkpoint_path") | |||
Faites attention aux points de contrôle ainsi créés. Si un processus tente de charger un point de contrôle qui n’a pas encore été sauvegardé par un autre, des erreurs peuvent survenir ou de mauvais résultats peuvent être produits. Pour éviter ceci, vous pouvez ajouter une barrière à votre code pour faire en sorte que le processus qui crée le point de contrôle a terminé son écriture sur le disque avant que d’autres processus tentent de le charger. Remarquez aussi que <code>torch.load</code> essaiera par défaut de charger les tenseurs sur le GPU sur lequel ils étaient initialement sauvegardés, dans notre cas <code>cuda:0</code>. Pour éviter les problèmes, passez <code>map_location</code> à <code>torch.load</code> pour charger les tenseurs sur le GPU identifié par chaque processus. | |||
torch.distributed.barrier() | |||
map_location = f"cuda:{local_rank}" | |||
ddp_model.load_state_dict( | |||
torch.load("./checkpoint_path", map_location=map_location)) | |||
<!-- this section is hidden until results are verified. | <!-- this section is hidden until results are verified. | ||
Line 100: | Line 1,835: | ||
== Fuites de mémoire == | == Fuites de mémoire == | ||
Sur le matériel AVX512 (nœuds V100, Skylake ou Béluga), les versions PyTorch antérieures à v1.0.1 qui utilisent des bibliothèques moins récentes (cuDNN < v7.5 ou MAGMA < v2.5) peuvent avoir des fuites de mémoire importantes et créer des exceptions de mémoire insuffisante et terminer vos tâches. Pour contrer ceci, utilisez la plus récente version de < | Sur le matériel AVX512 (nœuds V100, Skylake ou Béluga), les versions PyTorch antérieures à v1.0.1 qui utilisent des bibliothèques moins récentes (cuDNN < v7.5 ou MAGMA < v2.5) peuvent avoir des fuites de mémoire importantes et créer des exceptions de mémoire insuffisante et terminer vos tâches. Pour contrer ceci, utilisez la plus récente version de <code>torch</code>. | ||
== c10::Error == | |||
Dans certains cas, vous pouvez obtenir un erreur comme | |||
terminate called after throwing an instance of 'c10::Error' | |||
what(): Given groups=1, weight of size [256, 1, 3, 3], expected input[16, 10, 16, 16] to have 1 channels, but got 10 channels instead | |||
Exception raised from check_shape_forward at /tmp/coulombc/pytorch_build_2021-11-09_14-57-01/avx2/python3.8/pytorch/aten/src/ATen/native/Convolution.cpp:496 (most recent call first): | |||
... | |||
Une exception C++ est émise plutôt qu'une exception Python. Ceci peut se produire quand vous programmez en C++ avec libtorch, mais ne devrait pas se produire quand vous programmez en Python. Il n'est pas possible de suivre la trace des appels (''traceback'')du programme Python, ce qui ne permet pas d'identifier facilement la cause de l'erreur dans le script Python. Nous avons constaté que le fait d'utiliser PyTorch 1.9.1 plutôt que 1.10.x permet le ''traceback'' du programme Python. | |||
= LibTorch = | = LibTorch = | ||
LibTorch | LibTorch permet d'implémenter à PyTorch des extensions C++ et des <b>applications d'apprentissage machine en C++ pur</b>. La distribution LibTorch possède les en-têtes, bibliothèques et fichiers de configuration CMake nécessaires pour travailler avec PyTorch, tel que décrit dans la [https://pytorch.org/cppdocs/installing.html documentation]. | ||
=== | |||
=== Utiliser LibTorch === | |||
==== Configurer l'environnement ==== | |||
Chargez les modules requis par LibTorch, puis installez PyTorch dans un environnement virtuel Python. | |||
< | <tabs> | ||
<tab name="StdEnv/2023"> | |||
module load StdEnv/2023 gcc cuda/12.2 cmake protobuf cudnn python/3.11 abseil cusparselt opencv/4.8.1 | |||
virtualenv --no-download --clear ~/ENV && source ~/ENV/bin/activate | |||
pip install --no-index torch numpy | |||
Vous devrez peut-être ajuster les versions des modules abseil, cusparselt et opencv, dépendant du paquet torch que vous utilisez. Pour savoir quelle version d'un module a été utilisée pour compiler le wheel Python, lancez la commande | |||
{{Command | |||
|prompt=$ | |||
|ldd $VIRTUAL_ENV/lib/python3.11/site-packages/torch/lib/libtorch_cuda.so {{!}} sed -n 's&^.*/\(\(opencv\{{!}}abseil\{{!}}cusparselt\)/[^/]*\).*&\1&p' {{!}} sort -u | |||
|result= | |||
abseil/20230125.3 | |||
cusparselt/0.5.0.1 | |||
opencv/4.8.1 | |||
}} | |||
</tab> | |||
<tab name="StdEnv/2020"> | |||
module load gcc cuda/11.4 cmake protobuf cudnn python/3.10 | |||
virtualenv --no-download --clear ~/ENV && source ~/ENV/bin/activate | |||
pip install --no-index torch numpy | |||
</tab> | |||
</tabs> | |||
==== | ==== Compiler un exemple simple ==== | ||
Créez les deux fichiers suivants : | |||
{{File | {{File | ||
|name=example | |name=example.cpp | ||
|lang="cpp" | |lang="cpp" | ||
|contents= | |contents= | ||
Line 134: | Line 1,893: | ||
#include <iostream> | #include <iostream> | ||
int main() { | int main() | ||
{ | |||
torch::Device device(torch::kCPU); | torch::Device device(torch::kCPU); | ||
if (torch::cuda::is_available()) { | if (torch::cuda::is_available()) | ||
{ | |||
std::cout << "CUDA is available! Using GPU." << std::endl; | std::cout << "CUDA is available! Using GPU." << std::endl; | ||
device = torch::Device(torch::kCUDA); | device = torch::Device(torch::kCUDA); | ||
Line 151: | Line 1,912: | ||
|contents= | |contents= | ||
cmake_minimum_required(VERSION 3.0 FATAL_ERROR) | cmake_minimum_required(VERSION 3.0 FATAL_ERROR) | ||
project(example | project(example) | ||
find_package(Torch REQUIRED) | find_package(Torch REQUIRED) | ||
add_executable(example | add_executable(example example.cpp) | ||
target_link_libraries(example | target_link_libraries(example "${TORCH_LIBRARIES}") | ||
set_property(TARGET example | set_property(TARGET example PROPERTY CXX_STANDARD 14) | ||
}} | }} | ||
Activez l'environnement virtuel Python, configurez le projet et compilez le programme. | |||
<tabs> | |||
< | <tab name="StdEnv/2023"> | ||
cmake -B build -S . -DCMAKE_PREFIX_PATH=$VIRTUAL_ENV/lib/python3.11/site-packages \ | |||
-DCMAKE_EXE_LINKER_FLAGS=-Wl,-rpath=$VIRTUAL_ENV/lib/python3.11/site-packages/torch/lib,-L$EBROOTCUDA/extras/CUPTI/lib64 \ | |||
-DCMAKE_SKIP_RPATH=ON -DTORCH_CUDA_ARCH_LIST="6.0;7.0;7.5;8.0;9.0" | |||
cmake --build build | |||
</tab> | |||
< | <tab name="StdEnv/2020"> | ||
cmake -B build -S . -DCMAKE_PREFIX_PATH=$VIRTUAL_ENV/lib/python3.10/site-packages \ | |||
-DCMAKE_EXE_LINKER_FLAGS=-Wl,-rpath=$VIRTUAL_ENV/lib/python3.10/site-packages/torch/lib \ | |||
-DCMAKE_SKIP_RPATH=ON | |||
cmake --build build | |||
</ | </tab> | ||
</tabs> | |||
Lancez le programme avec | |||
build/example | |||
Pour tester une application avec CUDA, demandez une [[Running_jobs/fr#Tâches_interactives|tâche interactive]] avec [[Using_GPUs_with_Slurm/fr|GPU]]. | |||
= | = Ressources = | ||
https://pytorch.org/cppdocs/ | https://pytorch.org/cppdocs/ |
Latest revision as of 17:15, 2 October 2024
PyTorch est un paquet Python qui offre deux fonctionnalités de haut niveau :
- le calcul tensoriel (semblable à celui effectué par NumPy) avec forte accélération de GPU,
- des réseaux de neurones d’apprentissage profond dans un système de gradients conçu sur le modèle d’un magnétophone.
Si vous voulez porter un programme PyTorch sur une de nos grappes, il serait bon de prendre connaissance de ce tutoriel.
Clarification
Il y a une certaine ressemblance entre PyTorch et Torch, mais pour des raisons pratiques vous pouvez considérer que ce sont des projets différents.
Les développeurs PyTorch offrent aussi LibTorch qui permet d'implémenter des extensions à PyTorch à l'aide de C++ et d'implémenter des applications d'apprentissage machine en C++ pur. Les modèles Python écrits avec PyTorch peuvent être convertis et utilisés en C++ avec TorchScript.
Installation
Wheels récemment ajoutés
Pour connaître la dernière version de PyTorch, utilisez
[name@server ~]$ avail_wheels "torch*"
Pour plus d'information, voyez Wheels disponibles.
Installation du wheel
La meilleure option est d'installer avec Python wheels comme suit :
- 1. Chargez un module Python avec
module load python
. - 2. Créez et démarrez un environnement virtuel.
- 3. Installez PyTorch dans l'environnement virtuel avec
pip install
.
- 1. Chargez un module Python avec
GPU et CPU
-
(venv) [name@server ~] pip install --no-index torch
Remarque : PyTorch 1.10 cause des problèmes connus sur nos grappes (à l'exception de Narval). Si l'entraînement distribué produit des erreurs ou si vous obtenez une erreur qui inclut c10::Error
, nous vous recommandons d'installer PyTorch 1.9.1 avec pip install --no-index torch==1.9.1
.
En supplément
En plus de torch
, vous pouvez aussi installer torchvision
, torchtext
et torchaudio
.
(venv) [name@server ~] pip install --no-index torch torchvision torchtext torchaudio
Soumettre une tâche
Le script suivant est un exemple de soumission d'une tâche utilisant le wheel Python avec un environnement virtuel.
#!/bin/bash
#SBATCH --gres=gpu:1 # Request GPU "generic resources"
#SBATCH --cpus-per-task=6 # Cores proportional to GPUs: 6 on Cedar, 16 on Graham.
#SBATCH --mem=32000M # Memory proportional to GPUs: 32000 Cedar, 64000 Graham.
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python/<select version> # Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch --no-index
python pytorch-test.py
Le script Python pytorch-ddp-test.py
a la forme suivante :
import torch
x = torch.Tensor(5, 3)
print(x)
y = torch.rand(5, 3)
print(y)
# let us run the following only if CUDA is available
if torch.cuda.is_available():
x = x.cuda()
y = y.cuda()
print(x + y)
Vous pouvez alors soumettre une tâche PyTorch avec
[name@server ~]$ sbatch pytorch-test.sh
Haute performance
TF32 : Performance vs précision
Avec sa version 1.7.0, PyTorch a ajouté le support pour le mode TensorFloat-32 (TF32) de Nvidia et est seulement disponible pour les architectures GPU d'Ampere et de Nvidia. Avec ce mode qui est offert par défaut dans les versions 1.7.x à 1.11.x, les opérations tensorielles se font jusqu'à 20x plus rapidement que les opérations équivalentes en simple précision (FP32). Cependant, ce gain en performance peut engendrer une baisse dans la précision du résultat des opérations, ce qui pose problème avec les modèles d'apprentissage profond qui utilisent à l'occasion des matrices mal conditionnées ou qui effectuent de longues séquences d'opérations tensorielles. Suite aux commentaires de la communauté des utilisateurs, TF32 est désactivé par défaut pour les multiplications matricielle et activé par défaut pour les convolutions à partir de la version 1.12.0.
En date d'octobre 2022, notre seule grappe qui offre des GPU Ampere est Narval. Quand vous utilisez PyTorch sur Narval,
- Vous pourriez remarquer un fort ralentissement dans l'exécution sur GPU du même code avec
torch < 1.12.0
ettorch >= 1.12.0
. - Vous pourriez obtenir des résultats différents dans l'exécution sur GPU du même code avec
torch < 1.12.0
ettorch >= 1.12.0
.
Pour activer ou désactiver TF32 pour torch >= 1.12.0
, donnez la valeur True
ou False
aux indicateurs suivants :
torch.backends.cuda.matmul.allow_tf32 = False # Enable/disable TF32 for matrix multiplications torch.backends.cudnn.allow_tf32 = False # Enable/disable TF32 for convolutions
Pour plus d'information, consultez ce paragraphe de la documentation PyTorch.
Travailler avec plusieurs CPU
Par défaut, PyTorch permet le parallélisme avec plusieurs CPU de deux façons :
- intra-op, par l’implémentation parallèle d’opérateurs souvent utilisés en apprentissage profond comme le produit matriciel ou le produit de convolution, en utilisant OpenMP directement ou avec des bibliothèques de bas niveau comme MKL et OneDNN. Quand du code PyTorch doit effectuer de telles opérations, elles utilisent automatiquement de multiples fils avec tous les cœurs CPU disponibles.
- inter-op, par la capacité d’exécuter différentes parties de code de manière concurrente. Ce mode de parallélisme nécessite habituellement que le programme soit conçu de manière à exécuter plusieurs parties en parallèle, par exemple en faisant usage du compilateur en temps réel torch.jit pour exécuter des tâches asynchrones dans un programme TorchScript.
Pour les petits modèles, nous recommandons fortement d’utiliser plusieurs CPU plutôt qu’un GPU. L’entraînement sera certainement plus rapide avec un GPU (sauf dans les cas de très petits modèles), mais si le modèle et le jeu de données ne sont pas assez grands, la vitesse gagnée avec le GPU ne sera probablement pas très importante et la tâche n’utilisera qu’une petite part de la capacité de calcul. Ce n’est peut-être pas grave sur votre propre ordinateur, mais dans un environnement partagé comme sur nos grappes, vous bloqueriez une ressource qui pourrait servir à effectuer de calculs de grande échelle par un autre projet. De plus, l’utilisation d’un GPU contribuerait à la diminution de l’allocation de votre groupe et aurait une incidence sur la priorité accordée aux tâches de vos collègues.
Dans le code suivant, il y a plusieurs occasions d’utiliser le parallélisme intra-op. En demandant plus de CPU et sans changer le code, on peut constater l’effet sur la performance.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
echo "starting training..."
time python cifar10-cpu.py
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
import os
parser = argparse.ArgumentParser(description='cifar10 classification models, cpu performance test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
torch.set_num_threads(int(os.environ['SLURM_CPUS_PER_TASK']))
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=args.lr)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
### This next line will attempt to download the CIFAR10 dataset from the internet if you don't already have it stored in ./data
### Run this line on a login node with "download=True" prior to submitting your job, or manually download the data from
### https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz and place it under ./data
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
Travailler avec un seul GPU
On entend souvent dire qu’il faut absolument entraîner un modèle avec un GPU s’il y en a un à notre disposition. Ceci est presque toujours vrai (l'entraînement de très petits modèles est souvent plus rapide avec un ou plusieurs CPU) sur un poste de travail local, mais ce n’est pas le cas sur nos grappes.
Autrement dit, vous ne devriez pas demander un GPU si votre code ne peut pas faire un usage raisonnable de sa capacité de calcul.
La performance avantageuse des GPU pour les tâches d’apprentissage profond provient de deux sources :
- La capacité de paralléliser l’exécution de certaines opérations clés, par exemple le multiplieur-accumulateur, sur plusieurs milliers de cœurs de calcul, en comparaison du très petit nombre de cœurs disponibles avec la plupart des CPU.
- Une bande passante de mémoire beaucoup plus grande que pour un CPU, ce qui permet aux GPU d’utiliser efficacement leur très grand nombre de cœurs pour traiter une plus grande quantité de données par cycle de calcul.
Comme c’est le cas avec plusieurs CPU, PyTorch offre des implémentations parallèles d’opérateurs souvent utilisés en apprentissage profond, comme le produit matriciel et le produit de convolution et utilise des bibliothèques spécialisées pour les GPU comme CUDNN ou MIOpen, selon la plateforme matérielle. Ceci signifie que pour qu’il vaille la peine d’utiliser un GPU pour une tâche d’apprentissage, elle doit être composée d’éléments qui peuvent être élargis à une application massive du parallélisme de par le nombre d’opérations pouvant être parallélisées, de par la quantité des données à traiter ou idéalement de par les deux. Un exemple concret serait un grand modèle qui a un grand nombre d’unités et de couches ou qui a beaucoup de données en entrée, et idéalement qui présente ces deux caractéristiques.
Dans l’exemple ci-dessous, nous adaptons le code de la section précédente pour utiliser un GPU et nous examinons la performance. Nous observons que deux paramètres jouent un rôle important : batch_size
et num_workers
. Le premier paramètre améliore la performance en augmentant la taille des entrées à chaque itération et en utilisant mieux la capacité du GPU. Dans le cas du second paramètre, la performance est améliorée en facilitant le mouvement des données entre la mémoire de l’hôte (le CPU) et la mémoire du GPU, ce qui réduit la durée d’inactivité du GPU en attente de données à traiter.
Nous pouvons tirer deux conclusions :
- Augmenter la valeur de
batch_size
au maximum qu’il est possible pour la mémoire du GPU optimise la performance. - Utiliser un
DataLoader
avec autant de workers quecpus-per-task
facilite l’apport de données au GPU.
Bien entendu, le paramètre batch_size
a aussi un impact sur la performance d’un modèle dans une tâche(c.à.d. l’exactitude, l’erreur, etc.) et il existe différentes écoles de pensée sur l’utilisation de grands lots. Nous n’abordons pas le sujet ici, mais si vous croyez qu’un petit lot conviendrait mieux à votre application, allez à la section Travailler avec un seul GPU pour savoir comment maximiser l’utilisation du GPU avec de petites entrées de données.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:1 # request a GPU
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
echo "starting training..."
time python cifar10-gpu.py --batch_size=512 --num_workers=0
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, single gpu performance test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net().cuda() # Load model on the GPU
criterion = nn.CrossEntropyLoss().cuda() # Load the loss function on the GPU
optimizer = optim.SGD(net.parameters(), lr=args.lr)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
Parallélisme des données avec un seul GPU
Il n’est pas conseillé d’utiliser un GPU avec un modèle de taille relativement petite qui n’utilise pas une grande part de la mémoire du GPU et une part raisonnable de sa capacité de calcul; utilisez plutôt un ou plusieurs CPU. Par contre, profiter du parallélisme du GPU devient une bonne option si vous avez un tel modèle avec un très grand jeu de données et que vous voulez effectuer l’entraînement avec des lots de petite taille.
Dans ce contexte, la parallélisation des données réfère à des méthodes pour entraîner en parallèle plusieurs copies d’un modèle où chaque copie reçoit un morceau des données d’entraînement à chaque itération. À la fin d’une itération, les gradients sont agrégés et les paramètres de chaque copie sont mis à jour de façon synchrone ou asynchrone, dépendant de la méthode. Cette approche peut augmenter la vitesse d’exécution de façon importante avec une itération qui se fait environ N fois plus rapidement avec un grand jeu de données, N étant le nombre de copies du modèle. Pour utiliser cette approche, un avertissement s’impose : pour que le modèle entraîné soit équivalent au même modèle entraîné sans parallélisme, vous devez adapter le taux d’apprentissage ou la taille du lot désirée en fonction du nombre de copies. Pour plus d’information, voyez ces échanges.
PyTorch offre des implémentations de méthodes de parallélisme des données, la classe DistributedDataParallel
étant celle recommandée par les développeurs de PyTorch pour donner la meilleure performance. Conçue pour le travail avec plusieurs GPU, elle peut aussi être employée avec un seul GPU.
Dans l’exemple ci-dessous, nous adaptons le code pour un seul GPU pour utiliser le parallélisme des données. La tâche est relativement petite; la taille du lot est de 512 images, le modèle occupe environ 1Go de la mémoire du GPU et l’entraînement n’utilise qu’environ 6 % de sa capacité de calcul. Ce modèle ne devrait pas être entraîné sur un GPU sur nos grappes. Cependant, en parallélisant les données, un GPU V100 avec 16Go de mémoire peut contenir 14 ou 15 copies du modèle et augmenter l'utilisation de la ressource en plus d’obtenir une bonne augmentation de vitesse. Nous utilisons Multi-Process Service (MPS) de NVIDIA avec MPI pour placer plusieurs copies du modèle sur un GPU de façon efficace.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:1 # request a GPU
#SBATCH --tasks-per-node=8 # This is the number of model replicas we will place on the GPU. Change this to 10,12,14,... to see the effect on performance
#SBATCH --cpus-per-task=1 # increase this parameter and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
# Activate Nvidia MPS:
export CUDA_MPS_PIPE_DIRECTORY=/tmp/nvidia-mps
export CUDA_MPS_LOG_DIRECTORY=/tmp/nvidia-log
nvidia-cuda-mps-control -d
echo "starting training..."
time srun --cpus-per-task=$SLURM_CPUS_PER_TASK python cifar10-gpu-mps.py --batch_size=512 --num_workers=0
import os
import time
import datetime
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel maps test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
def main():
print("Starting...")
args = parser.parse_args()
rank = os.environ.get("SLURM_LOCALID")
current_device = 0
torch.cuda.set_device(current_device)
""" this block initializes a process group and initiate communications
between all processes that will run a model replica """
print('From Rank: {}, ==> Initializing Process Group...'.format(rank))
dist.init_process_group(backend="mpi", init_method=args.init_method) # Use backend="mpi" or "gloo". NCCL does not work on a single GPU due to a hard-coded multi-GPU topology check.
print("process group ready!")
print('From Rank: {}, ==> Making model..'.format(rank))
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
net.cuda()
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device]) # Wrap the model with DistributedDataParallel
criterion = nn.CrossEntropyLoss().cuda()
optimizer = optim.SGD(net.parameters(), lr=args.lr)
print('From Rank: {}, ==> Preparing data..'.format(rank))
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='~/data', train=True, download=False, transform=transform_train)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
Travailler avec plusieurs GPU
Problème avec DistributedDataParallel et PyTorch 1.10
Avec notre wheel PyTorch 1.10 torch-1.10.0+computecanada
, le code pour travailler avec plusieurs GPU et qui utilise
DistributedDataParallel pourrait échouer de façon imprévisible si le backend est défini comme étant 'nccl'
ou 'gloo'
. Nous vous recommandons d'utiliser la version la plus récente de PyTorch plutôt que la version 1.10 sur toutes les grappes d'usage général.
Paralléliser les données avec plusieurs GPU
Dans ce contexte, la parallélisation des données réfère à des méthodes pour entraîner en parallèle plusieurs copies d’un modèle où chaque copie reçoit une portion des données d’entraînement à chaque itération. À la fin d’une itération, les gradients sont agrégés et les paramètres de chaque copie sont mis à jour de façon synchrone ou asynchrone, dépendant de la méthode. Cette approche peut augmenter la vitesse d’exécution de façon importante avec une itération qui se fait environ N fois plus rapidement avec un grand jeu de données, N étant le nombre de copies du modèle. Pour utiliser cette approche, un avertissement s’impose : pour que le modèle entraîné soit équivalent au même modèle entraîné sans parallélisme, vous devez adapter le taux d’apprentissage ou la taille du lot désirée en fonction du nombre de copies. Pour plus d’information, voyez ces échanges. Quand plusieurs GPU sont utilisés, chacun reçoit une copie du modèle; il doit donc être assez petit pour être contenu dans la mémoire d’un GPU. Pour entraîner un modèle qui dépasse la quantité de mémoire d’un GPU, voyez la section Paralléliser un modèle avec plusieurs GPU.
Il existe plusieurs manières de paralléliser les données avec PyTorch. Nous présentons ici des tutoriels avec la classe DistributedDataParallel, avec le paquet PyTorch Lightning et avec le paquet Horovod.
DistributedDataParallel
Avec plusieurs GPU, la classe DistributedDataParallel est recommandée par les développeurs PyTorch, que ce soit avec un nœud unique ou avec plusieurs nœuds. Dans le cas qui suit, plusieurs GPU sont distribués sur deux nœuds.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
srun -N $SLURM_NNODES -n $SLURM_NNODES bash << EOF
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision --no-index
EOF
export TORCH_NCCL_ASYNC_HANDLING=1
export MASTER_ADDR=$(hostname) #Store the master node’s IP address in the MASTER_ADDR environment variable.
echo "r$SLURM_NODEID master: $MASTER_ADDR"
echo "r$SLURM_NODEID Launching python script"
# The $((SLURM_NTASKS_PER_NODE * SLURM_JOB_NUM_NODES)) variable tells the script how many processes are available for this execution. “srun” executes the script <tasks-per-node * nodes> times
source $SLURM_TMPDIR/env/bin/activate
srun python pytorch-ddp-test.py --init_method tcp://$MASTER_ADDR:3456 --world_size $((SLURM_NTASKS_PER_NODE * SLURM_JOB_NUM_NODES)) --batch_size 256
Le script Python pytorch-ddp-test.py
a la forme suivante :
import os
import time
import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
parser.add_argument('--dist-backend', default='gloo', type=str, help='')
parser.add_argument('--world_size', default=1, type=int, help='')
parser.add_argument('--distributed', action='store_true', help='')
def main():
print("Starting...")
args = parser.parse_args()
ngpus_per_node = torch.cuda.device_count()
""" This next line is the key to getting DistributedDataParallel working on SLURM:
SLURM_NODEID is 0 or 1 in this example, SLURM_LOCALID is the id of the
current process inside a node and is also 0 or 1 in this example."""
local_rank = int(os.environ.get("SLURM_LOCALID"))
rank = int(os.environ.get("SLURM_NODEID"))*ngpus_per_node + local_rank
current_device = local_rank
torch.cuda.set_device(current_device)
""" this block initializes a process group and initiate communications
between all processes running on all nodes """
print('From Rank: {}, ==> Initializing Process Group...'.format(rank))
#init the process group
dist.init_process_group(backend=args.dist_backend, init_method=args.init_method, world_size=args.world_size, rank=rank)
print("process group ready!")
print('From Rank: {}, ==> Making model..'.format(rank))
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
net.cuda()
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device])
print('From Rank: {}, ==> Preparing data..'.format(rank))
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
criterion = nn.CrossEntropyLoss().cuda()
optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4)
for epoch in range(args.max_epochs):
train_sampler.set_epoch(epoch)
train(epoch, net, criterion, optimizer, train_loader, rank)
def train(epoch, net, criterion, optimizer, train_loader, train_rank):
train_loss = 0
correct = 0
total = 0
epoch_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
acc = 100 * correct / total
batch_time = time.time() - start
elapse_time = time.time() - epoch_start
elapse_time = datetime.timedelta(seconds=elapse_time)
print("From Rank: {}, Training time {}".format(train_rank, elapse_time))
if __name__=='__main__':
main()
PyTorch Lightning
Ce paquet fournit des interfaces à PyTorch afin de simplifier plusieurs tâches communes exigeant beaucoup de code; ceci inclut les tâches d'entraînement de modèles avec plusieurs GPU. Dans le tutoriel suivant pour PyTorch Lightning, nous reprenons le même exemple que ci-dessus, mais sans avoir explicitement recours à la classe DistributedDataParallel.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning --no-index
export TORCH_NCCL_ASYNC_HANDLING=1
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python pytorch-ddp-test-pl.py --batch_size 256
import datetime
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, pytorch-lightning parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
print("Starting...")
args = parser.parse_args()
class Net(pl.LightningModule):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
def training_step(self, batch, batch_idx):
x, y = batch
y_hat = self(x)
loss = F.cross_entropy(y_hat, y)
return loss
def configure_optimizers(self):
return torch.optim.Adam(self.parameters(), lr=args.lr)
net = Net()
""" Here we initialize a Trainer() explicitly with 1 node and 2 GPUs per node.
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
which can cause issues due to updating logs too frequently."""
trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy='ddp', max_epochs = args.max_epochs, enable_progress_bar=False)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
trainer.fit(net,train_loader)
if __name__=='__main__':
main()
Horovod
Horovod est une plateforme d'entraînement distribué pour l'apprentissage profond, compatible avec TensorFlow, Keras, PyTorch et Apache MXNet. Son API vous permet d'avoir le même niveau de contrôle sur votre code d'entraînement qu'avec DistributedDataParallel
, mais simplifie l'écriture de vos scripts en éliminant le besoin de configurer directement les groupes de processus et en prenant en charge les variables d'environnement de l'ordonnanceur. Horovod offre aussi des optimiseurs distribués qui dans certains cas améliorent la performance. L'exemple suivant est le même que le précédent, cette fois avec Horovod.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision horovod --no-index
export TORCH_NCCL_ASYNC_HANDLING=1
srun python pytorch_horovod.py --batch_size 256
import os
import time
import datetime
import numpy as np
import horovod.torch as hvd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, horovod test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--max_epochs', type=int, default=1, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
hvd.init()
print("Starting...")
local_rank = hvd.local_rank()
global_rank = hvd.rank()
torch.cuda.set_device(local_rank)
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
net = Net()
net.cuda()
print('From Rank: {}, ==> Preparing data..'.format(global_rank))
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train, num_replicas=hvd.size(),rank=global_rank)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
criterion = nn.CrossEntropyLoss().cuda()
optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4)
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=net.named_parameters())
hvd.broadcast_parameters(net.state_dict(), root_rank=0)
for epoch in range(args.max_epochs):
train_sampler.set_epoch(epoch)
train(args,epoch, net, criterion, optimizer, train_loader, global_rank)
def train(args,epoch, net, criterion, optimizer, train_loader, train_rank):
train_loss = 0
correct = 0
total = 0
epoch_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda()
targets = targets.cuda()
outputs = net(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
acc = 100 * correct / total
batch_time = time.time() - start
elapse_time = time.time() - epoch_start
elapse_time = datetime.timedelta(seconds=elapse_time)
print("From Rank: {}, Training time {}".format(train_rank, elapse_time))
if __name__=='__main__':
main()
Paralléliser un modèle avec plusieurs GPU
Quand un modèle est trop grand pour être contenu dans un seul GPU, vous pouvez le diviser en portions et charger chacune sur des GPU distincts. Dans l’exemple suivant, nous reprenons le code des sections précédentes pour illustrer le fonctionnement. Nous divisons un réseau de neurones convolutifs en deux : d’une part les couches convolutionnelles/de regroupement et d'autre part les couches acycliques entièrement connectées. La tâche demande deux GPU, chacun d’eux étant chargé avec une des parts. De plus, nous ajoutons du code pour paralléliser les pipelines et ainsi réduire au maximum le temps pendant lequel le deuxième GPU est inactif dans l’attente des résultats du premier. Pour ce faire, nous créons un nn.Module
distinct pour chacune des portions du modèle, puis créons une séquence de modules en enveloppant les portions avec nn.Sequential
, et ensuite utilisons torch.distributed.pipeline.sync.Pipe
pour morceler chacun des lots en entrée et les passer en parallèle aux deux portions du modèle.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # request 2 GPUs
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G
#SBATCH --time=0:10:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
# This is needed to initialize pytorch's RPC module, required for the Pipe class which we'll use for Pipeline Parallelism
export MASTER_ADDR=$(hostname)
export MASTER_PORT=34567
echo "starting training..."
time python pytorch-modelpar-pipelined-rpc.py --batch_size=512 --num_workers=0
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributed.pipeline.sync import Pipe
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, single node model parallelism test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
args = parser.parse_args()
# Convolutional + pooling part of the model
class ConvPart(nn.Module):
def __init__(self):
super(ConvPart, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
return x
# Dense feedforward part of the model
class MLPPart(nn.Module):
def __init__(self):
super(MLPPart, self).__init__()
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) # initializing RPC is required by Pipe we use below
part1 = ConvPart().to('cuda:0') # Load part1 on the first GPU
part2 = MLPPart().to('cuda:1') # Load part2 on the second GPU
net = nn.Sequential(part1,part2) # Pipe requires all modules be wrapped with nn.Sequential()
net = Pipe(net, chunks=32) # Wrap with Pipe to perform Pipeline Parallelism
criterion = nn.CrossEntropyLoss().to('cuda:1') # Load the loss function on the last GPU
optimizer = optim.SGD(net.parameters(), lr=args.lr)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
perf = []
total_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.to('cuda:0')
targets = targets.to('cuda:1')
# Models wrapped with Pipe() return a RRef object. Since the example is single node, all values are local to the node and we can grab them
outputs = net(inputs).local_value()
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"Loss: {loss.item()}")
batch_time = time.time() - start
images_per_sec = args.batch_size/batch_time
perf.append(images_per_sec)
total_time = time.time() - total_start
if __name__=='__main__':
main()
Paralléliser modèle et données avec plusieurs GPU
Quand un modèle est trop grand pour être contenu dans un seul GPU et que son entraînement doit se faire avec un très grand ensemble de données, le fait de combiner le parallélisme du modèle et celui des données permet d’obtenir une bonne performance. Le principe est simple : le modèle est divisé en portions chacune attribuée à un GPU; le parallélisme des pipelines est fait avec les résultats; puis des copies du processus sont faites et les copies du modèle sont entraînées en parallèle avec des sous-ensembles distincts de l’ensemble de données d’entraînement. Comme décrit ci-dessus, les gradients sont calculés indépendamment dans chacune des copies et agrégés pour modifier toutes les copies de façon synchrone ou asynchrone, dépendant de la méthode. La différence principale ici est que chaque copie du modèle se trouve sur plus d’un GPU.
Utiliser Torch RPC et DDP
Toujours avec le même exemple, nous combinons maintenant Torch RPC et DistributedDataParallel pour séparer le modèle en deux portions et entraîner quatre copies du modèle distribuées en parallèle sur deux nœuds. Autrement dit, nous avons deux copies sur deux GPU de chaque nœud. Cependant, un avertissement s’impose : à ce jour, Torch RPC prend en charge la division d’un modèle dans un seul nœud. Pour entraîner un modèle qui dépasse la quantité de mémoire de tous les GPU dans un nœud de calcul, voyez la section DeepSpeed.
#!/bin/bash
#SBATCH --nodes 2
#SBATCH --gres=gpu:4 # Request 4 GPUs per node
#SBATCH --tasks-per-node=2 # Request one task per MODEL per node
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=16G
#SBATCH --time=0:10:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load StdEnv/2020 gcc/11.3.0
module load python # Using Default Python version - Make sure to choose a version that suits your application, python/3.10.2 works with this demo
module load cuda/11.8.0
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
export MAIN_NODE=$(hostname)
echo "starting training..."
srun python pytorch-model-data-par.py --init_method tcp://$MAIN_NODE:3456 --world_size $SLURM_NTASKS --batch_size 512
import time
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributed.pipeline.sync import Pipe
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data & model parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
parser.add_argument('--dist-backend', default='mpi', type=str, help='')
parser.add_argument('--world_size', default=1, type=int, help='')
parser.add_argument('--distributed', action='store_true', help='')
def main():
args = parser.parse_args()
# Convolutional + pooling part of the model
class ConvPart(nn.Module):
def __init__(self):
super(ConvPart, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
return x
# Dense feedforward part of the model
class MLPPart(nn.Module):
def __init__(self):
super(MLPPart, self).__init__()
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
ngpus_per_node = torch.cuda.device_count()
local_rank = int(os.environ.get("SLURM_LOCALID"))
rank = int(os.environ.get("SLURM_NODEID"))*(ngpus_per_node//2) + local_rank # Divide ngpus_per_node by the number of model parts
os.environ['MASTER_ADDR'] = '127.0.0.1' # Each model replica will run its own RPC server to run pipeline parallelism
os.environ['MASTER_PORT'] = str(34567 + local_rank) # Make sure each RPC server starts on a different port
torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) # Different replicas won't communicate through RPC, but through DDP
dist.init_process_group(backend=args.dist_backend, init_method=args.init_method, world_size=args.world_size, rank=rank) # Initialize Data Parallelism communications
part1 = ConvPart().cuda(local_rank) # First part of the model goes on the first GPU of each process
part2 = MLPPart().cuda(local_rank + 1) # Second part goes on the second GPU of each process
net = nn.Sequential(part1,part2)
net = Pipe(net, chunks=32, checkpoint="never")
net = torch.nn.parallel.DistributedDataParallel(net)
criterion = nn.CrossEntropyLoss().cuda(local_rank + 1) # Loss function goes on the second GPU of each process
optimizer = optim.SGD(net.parameters(), lr=args.lr)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
for epoch in range(args.max_epochs):
train_sampler.set_epoch(epoch)
train(epoch, net, criterion, optimizer, train_loader, rank, local_rank)
def train(epoch, net, criterion, optimizer, train_loader, train_rank, model_rank):
train_loss = 0
correct = 0
total = 0
epoch_start = time.time()
for batch_idx, (inputs, targets) in enumerate(train_loader):
start = time.time()
inputs = inputs.cuda(model_rank)
targets = targets.cuda(model_rank + 1)
outputs = net(inputs).local_value()
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"From Rank {train_rank} - Loss: {loss.item()}")
batch_time = time.time() - start
if __name__=='__main__':
main()
DeepSpeed
DeepSpeed est une bibliothèque qui permet d'optimiser l’entraînement de l’apprentissage de modèles ayant des milliards de paramètres à l’échelle. Parfaitement compatible avec PyTorch, DeepSpeed offre des implémentations de nouvelles méthodes d’entraînement distribué qui font un usage efficace de la mémoire en appliquant le concept de Zero Redundancy Optimizer (ZeRO). Avec ZeRO, DeepSpeed peut distribuer le stockage et le traitement des divers éléments d’une tâche d’entraînement (états de l’optimiseur, poids du modèle, gradients et activations) sur plusieurs dispositifs comme les GPU, les CPU, les disques durs locaux et/ou les combinaisons de ces dispositifs. Cette mise en commun des ressources, surtout les ressources de stockage, permet l’entraînement efficace sur plusieurs nœuds de modèles ayant d’énormes quantités de paramètres sans avoir besoin d’écrire le code pour paralléliser le modèle, les pipelines ou les données. Les exemples qui suivent montrent comment profiter de DeepSpeed et des différentes implémentations de ZeRO en utilisant l’interface simple de PyTorch Lightning.
ZeRO avec GPU
Dans l’exemple ci-dessous, nous utilisons ZeRO stage 3 pour entraîner un modèle qui utilise un groupe de 4 GPU. Le stage 3 répartit sur les 4 GPU les trois caractéristiques, soit les états de l’optimiseur, les paramètres du modèle et les gradients du modèle. Ceci est plus efficace que le parallélisme pur des données où une copie complète du modèle est chargée sur chacun des GPU. Avec l’optimiseur DeepSpeed FusedAdam
plutôt qu’un optimiseur natif de PyTorch, la performance se compare au parallélisme pur des données. Les optimiseurs DeepSpeed sont compilés en temps réel à l’exécution et vous devez charger un module cuda/<version>
où version correspond à la version utilisée pour construire le paquet PyTorch que vous utilisez.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python cuda # CUDA must be loaded if using a DeepSpeed optimizer
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index
export TORCH_NCCL_ASYNC_HANDLING=1
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python deepspeed-stage3.py --batch_size 256
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from deepspeed.ops.adam import FusedAdam
from pytorch_lightning.strategies import DeepSpeedStrategy
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models deep seed stage 3 test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
print("Starting...")
args = parser.parse_args()
class ConvPart(nn.Module):
def __init__(self):
super(ConvPart, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
return x
# Dense feedforward part of the model
class MLPPart(nn.Module):
def __init__(self):
super(MLPPart, self).__init__()
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
class Net(pl.LightningModule):
def __init__(self):
super(Net, self).__init__()
self.conv_part = ConvPart()
self.mlp_part = MLPPart()
def configure_sharded_model(self):
self.block = nn.Sequential(self.conv_part, self.mlp_part)
def forward(self, x):
x = self.block(x)
return x
def training_step(self, batch, batch_idx):
x, y = batch
y_hat = self(x)
loss = F.cross_entropy(y_hat, y)
return loss
def configure_optimizers(self):
return FusedAdam(self.parameters())
net = Net()
""" Here we initialize a Trainer() explicitly with 1 node and 2 GPU.
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
which can cause issues due to updating logs too frequently."""
trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy="deepspeed_stage_3", max_epochs = args.max_epochs)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
trainer.fit(net,train_loader)
if __name__=='__main__':
main()
ZeRO avec CPU
Dans cet autre exemple, nous utilisons aussi ZeRO stage 3, mais cette fois-ci nous utilisons le CPU pour les états de l’optimiseur et les paramètres du modèle. Ceci signifie que la mémoire du nœud de calcul sera disponible pour stocker ces tenseurs quand ils ne sont pas requis par les calculs effectués par le GPU et de plus, les pas de l’optimiseur seront calculés sur le CPU. Pour des raisons pratiques, ce serait comme ajouter 32Go de mémoire additionnelle à la mémoire du GPU. Comme la mémoire du GPU est moins sollicitée, vous pouvez par exemple augmenter la taille de vos lots ou de votre modèle. Avec l’optimiseur DeepSpeed DeepSpeedCPUAdam
plutôt qu’un optimiseur natif de PyTorch, la performance se compare au parallélisme pur des données. Les optimiseurs DeepSpeed sont compilés en temps réel à l’exécution et vous devez charger un module cuda/<version>
où version correspond à la version utilisée pour construire le paquet PyTorch que vous utilisez.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python cuda # CUDA must be loaded if using ZeRO offloading to CPU or NVMe. Version must be the same used to compile PyTorch.
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index
export TORCH_NCCL_ASYNC_HANDLING=1
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python deepspeed-stage3-offload-cpu.py --batch_size 256
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from deepspeed.ops.adam import DeepSpeedCPUAdam
from pytorch_lightning.strategies import DeepSpeedStrategy
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, deepspeed offload to cpu test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
print("Starting...")
args = parser.parse_args()
class ConvPart(nn.Module):
def __init__(self):
super(ConvPart, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
return x
# Dense feedforward part of the model
class MLPPart(nn.Module):
def __init__(self):
super(MLPPart, self).__init__()
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
class Net(pl.LightningModule):
def __init__(self):
super(Net, self).__init__()
self.conv_part = ConvPart()
self.mlp_part = MLPPart()
def configure_sharded_model(self):
self.block = nn.Sequential(self.conv_part, self.mlp_part)
def forward(self, x):
x = self.block(x)
return x
def training_step(self, batch, batch_idx):
x, y = batch
y_hat = self(x)
loss = F.cross_entropy(y_hat, y)
return loss
def configure_optimizers(self):
return DeepSpeedCPUAdam(self.parameters())
net = Net()
""" Here we initialize a Trainer() explicitly with 1 node and 2 GPU.
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
which can cause issues due to updating logs too frequently."""
trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy=DeepSpeedStrategy(
stage=3,
offload_optimizer=True,
offload_parameters=True,
), max_epochs = args.max_epochs)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
trainer.fit(net,train_loader)
if __name__=='__main__':
main()
ZeRO avec utilisation de disques NVMe
ZeRO stage 3 nous sert encore, cette fois-ci en utilisant le CPU pour les états de l’optimiseur et les paramètres du modèle. Ceci signifie que l’espace de stockage local du nœud de calcul sera disponible pour stocker ces tenseurs quand ils ne sont pas requis par les calculs effectués par le GPU. Ici encore, les pas de l’optimiseur seront calculés sur le CPU. De même, pour des raisons pratiques, ce serait comme ajouter à la mémoire du GPU autant d’espace de stockage que sur le disque local, mais par contre nous aurons une forte perte de performance. Cette approche peut être utilisée avec tous les types de stockage, mais elle est à privilégier avec les disques NVMe qui sont plus rapides et ont des temps de réponse plus courts, ce qui compense pour la baisse de performance.
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2 # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python cuda # CUDA must be loaded if using ZeRO offloading to CPU or NVMe. Version must be the same used to compile PyTorch.
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index
export TORCH_NCCL_ASYNC_HANDLING=1
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python deepspeed-stage3-offload-nvme.py --batch_size 256
import os
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from deepspeed.ops.adam import DeepSpeedCPUAdam
from pytorch_lightning.strategies import DeepSpeedStrategy
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, deepspeed offload to nvme test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
print("Starting...")
args = parser.parse_args()
class ConvPart(nn.Module):
def __init__(self):
super(ConvPart, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.relu = nn.ReLU()
def forward(self, x):
x = self.pool(self.relu(self.conv1(x)))
x = self.pool(self.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
return x
# Dense feedforward part of the model
class MLPPart(nn.Module):
def __init__(self):
super(MLPPart, self).__init__()
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
self.relu = nn.ReLU()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return x
class Net(pl.LightningModule):
def __init__(self):
super(Net, self).__init__()
self.conv_part = ConvPart()
self.mlp_part = MLPPart()
def configure_sharded_model(self):
self.block = nn.Sequential(self.conv_part, self.mlp_part)
def forward(self, x):
x = self.block(x)
return x
def training_step(self, batch, batch_idx):
x, y = batch
y_hat = self(x)
loss = F.cross_entropy(y_hat, y)
return loss
def configure_optimizers(self):
return DeepSpeedCPUAdam(self.parameters())
net = Net()
""" Here we initialize a Trainer() explicitly with 1 node and 2 GPU.
To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
which can cause issues due to updating logs too frequently."""
local_scratch = os.environ['SLURM_TMPDIR'] # Get path where local storage is mounted
print(f'Offloading to: {local_scratch}')
trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy=DeepSpeedStrategy(
stage=3,
offload_optimizer=True,
offload_parameters=True,
remote_device="nvme",
offload_params_device="nvme",
offload_optimizer_device="nvme",
nvme_path="local_scratch",
), max_epochs = args.max_epochs)
transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
trainer.fit(net,train_loader)
if __name__=='__main__':
main()
Créer des points de contrôle
Peu importe si vous pensez que la durée d'exécution de votre code sera longue ou non, il est bon de prendre l'habitude de créer des points de contrôle pendant l'entraînement. Un point de contrôle est un portrait de votre modèle à un moment précis du processus d'entraînement (après un certain nombre d'itérations ou un certain nombre d'époques) que vous pouvez sauvegarder sur disque et utiliser plus tard. C'est un moyen pratique de diviser les tâches qui devraient être de longue durée en de multiples petites tâches auxquelles l'ordonnanceur peut allouer des ressources plus rapidement. C'est aussi une bonne façon de ne pas perdre le progrès réalisé au cas où des erreurs de code inattendues surviendraient ou que les nœuds ne soient pas disponibles pour quelconque raison.
Avec PyTorch Lightning
Nous recommandons d'utiliser le paramètre de rappels (callbacks parameter) de la classe Trainer()
. Dans l'exemple suivant, on demande à PyTorch de créer un point de contrôle à la fin de chacune des époques d'entraînement. Vérifiez que le chemin où créer le point de contrôle existe.
callbacks = [pl.callbacks.ModelCheckpoint(dirpath="./ckpt",every_n_epochs=1)] trainer = pl.Trainer(callbacks=callbacks) trainer.fit(model)
Ce bout de code chargera un point de contrôle de ./ckpt
(s'il en existe) et poursuivra l'entraînement à partir de ce point. Pour plus d'information, consultez la documentation PyTorch Lightning.
Avec des boucles d'entraînement personnalisées
Pour des exemples, consultez la documentation PyTorch.
Pendant l’entraînement distribué
Les points de contrôle peuvent être utilisés pendant l’exécution d’un programme d’entraînement distribué. Avec PyTorch Lightning, aucun code supplémentaire n’est requis, autre que d’insérer le paramètre de rappels (callbacks parameter) mentionné ci-dessus. Cependant, si vous utilisez DistributedDataParallel ou Horovod, les points de contrôle devront être créés par un seul processus (rank) de votre programme puisque tous les processus auront le même état après chaque itération. Dans cet exemple, le premier processus (rank 0) crée un point de contrôle.
if global_rank == 0: torch.save(ddp_model.state_dict(), "./checkpoint_path")
Faites attention aux points de contrôle ainsi créés. Si un processus tente de charger un point de contrôle qui n’a pas encore été sauvegardé par un autre, des erreurs peuvent survenir ou de mauvais résultats peuvent être produits. Pour éviter ceci, vous pouvez ajouter une barrière à votre code pour faire en sorte que le processus qui crée le point de contrôle a terminé son écriture sur le disque avant que d’autres processus tentent de le charger. Remarquez aussi que torch.load
essaiera par défaut de charger les tenseurs sur le GPU sur lequel ils étaient initialement sauvegardés, dans notre cas cuda:0
. Pour éviter les problèmes, passez map_location
à torch.load
pour charger les tenseurs sur le GPU identifié par chaque processus.
torch.distributed.barrier() map_location = f"cuda:{local_rank}" ddp_model.load_state_dict( torch.load("./checkpoint_path", map_location=map_location))
Dépannage
Fuites de mémoire
Sur le matériel AVX512 (nœuds V100, Skylake ou Béluga), les versions PyTorch antérieures à v1.0.1 qui utilisent des bibliothèques moins récentes (cuDNN < v7.5 ou MAGMA < v2.5) peuvent avoir des fuites de mémoire importantes et créer des exceptions de mémoire insuffisante et terminer vos tâches. Pour contrer ceci, utilisez la plus récente version de torch
.
c10::Error
Dans certains cas, vous pouvez obtenir un erreur comme
terminate called after throwing an instance of 'c10::Error' what(): Given groups=1, weight of size [256, 1, 3, 3], expected input[16, 10, 16, 16] to have 1 channels, but got 10 channels instead Exception raised from check_shape_forward at /tmp/coulombc/pytorch_build_2021-11-09_14-57-01/avx2/python3.8/pytorch/aten/src/ATen/native/Convolution.cpp:496 (most recent call first): ...
Une exception C++ est émise plutôt qu'une exception Python. Ceci peut se produire quand vous programmez en C++ avec libtorch, mais ne devrait pas se produire quand vous programmez en Python. Il n'est pas possible de suivre la trace des appels (traceback)du programme Python, ce qui ne permet pas d'identifier facilement la cause de l'erreur dans le script Python. Nous avons constaté que le fait d'utiliser PyTorch 1.9.1 plutôt que 1.10.x permet le traceback du programme Python.
LibTorch
LibTorch permet d'implémenter à PyTorch des extensions C++ et des applications d'apprentissage machine en C++ pur. La distribution LibTorch possède les en-têtes, bibliothèques et fichiers de configuration CMake nécessaires pour travailler avec PyTorch, tel que décrit dans la documentation.
Utiliser LibTorch
Configurer l'environnement
Chargez les modules requis par LibTorch, puis installez PyTorch dans un environnement virtuel Python.
module load StdEnv/2023 gcc cuda/12.2 cmake protobuf cudnn python/3.11 abseil cusparselt opencv/4.8.1 virtualenv --no-download --clear ~/ENV && source ~/ENV/bin/activate pip install --no-index torch numpy
Vous devrez peut-être ajuster les versions des modules abseil, cusparselt et opencv, dépendant du paquet torch que vous utilisez. Pour savoir quelle version d'un module a été utilisée pour compiler le wheel Python, lancez la commande
$ ldd $VIRTUAL_ENV/lib/python3.11/site-packages/torch/lib/libtorch_cuda.so | sed -n 's&^.*/\(\(opencv\|abseil\|cusparselt\)/[^/]*\).*&\1&p' | sort -u
abseil/20230125.3
cusparselt/0.5.0.1
opencv/4.8.1
module load gcc cuda/11.4 cmake protobuf cudnn python/3.10 virtualenv --no-download --clear ~/ENV && source ~/ENV/bin/activate pip install --no-index torch numpy
Compiler un exemple simple
Créez les deux fichiers suivants :
#include <torch/torch.h>
#include <iostream>
int main()
{
torch::Device device(torch::kCPU);
if (torch::cuda::is_available())
{
std::cout << "CUDA is available! Using GPU." << std::endl;
device = torch::Device(torch::kCUDA);
}
torch::Tensor tensor = torch::rand({2, 3}).to(device);
std::cout << tensor << std::endl;
}
cmake_minimum_required(VERSION 3.0 FATAL_ERROR)
project(example)
find_package(Torch REQUIRED)
add_executable(example example.cpp)
target_link_libraries(example "${TORCH_LIBRARIES}")
set_property(TARGET example PROPERTY CXX_STANDARD 14)
Activez l'environnement virtuel Python, configurez le projet et compilez le programme.
cmake -B build -S . -DCMAKE_PREFIX_PATH=$VIRTUAL_ENV/lib/python3.11/site-packages \ -DCMAKE_EXE_LINKER_FLAGS=-Wl,-rpath=$VIRTUAL_ENV/lib/python3.11/site-packages/torch/lib,-L$EBROOTCUDA/extras/CUPTI/lib64 \ -DCMAKE_SKIP_RPATH=ON -DTORCH_CUDA_ARCH_LIST="6.0;7.0;7.5;8.0;9.0" cmake --build build
cmake -B build -S . -DCMAKE_PREFIX_PATH=$VIRTUAL_ENV/lib/python3.10/site-packages \ -DCMAKE_EXE_LINKER_FLAGS=-Wl,-rpath=$VIRTUAL_ENV/lib/python3.10/site-packages/torch/lib \ -DCMAKE_SKIP_RPATH=ON cmake --build build
Lancez le programme avec
build/example
Pour tester une application avec CUDA, demandez une tâche interactive avec GPU.