PyTorch/fr: Difference between revisions

From Alliance Doc
Jump to navigation Jump to search
No edit summary
No edit summary
Tags: Mobile edit Mobile web edit
 
(281 intermediate revisions by 4 users not shown)
Line 1: Line 1:
<languages />
<languages />
[[Category:Software]]
[[Category:Software]][[Category:AI and Machine Learning]]
[http://pytorch.org/ PyTorch] est un paquet Python qui offre deux fonctionnalités de haut niveau&nbsp;:  
[http://pytorch.org/ PyTorch] est un paquet Python qui offre deux fonctionnalités de haut niveau&nbsp;:  
*le calcul tensoriel (semblable à celui effectué par NumPy) avec grande accélération de GPU,
*le calcul tensoriel (semblable à celui effectué par NumPy) avec forte accélération de GPU,
*des réseaux de neurones d’apprentissage profond dans un système de gradients conçu sur le modèle d’un magnétophone.
*des réseaux de neurones d’apprentissage profond dans un système de gradients conçu sur le modèle d’un magnétophone.


Il y a une certaine ressemblance entre PyTorch et [[Torch/fr|Torch]], mais pour des raisons pratiques vous pouvez considérer que ce sont des paquets différents.
Si vous voulez porter un programme PyTorch sur une de nos grappes, il serait bon de prendre connaissance [[Tutoriel Apprentissage machine| de ce tutoriel]].
 
= Clarification =
 
Il y a une certaine ressemblance entre PyTorch et [[Torch/fr|Torch]], mais pour des raisons pratiques vous pouvez considérer que ce sont des projets différents.
 
Les développeurs PyTorch offrent aussi [[PyTorch/fr#LibTorch|LibTorch]] qui permet d'implémenter des extensions à PyTorch à l'aide de C++ et d'implémenter des applications d'apprentissage machine en C++ pur. Les modèles Python écrits avec PyTorch peuvent être convertis et utilisés en C++ avec [https://pytorch.org/tutorials/advanced/cpp_export.html TorchScript].


= Installation =
= Installation =


==''Wheels'' récemment ajoutés==
==Wheels récemment ajoutés==
Pour connaître la dernière version de PyTorch, utilisez
Pour connaître la dernière version de PyTorch, utilisez
{{Command|avail_wheels "torch*"}}
{{Command|avail_wheels "torch*"}}
Voyez aussi [[Python/fr#Lister_les_wheels_disponibles|Lister les ''wheels'' disponibles]].
Pour plus d'information, voyez [[Python/fr#Wheels_disponibles|Wheels disponibles]].


==Installation de la wheel Calcul Canada==
==Installation du wheel ==


La meilleure option est d'installer avec [https://pythonwheels.com/ Python wheels] comme suit :
La meilleure option est d'installer avec [https://pythonwheels.com/ Python wheels] comme suit :


::1. [[Utiliser_des_modules#Sous-commande_load|Chargez un module]]  Python, soit <tt>python/2.7</tt>, <tt>python/3.5</tt>, <tt>python/3.6</tt> ou <tt>python/3.7</tt>.
::1. [[Utiliser_des_modules#Sous-commande_load|Chargez un module]]  Python avec <code>module load python</code>.
::2. Créez et démarrez un  [[Python/fr#Créer_et_utiliser_un_environnement_virtuel|environnement virtuel]].
::2. Créez et démarrez un  [[Python/fr#Créer_et_utiliser_un_environnement_virtuel|environnement virtuel]].
::3. Installez PyTorch dans l'environnement virtuel avec <code>pip install</code>.  
::3. Installez PyTorch dans l'environnement virtuel avec <code>pip install</code>.  


==== GPU et CPU ====
==== GPU et CPU ====
:{{Command|prompt=(venv) [name@server ~]|pip install numpy torch --no-index}}
:{{Command|prompt=(venv) [name@server ~]|pip install --no-index torch }}
 
<b>Remarque : </b>PyTorch 1.10 cause des problèmes connus sur nos grappes (à l'exception de Narval). Si l'entraînement distribué produit des erreurs ou si vous obtenez une erreur qui inclut <code>c10::Error</code>, nous vous recommandons d'installer PyTorch 1.9.1 avec <code>pip install --no-index torch==1.9.1</code>.


====En supplément====
====En supplément====
En plus de  <tt>torch</tt>, vous pouvez aussi installer <tt>torchvision</tt>, <tt>torchtext</tt> et <tt>torchaudio</tt>.
En plus de  <code>torch</code>, vous pouvez aussi installer <code>torchvision</code>, <code>torchtext</code> et <code>torchaudio</code>.
{{Command|prompt=(venv) [name@server ~]|pip install torch torchvision torchtext torchaudio --no-index}}
{{Command|prompt=(venv) [name@server ~]|pip install --no-index torch torchvision torchtext torchaudio }}


====libtorch====
=Soumettre une tâche=
<tt>libtorch.so</tt> est compris dans le paquet (''wheel''). Une fois que PyTorch est installé dans un environnement virtuel, vous pouvez le trouver avec <tt>$VIRTUAL_ENV/lib/python3.6/site-packages/torch/lib/libtorch.so</tt>.


=Soumettre une tâche=
Le script suivant est un exemple de soumission d'une tâche utilisant le wheel Python avec un environnement virtuel.


Le script suivant est un exemple de soumission d'une tâche utilisant le ''wheel'' Python dans un environnement virtuel de $HOME/pytorch.
{{File
{{File
   |name=pytorch-test.sh
   |name=pytorch-test.sh
Line 46: Line 52:
#SBATCH --output=%N-%j.out
#SBATCH --output=%N-%j.out


module load python/3.6
module load python/<select version> # Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
source $SLURM_TMPDIR/env/bin/activate
Line 54: Line 60:
}}
}}


Le script Python <code>pytorch-test.py</code> est semblable à
Le script Python <code>pytorch-ddp-test.py</code> a la forme suivante :
 
{{File
{{File
   |name=pytorch-test.py
   |name=pytorch-test.py
Line 73: Line 80:
Vous pouvez alors soumettre une tâche PyTorch avec
Vous pouvez alors soumettre une tâche PyTorch avec
{{Command|sbatch pytorch-test.sh}}
{{Command|sbatch pytorch-test.sh}}
= Haute performance=
== TF32 : Performance vs précision ==
Avec sa version 1.7.0, PyTorch a ajouté le support pour le [https://blogs.nvidia.com/blog/2020/05/14/tensorfloat-32-precision-format/ mode TensorFloat-32 (TF32) de Nvidia] et est seulement disponible pour les architectures GPU d'Ampere et de Nvidia. Avec ce mode qui est offert par défaut dans les versions 1.7.x à 1.11.x, les opérations tensorielles se font jusqu'à 20x plus rapidement que les opérations équivalentes en simple précision (FP32). Cependant, ce gain en performance peut engendrer une baisse dans la précision du résultat des opérations, ce qui pose problème avec les modèles d'apprentissage profond qui utilisent à l'occasion des matrices mal conditionnées ou qui effectuent de longues séquences d'opérations tensorielles. Suite aux commentaires de la communauté des utilisateurs, TF32 est <b>désactivé par défaut pour les multiplications matricielle et activé par défaut pour les convolutions</b> à partir de la version 1.12.0.
En date d'octobre 2022, notre seule grappe qui offre des GPU Ampere est [[Narval]]. Quand vous utilisez PyTorch sur Narval,
* Vous pourriez remarquer un fort ralentissement dans l'exécution sur GPU du même code avec <code>torch < 1.12.0</code> et <code>torch >= 1.12.0</code>.
* Vous pourriez obtenir des résultats différents dans l'exécution sur GPU du même code avec <code>torch < 1.12.0</code> et <code>torch >= 1.12.0</code>.
Pour activer ou désactiver TF32 pour <code>torch >= 1.12.0</code>, donnez la valeur <code>True</code> ou <code>False</code> aux indicateurs suivants&nbsp;:
torch.backends.cuda.matmul.allow_tf32 = False # Enable/disable TF32 for matrix multiplications
torch.backends.cudnn.allow_tf32 = False # Enable/disable TF32 for convolutions
Pour plus d'information, consultez [https://pytorch.org/docs/stable/notes/cuda.html#tf32-on-ampere ce paragraphe de la documentation PyTorch].
== Travailler avec plusieurs CPU ==
Par défaut, PyTorch permet le parallélisme avec plusieurs CPU de deux façons&nbsp;:
* <b>intra-op</b>, par l’implémentation parallèle d’opérateurs souvent utilisés en apprentissage profond comme le produit matriciel ou le produit de convolution, en utilisant [https://www.openmp.org OpenMP] directement ou avec des bibliothèques de bas niveau comme [https://en.wikipedia.org/wiki/Math_Kernel_Library MKL] et [https://www.intel.com/content/www/us/en/develop/documentation/oneapi-programming-guide/top/api-based-programming/intel-oneapi-deep-neural-network-library-onednn.html OneDNN]. Quand du code PyTorch doit effectuer de telles opérations, elles utilisent automatiquement de multiples fils avec tous les cœurs CPU disponibles.
* <b>inter-op</b>, par la capacité d’exécuter différentes parties de code de manière concurrente. Ce mode de parallélisme nécessite habituellement que le programme soit conçu de manière à exécuter plusieurs parties en parallèle, par exemple en faisant usage du compilateur en temps réel <tt>torch.jit</tt> pour exécuter des tâches asynchrones dans un programme [https://pytorch.org/docs/stable/jit.html#built-in-functions-and-modules TorchScript].
Pour les petits modèles, nous recommandons fortement <b>d’utiliser plusieurs CPU plutôt qu’un GPU</b>. L’entraînement sera certainement plus rapide avec un GPU (sauf dans les cas de très petits modèles), mais si le modèle et le jeu de données ne sont pas assez grands, la vitesse gagnée avec le GPU ne sera probablement pas très importante et la tâche n’utilisera qu’une petite part de la capacité de calcul. Ce n’est peut-être pas grave sur votre propre ordinateur, mais dans un environnement partagé comme sur nos grappes, vous bloqueriez une ressource qui pourrait servir à effectuer de calculs de grande échelle par un autre projet. De plus, l’utilisation d’un GPU contribuerait à la diminution de l’allocation de votre groupe et aurait une incidence sur la priorité accordée aux tâches de vos collègues.
Dans le code suivant, il y a plusieurs occasions d’utiliser le parallélisme intra-op. En demandant plus de CPU et sans changer le code, on peut constater l’effet sur la performance.
{{File
  |name=pytorch-multi-cpu.sh
  |lang="bash"
  |contents=
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... to see the effect on performance
#SBATCH --mem=8G     
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
echo "starting training..."
time python cifar10-cpu.py
}}
{{File
  |name=cifar10-cpu.py
  |lang="python"
  |contents=
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
import os
parser = argparse.ArgumentParser(description='cifar10 classification models, cpu performance test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
    args = parser.parse_args()
    torch.set_num_threads(int(os.environ['SLURM_CPUS_PER_TASK']))
    class Net(nn.Module):
      def __init__(self):
          super(Net, self).__init__()
          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
      def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x
    net = Net()
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=args.lr)
    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    ### This next line will attempt to download the CIFAR10 dataset from the internet if you don't already have it stored in ./data
    ### Run this line on a login node with "download=True" prior to submitting your job, or manually download the data from
    ### https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz and place it under ./data
    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
    perf = []
    total_start = time.time()
    for batch_idx, (inputs, targets) in enumerate(train_loader):
      start = time.time()
      outputs = net(inputs)
      loss = criterion(outputs, targets)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      batch_time = time.time() - start
      images_per_sec = args.batch_size/batch_time
      perf.append(images_per_sec)
    total_time = time.time() - total_start
if __name__=='__main__':
  main()
}}
== Travailler avec un seul GPU ==
On entend souvent dire qu’il faut absolument entraîner un modèle avec un GPU s’il y en a un à notre disposition. Ceci est ''presque toujours'' vrai (l'entraînement de très petits modèles est souvent plus rapide avec un ou plusieurs CPU) sur un poste de travail local, mais ce n’est pas le cas sur nos grappes.
Autrement dit, vous '''ne devriez pas demander un GPU''' si votre code ne peut pas faire un usage raisonnable de sa capacité de calcul.
La performance avantageuse des GPU pour les tâches d’apprentissage profond provient de deux sources&nbsp;:
# La capacité de paralléliser l’exécution de certaines opérations clés, par exemple le [https://fr.wikipedia.org/wiki/Multiplieur-accumulateur multiplieur-accumulateur], sur plusieurs milliers de cœurs de calcul, en comparaison du très petit nombre de cœurs disponibles avec la plupart des CPU.
# Une bande passante de mémoire beaucoup plus grande que pour un CPU, ce qui permet aux GPU d’utiliser efficacement leur très grand nombre de cœurs pour traiter une plus grande quantité de données par cycle de calcul.
Comme c’est le cas avec plusieurs CPU, PyTorch offre des implémentations parallèles  d’opérateurs souvent utilisés en apprentissage profond, comme le produit matriciel et le produit de convolution et utilise des bibliothèques spécialisées pour les GPU comme [https://developer.nvidia.com/cudnn CUDNN] ou [https://github.com/ROCmSoftwarePlatform/MIOpen MIOpen], selon la plateforme matérielle. Ceci signifie que pour qu’il vaille la peine d’utiliser un GPU pour une tâche d’apprentissage, elle doit être composée d’éléments qui peuvent être élargis à une application massive du parallélisme de par le nombre d’opérations pouvant être parallélisées, de par la quantité des données à traiter ou idéalement de par les deux. Un exemple concret serait un grand modèle qui a un grand nombre d’unités et de couches ou qui a beaucoup de données en entrée, et idéalement qui présente ces deux caractéristiques.
Dans l’exemple ci-dessous, nous adaptons le code de la section précédente pour utiliser un GPU et nous examinons la performance. Nous observons que deux paramètres jouent un rôle important : <code>batch_size</code> et <code>num_workers</code>. Le premier paramètre améliore la performance en augmentant la taille des entrées à chaque itération et en utilisant mieux la capacité du GPU. Dans le cas du second paramètre, la performance est améliorée en facilitant  le mouvement des données entre la mémoire de l’hôte (le CPU) et la mémoire du GPU, ce qui réduit la durée d’inactivité du GPU en attente de données à traiter.
Nous pouvons tirer deux conclusions&nbsp;:
# Augmenter la valeur de <code>batch_size</code> au maximum qu’il est possible pour la mémoire du GPU optimise la performance.
# Utiliser un <code>DataLoader</code> avec autant de workers que <code>cpus-per-task</code> facilite l’apport de données au GPU.
Bien entendu, le paramètre <code>batch_size</code> a aussi un impact sur la performance d’un modèle dans une tâche(c.à.d. l’exactitude, l’erreur, etc.) et il existe différentes écoles de pensée sur l’utilisation de grands lots. Nous n’abordons pas le sujet ici, mais si vous croyez qu’un petit lot conviendrait mieux à votre application, allez à la section [[PyTorch/fr#Travailler_avec_un_seul_GPU|Travailler avec un seul GPU]]  pour savoir comment maximiser l’utilisation du GPU avec de petites entrées de données.
{{File
  |name=pytorch-single-gpu.sh
  |lang="bash"
  |contents=
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:1 # request a GPU
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G     
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
echo "starting training..."
time python cifar10-gpu.py --batch_size=512 --num_workers=0
}}
{{File
  |name=cifar10-gpu.py
  |lang="python"
  |contents=
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, single gpu performance test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
    args = parser.parse_args()
    class Net(nn.Module):
      def __init__(self):
          super(Net, self).__init__()
          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
      def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x
    net = Net().cuda() # Load model on the GPU
    criterion = nn.CrossEntropyLoss().cuda() # Load the loss function on the GPU
    optimizer = optim.SGD(net.parameters(), lr=args.lr)
    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
    perf = []
    total_start = time.time()
    for batch_idx, (inputs, targets) in enumerate(train_loader):
      start = time.time()
     
      inputs = inputs.cuda()
      targets = targets.cuda()
      outputs = net(inputs)
      loss = criterion(outputs, targets)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      batch_time = time.time() - start
      images_per_sec = args.batch_size/batch_time
      perf.append(images_per_sec)
    total_time = time.time() - total_start
if __name__=='__main__':
  main()
}}
=== Parallélisme des données avec un seul GPU ===
Il <b>n’est pas conseillé d’utiliser un GPU</b> avec un modèle de taille relativement petite qui n’utilise pas une grande part de la mémoire du GPU et une part raisonnable de sa capacité de calcul; utilisez plutôt [[PyTorch/fr#Travailler_avec_plusieurs_CPU|un ou plusieurs CPU]]. Par contre, profiter du parallélisme du GPU devient une bonne option si vous avez un tel modèle avec un très grand jeu de données et que vous voulez effectuer l’entraînement avec des lots de petite taille.
Dans ce contexte, la parallélisation des données réfère à des méthodes pour entraîner en parallèle plusieurs copies d’un modèle où chaque copie reçoit un morceau des données d’entraînement à chaque itération. À la fin d’une itération, les gradients sont agrégés et les paramètres de chaque copie sont mis à jour de façon synchrone ou asynchrone, dépendant de la méthode. Cette approche peut augmenter la vitesse d’exécution de façon importante avec une itération qui se fait environ N fois plus rapidement avec un grand jeu de données, N étant le nombre de copies du modèle. Pour utiliser cette approche, <b>un avertissement s’impose</b>&nbsp;:  pour que le modèle entraîné soit équivalent au même modèle entraîné sans parallélisme, vous devez adapter le taux d’apprentissage ou la taille du lot désirée en fonction du nombre de copies. Pour plus d’information, voyez [https://discuss.pytorch.org/t/should-we-split-batch-size-according-to-ngpu-per-node-when-distributeddataparallel/72769/13 ces échanges]. 
PyTorch offre des implémentations de méthodes de parallélisme des données, la classe <code>DistributedDataParallel</code> étant celle [https://pytorch.org/tutorials/intermediate/ddp_tutorial.html#comparison-between-dataparallel-and-distributeddataparallel recommandée par les développeurs de PyTorch] pour donner la meilleure performance. Conçue pour le [[PyTorch/fr#Travailler_avec_plusieurs_GPU|travail avec plusieurs GPU]], elle peut aussi être employée avec un seul GPU.
Dans l’exemple ci-dessous, nous adaptons le code pour un seul GPU pour utiliser le parallélisme des données. La tâche est relativement petite; la taille du lot est de 512 images, le modèle occupe environ 1Go de la mémoire du GPU et l’entraînement n’utilise qu’environ 6 % de sa capacité de calcul. Ce modèle '''ne devrait pas''' être entraîné sur un GPU sur nos grappes. Cependant, en parallélisant les données, un GPU V100 avec 16Go de mémoire peut contenir 14 ou 15 copies du modèle et augmenter l'utilisation de la ressource en plus d’obtenir une bonne augmentation de vitesse. Nous utilisons [https://docs.nvidia.com/deploy/mps/index.html Multi-Process Service (MPS) de NVIDIA] avec [[MPI/fr|MPI]] pour placer plusieurs copies du modèle sur un GPU de façon efficace.
{{File
  |name=pytorch-gpu-mps.sh
  |lang="bash"
  |contents=
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:1 # request a GPU
#SBATCH --tasks-per-node=8 # This is the number of model replicas we will place on the GPU. Change this to 10,12,14,... to see the effect on performance 
#SBATCH --cpus-per-task=1 # increase this parameter and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G     
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
# Activate Nvidia MPS:
export CUDA_MPS_PIPE_DIRECTORY=/tmp/nvidia-mps
export CUDA_MPS_LOG_DIRECTORY=/tmp/nvidia-log
nvidia-cuda-mps-control -d
echo "starting training..."
time srun --cpus-per-task=$SLURM_CPUS_PER_TASK python cifar10-gpu-mps.py --batch_size=512 --num_workers=0
}}
{{File
  |name=cifar10-gpu-mps.py
  |lang="python"
  |contents=
import os
import time
import datetime
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel maps test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
def main():
    print("Starting...")
    args = parser.parse_args()
    rank = os.environ.get("SLURM_LOCALID")
    current_device = 0
    torch.cuda.set_device(current_device)
    """ this block initializes a process group and initiate communications
                between all processes that will run a model replica """
    print('From Rank: {}, ==> Initializing Process Group...'.format(rank))
    dist.init_process_group(backend="mpi", init_method=args.init_method) # Use backend="mpi" or "gloo". NCCL does not work on a single GPU due to a hard-coded multi-GPU topology check.
    print("process group ready!")
    print('From Rank: {}, ==> Making model..'.format(rank))
    class Net(nn.Module):
      def __init__(self):
          super(Net, self).__init__()
          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
      def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x
    net = Net()
    net.cuda()
    net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device]) # Wrap the model with DistributedDataParallel
    criterion = nn.CrossEntropyLoss().cuda()
    optimizer = optim.SGD(net.parameters(), lr=args.lr)
    print('From Rank: {}, ==> Preparing data..'.format(rank))
    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    dataset_train = CIFAR10(root='~/data', train=True, download=False, transform=transform_train)
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
    perf = []
    total_start = time.time()
    for batch_idx, (inputs, targets) in enumerate(train_loader):
      start = time.time()
     
      inputs = inputs.cuda()
      targets = targets.cuda()
      outputs = net(inputs)
      loss = criterion(outputs, targets)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      batch_time = time.time() - start
      images_per_sec = args.batch_size/batch_time
      perf.append(images_per_sec)
    total_time = time.time() - total_start
if __name__=='__main__':
  main()
}}
== Travailler avec plusieurs GPU ==
=== Problème avec DistributedDataParallel et PyTorch 1.10 ===
Avec notre wheel PyTorch 1.10 <code>torch-1.10.0+computecanada</code>, le code pour travailler avec plusieurs GPU et qui utilise
[[PyTorch/fr#DistributedDataParallel|DistributedDataParallel]] pourrait échouer de façon imprévisible si le backend est défini comme étant <code>'nccl'</code> ou <code>'gloo'</code>. Nous vous recommandons d'utiliser la version la plus récente de PyTorch plutôt que la version 1.10 sur toutes les grappes d'usage général.
===Paralléliser les données avec plusieurs GPU===
Dans ce contexte, la parallélisation des données réfère à des méthodes pour entraîner en parallèle plusieurs copies d’un modèle où chaque copie reçoit une portion des données d’entraînement à chaque itération. À la fin d’une itération, les gradients sont agrégés et les paramètres de chaque copie sont mis à jour de façon synchrone ou asynchrone, dépendant de la méthode. Cette approche peut augmenter la vitesse d’exécution de façon importante avec une itération qui se fait environ N fois plus rapidement avec un grand jeu de données, N étant le nombre de copies du modèle. Pour utiliser cette approche, un avertissement s’impose :  pour que le modèle entraîné soit équivalent au même modèle entraîné sans parallélisme, vous devez adapter le taux d’apprentissage ou la taille du lot désirée en fonction du nombre de copies. Pour plus d’information, voyez [https://discuss.pytorch.org/t/should-we-split-batch-size-according-to-ngpu-per-node-when-distributeddataparallel/72769/13 ces échanges].
Quand plusieurs GPU sont utilisés, chacun reçoit une copie du modèle; il doit donc être assez petit pour être contenu dans la mémoire d’un GPU. Pour entraîner un modèle qui dépasse la quantité de mémoire d’un GPU, voyez la section [[PyTorch/fr#Paralléliser_un_modèle_avec_plusieurs_GPU|Paralléliser un modèle avec plusieurs GPU]].
Il existe plusieurs manières de paralléliser les données avec PyTorch. Nous présentons ici des tutoriels avec la classe '''DistributedDataParallel''', avec le paquet  '''PyTorch Lightning''' et avec le paquet '''Horovod'''.
====DistributedDataParallel====
Avec plusieurs GPU, la classe '''DistributedDataParallel''' est  [https://pytorch.org/tutorials/intermediate/ddp_tutorial.html#comparison-between-dataparallel-and-distributeddataparallel recommandée par les développeurs PyTorch], que ce soit avec un nœud unique ou avec plusieurs nœuds. Dans le cas qui suit, plusieurs GPU sont distribués sur deux nœuds.
{{File
  |name=pytorch-ddp-test.sh
  |lang="bash"
  |contents=
#!/bin/bash
#SBATCH --nodes 1           
#SBATCH --gres=gpu:2          # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2  # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G     
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
srun -N $SLURM_NNODES -n $SLURM_NNODES bash << EOF
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision --no-index
EOF
export TORCH_NCCL_ASYNC_HANDLING=1
export MASTER_ADDR=$(hostname) #Store the master node’s IP address in the MASTER_ADDR environment variable.
echo "r$SLURM_NODEID master: $MASTER_ADDR"
echo "r$SLURM_NODEID Launching python script"
# The $((SLURM_NTASKS_PER_NODE * SLURM_JOB_NUM_NODES)) variable tells the script how many processes are available for this execution. “srun” executes the script <tasks-per-node * nodes> times
source $SLURM_TMPDIR/env/bin/activate
srun python pytorch-ddp-test.py --init_method tcp://$MASTER_ADDR:3456 --world_size $((SLURM_NTASKS_PER_NODE * SLURM_JOB_NUM_NODES))  --batch_size 256
}}
Le script Python <code>pytorch-ddp-test.py</code> a la forme suivante :
{{File
  |name=pytorch-ddp-test.py
  |lang="python"
  |contents=
import os
import time
import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
parser.add_argument('--dist-backend', default='gloo', type=str, help='')
parser.add_argument('--world_size', default=1, type=int, help='')
parser.add_argument('--distributed', action='store_true', help='')
def main():
    print("Starting...")
    args = parser.parse_args()
    ngpus_per_node = torch.cuda.device_count()
    """ This next line is the key to getting DistributedDataParallel working on SLURM:
SLURM_NODEID is 0 or 1 in this example, SLURM_LOCALID is the id of the
current process inside a node and is also 0 or 1 in this example."""
    local_rank = int(os.environ.get("SLURM_LOCALID"))
    rank = int(os.environ.get("SLURM_NODEID"))*ngpus_per_node + local_rank
    current_device = local_rank
    torch.cuda.set_device(current_device)
    """ this block initializes a process group and initiate communications
between all processes running on all nodes """
    print('From Rank: {}, ==> Initializing Process Group...'.format(rank))
    #init the process group
    dist.init_process_group(backend=args.dist_backend, init_method=args.init_method, world_size=args.world_size, rank=rank)
    print("process group ready!")
    print('From Rank: {}, ==> Making model..'.format(rank))
    class Net(nn.Module):
      def __init__(self):
          super(Net, self).__init__()
          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
      def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x
    net = Net()
    net.cuda()
    net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device])
    print('From Rank: {}, ==> Preparing data..'.format(rank))
    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
    criterion = nn.CrossEntropyLoss().cuda()
    optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4)
    for epoch in range(args.max_epochs):
        train_sampler.set_epoch(epoch)
        train(epoch, net, criterion, optimizer, train_loader, rank)
def train(epoch, net, criterion, optimizer, train_loader, train_rank):
    train_loss = 0
    correct = 0
    total = 0
    epoch_start = time.time()
    for batch_idx, (inputs, targets) in enumerate(train_loader):
      start = time.time()
      inputs = inputs.cuda()
      targets = targets.cuda()
      outputs = net(inputs)
      loss = criterion(outputs, targets)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      train_loss += loss.item()
      _, predicted = outputs.max(1)
      total += targets.size(0)
      correct += predicted.eq(targets).sum().item()
      acc = 100 * correct / total
      batch_time = time.time() - start
      elapse_time = time.time() - epoch_start
      elapse_time = datetime.timedelta(seconds=elapse_time)
      print("From Rank: {}, Training time {}".format(train_rank, elapse_time))
if __name__=='__main__':
  main()
}}
====PyTorch Lightning====
Ce paquet fournit des interfaces à PyTorch afin de simplifier plusieurs tâches communes exigeant beaucoup de code; ceci inclut les tâches d'entraînement de modèles avec plusieurs GPU. Dans le tutoriel suivant pour PyTorch Lightning, nous reprenons le même exemple que ci-dessus, mais sans avoir explicitement recours à la classe DistributedDataParallel.
{{File
  |name=pytorch-ddp-test-pl.sh
  |lang="bash"
  |contents=
#!/bin/bash
#SBATCH --nodes 1           
#SBATCH --gres=gpu:2          # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2    # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G     
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning --no-index
export TORCH_NCCL_ASYNC_HANDLING=1
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python pytorch-ddp-test-pl.py  --batch_size 256
}}
{{File
  |name=pytorch-ddp-test-pl.py
  |lang="python"
  |contents=
import datetime
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, pytorch-lightning parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
    print("Starting...")
    args = parser.parse_args()
    class Net(pl.LightningModule):
      def __init__(self):
          super(Net, self).__init__()
          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
      def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x
      def training_step(self, batch, batch_idx):
          x, y = batch
          y_hat = self(x)
          loss = F.cross_entropy(y_hat, y)
          return loss
      def configure_optimizers(self):
          return torch.optim.Adam(self.parameters(), lr=args.lr)
    net = Net()
    """ Here we initialize a Trainer() explicitly with 1 node and 2 GPUs per node.
        To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
        and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
        We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
        which can cause issues due to updating logs too frequently."""
    trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy='ddp', max_epochs = args.max_epochs, enable_progress_bar=False)
    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
    trainer.fit(net,train_loader)
if __name__=='__main__':
  main()
}}
====Horovod====
[https://horovod.readthedocs.io/en/latest/summary_include.html Horovod] est une plateforme d'entraînement distribué pour l'apprentissage profond, compatible avec TensorFlow, Keras, PyTorch et Apache MXNet. Son API vous permet d'avoir le même niveau de contrôle sur votre code d'entraînement qu'avec <code>DistributedDataParallel</code>, mais simplifie l'écriture de vos scripts en éliminant le besoin de configurer directement les groupes de processus et en prenant en charge les variables d'environnement de l'ordonnanceur. Horovod offre aussi des optimiseurs distribués qui dans certains cas améliorent la performance. L'exemple suivant est le même que le précédent, cette fois avec Horovod.
{{File
  |name=pytorch_horovod.sh
  |lang="bash"
  |contents=
#!/bin/bash
#SBATCH --nodes 1           
#SBATCH --gres=gpu:2        # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2  # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G     
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision horovod --no-index
export TORCH_NCCL_ASYNC_HANDLING=1
srun python pytorch_horovod.py  --batch_size 256
}}
{{File
  |name=pytorch_horovod.py
  |lang="python"
  |contents=
import os
import time
import datetime
import numpy as np
import horovod.torch as hvd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, horovod test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--max_epochs', type=int, default=1, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
    args = parser.parse_args()
    hvd.init()
    print("Starting...")
    local_rank = hvd.local_rank()
    global_rank = hvd.rank()
    torch.cuda.set_device(local_rank)
    class Net(nn.Module):
      def __init__(self):
          super(Net, self).__init__()
          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
      def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x
    net = Net()
    net.cuda()
    print('From Rank: {}, ==> Preparing data..'.format(global_rank))
    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train, num_replicas=hvd.size(),rank=global_rank)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
    criterion = nn.CrossEntropyLoss().cuda()
    optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4)
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=net.named_parameters())
    hvd.broadcast_parameters(net.state_dict(), root_rank=0)
    for epoch in range(args.max_epochs):
        train_sampler.set_epoch(epoch)
        train(args,epoch, net, criterion, optimizer, train_loader, global_rank)
def train(args,epoch, net, criterion, optimizer, train_loader, train_rank):
    train_loss = 0
    correct = 0
    total = 0
    epoch_start = time.time()
    for batch_idx, (inputs, targets) in enumerate(train_loader):
      start = time.time()
      inputs = inputs.cuda()
      targets = targets.cuda()
      outputs = net(inputs)
      loss = criterion(outputs, targets)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      train_loss += loss.item()
      _, predicted = outputs.max(1)
      total += targets.size(0)
      correct += predicted.eq(targets).sum().item()
      acc = 100 * correct / total
      batch_time = time.time() - start
      elapse_time = time.time() - epoch_start
      elapse_time = datetime.timedelta(seconds=elapse_time)
      print("From Rank: {}, Training time {}".format(train_rank, elapse_time))
if __name__=='__main__':
  main()
}}
===Paralléliser un modèle avec plusieurs GPU===
Quand un modèle est trop grand pour être contenu dans [[PyTorch/fr#Travailler_avec_un_seul_GPU|un seul GPU]], vous pouvez le diviser en portions et charger chacune sur des GPU distincts. Dans l’exemple suivant, nous reprenons le code des sections précédentes pour illustrer le fonctionnement. Nous divisons un réseau de neurones convolutifs en deux : d’une part les couches convolutionnelles/de regroupement et d'autre part les couches acycliques entièrement connectées. La tâche demande deux GPU, chacun d’eux étant chargé avec une des parts.  De plus, nous ajoutons du code pour [https://pytorch.org/docs/stable/pipeline.html?highlight=pipeline paralléliser les pipelines] et ainsi réduire au maximum le temps pendant lequel le deuxième GPU est inactif dans l’attente des résultats du premier. Pour ce faire, nous créons un <code>nn.Module</code> distinct pour chacune des portions du modèle, puis créons une séquence de modules en enveloppant les portions avec <code>nn.Sequential</code>, et ensuite utilisons <code>torch.distributed.pipeline.sync.Pipe</code> pour morceler chacun des lots en entrée et les passer en parallèle aux deux portions du modèle.
{{File
  |name=pytorch-modelpar-pipelined-rpc.sh
  |lang="bash"
  |contents=
#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # request 2 GPUs
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G     
#SBATCH --time=0:10:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
# This is needed to initialize pytorch's RPC module, required for the Pipe class which we'll use for Pipeline Parallelism
export MASTER_ADDR=$(hostname)
export MASTER_PORT=34567
echo "starting training..."
time python pytorch-modelpar-pipelined-rpc.py --batch_size=512 --num_workers=0
}}
{{File
  |name=pytorch-modelpar-pipelined-rpc.py
  |lang="python"
  |contents=
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributed.pipeline.sync import Pipe
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, single node model parallelism test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
    args = parser.parse_args()
    # Convolutional + pooling part of the model
    class ConvPart(nn.Module):
      def __init__(self):
          super(ConvPart, self).__init__()
          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.relu = nn.ReLU()
      def forward(self, x):
          x = self.pool(self.relu(self.conv1(x)))
          x = self.pool(self.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          return x
    # Dense feedforward part of the model
    class MLPPart(nn.Module):
      def __init__(self):
          super(MLPPart, self).__init__()
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
          self.relu = nn.ReLU()
      def forward(self, x):
          x = self.relu(self.fc1(x))
          x = self.relu(self.fc2(x))
          x = self.fc3(x)
          return x
    torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) # initializing RPC is required by Pipe we use below
    part1 = ConvPart().to('cuda:0') # Load part1 on the first GPU
    part2 = MLPPart().to('cuda:1') # Load part2 on the second GPU
    net = nn.Sequential(part1,part2) # Pipe requires all modules be wrapped with nn.Sequential()
    net = Pipe(net, chunks=32) # Wrap with Pipe to perform Pipeline Parallelism
    criterion = nn.CrossEntropyLoss().to('cuda:1') # Load the loss function on the last GPU
    optimizer = optim.SGD(net.parameters(), lr=args.lr)
    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
    perf = []
    total_start = time.time()
    for batch_idx, (inputs, targets) in enumerate(train_loader):
      start = time.time()
      inputs = inputs.to('cuda:0')
      targets = targets.to('cuda:1')
      # Models wrapped with Pipe() return a RRef object. Since the example is single node, all values are local to the node and we can grab them
      outputs = net(inputs).local_value()
      loss = criterion(outputs, targets)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      print(f"Loss: {loss.item()}")
      batch_time = time.time() - start
      images_per_sec = args.batch_size/batch_time
      perf.append(images_per_sec)
    total_time = time.time() - total_start
if __name__=='__main__':
  main()
                                                   
}}
===Paralléliser modèle et données avec plusieurs GPU===
Quand un modèle est trop grand pour être contenu dans un seul GPU et que son entraînement doit se faire avec un très grand ensemble de données, le fait de combiner le parallélisme du modèle et celui des données permet d’obtenir une bonne performance. Le principe est simple&nbsp;: le modèle est divisé en portions chacune attribuée à un GPU; le parallélisme des pipelines est fait avec les résultats; puis des copies du processus sont faites et les copies du modèle sont entraînées en parallèle avec des sous-ensembles distincts de l’ensemble de données d’entraînement.
Comme décrit [[PyTorch/fr#Paralléliser_les_données_avec plusieurs_GPU|ci-dessus]], les gradients sont calculés indépendamment dans chacune des copies et agrégés pour modifier toutes les copies de façon synchrone ou asynchrone, dépendant de la méthode. La différence principale ici est que chaque copie du modèle se trouve sur plus d’un GPU.
==== Utiliser Torch RPC et DDP ====
Toujours avec le même exemple, nous combinons maintenant Torch RPC et DistributedDataParallel pour séparer le modèle en deux portions et entraîner quatre copies du modèle distribuées en parallèle sur deux nœuds. Autrement dit, nous avons deux copies sur deux GPU de chaque nœud. Cependant, un avertissement s’impose&nbsp;: à ce jour, Torch RPC prend en charge la division d’un modèle dans un seul nœud. Pour entraîner un modèle qui dépasse la quantité de mémoire de tous les GPU dans un nœud de calcul, voyez [[PyTorch/fr#DeepSpeed|la section DeepSpeed]].
{{File
  |name=pytorch-model-data-par.sh
  |lang="bash"
  |contents=
#!/bin/bash
#SBATCH --nodes 2
#SBATCH --gres=gpu:4 # Request 4 GPUs per node
#SBATCH --tasks-per-node=2 # Request one task per MODEL per node
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=16G     
#SBATCH --time=0:10:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load StdEnv/2020 gcc/11.3.0
module load python # Using Default Python version - Make sure to choose a version that suits your application, python/3.10.2 works with this demo
module load cuda/11.8.0
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index
export MAIN_NODE=$(hostname)
echo "starting training..."
srun python pytorch-model-data-par.py --init_method tcp://$MAIN_NODE:3456 --world_size $SLURM_NTASKS  --batch_size 512
}}
{{File
  |name=pytorch-model-data-par.py
  |lang="python"
  |contents=
import time
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributed.pipeline.sync import Pipe
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data & model parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
parser.add_argument('--dist-backend', default='mpi', type=str, help='')
parser.add_argument('--world_size', default=1, type=int, help='')
parser.add_argument('--distributed', action='store_true', help='')
def main():
    args = parser.parse_args()
    # Convolutional + pooling part of the model
    class ConvPart(nn.Module):
      def __init__(self):
          super(ConvPart, self).__init__()
          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.relu = nn.ReLU()
      def forward(self, x):
          x = self.pool(self.relu(self.conv1(x)))
          x = self.pool(self.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          return x
    # Dense feedforward part of the model
    class MLPPart(nn.Module):
      def __init__(self):
          super(MLPPart, self).__init__()
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
          self.relu = nn.ReLU()
      def forward(self, x):
          x = self.relu(self.fc1(x))
          x = self.relu(self.fc2(x))
          x = self.fc3(x)
          return x
    ngpus_per_node = torch.cuda.device_count()
    local_rank = int(os.environ.get("SLURM_LOCALID"))
    rank = int(os.environ.get("SLURM_NODEID"))*(ngpus_per_node//2) + local_rank  # Divide ngpus_per_node by the number of model parts
    os.environ['MASTER_ADDR'] = '127.0.0.1' # Each model replica will run its own RPC server to run pipeline parallelism
    os.environ['MASTER_PORT'] = str(34567 + local_rank) # Make sure each RPC server starts on a different port
    torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) # Different replicas won't communicate through RPC, but through DDP
    dist.init_process_group(backend=args.dist_backend, init_method=args.init_method, world_size=args.world_size, rank=rank) # Initialize Data Parallelism communications
    part1 = ConvPart().cuda(local_rank) # First part of the model goes on the first GPU of each process
    part2 = MLPPart().cuda(local_rank + 1) # Second part goes on the second GPU of each process
    net = nn.Sequential(part1,part2)
    net = Pipe(net, chunks=32, checkpoint="never")
    net = torch.nn.parallel.DistributedDataParallel(net)
    criterion = nn.CrossEntropyLoss().cuda(local_rank + 1) # Loss function goes on the second GPU of each process
    optimizer = optim.SGD(net.parameters(), lr=args.lr)
    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)
    for epoch in range(args.max_epochs):
        train_sampler.set_epoch(epoch)
        train(epoch, net, criterion, optimizer, train_loader, rank, local_rank)
def train(epoch, net, criterion, optimizer, train_loader, train_rank, model_rank):
    train_loss = 0
    correct = 0
    total = 0
    epoch_start = time.time()
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        start = time.time()
        inputs = inputs.cuda(model_rank)
        targets = targets.cuda(model_rank + 1)
        outputs = net(inputs).local_value()
        loss = criterion(outputs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f"From Rank {train_rank} - Loss: {loss.item()}")
        batch_time = time.time() - start
if __name__=='__main__':
  main()
}}
===DeepSpeed===
DeepSpeed est une bibliothèque qui permet d'optimiser l’entraînement de l’apprentissage de modèles ayant des milliards de paramètres à l’échelle. Parfaitement compatible avec PyTorch, DeepSpeed offre des implémentations de nouvelles méthodes d’entraînement distribué qui font un usage efficace de la mémoire en appliquant le concept de [https://arxiv.org/abs/1910.02054 Zero Redundancy Optimizer (ZeRO)]. Avec ZeRO, DeepSpeed peut distribuer le stockage et le traitement  des divers éléments d’une tâche d’entraînement (états de l’optimiseur, poids du modèle, gradients et activations) sur plusieurs dispositifs comme les GPU, les CPU, les disques durs locaux et/ou les combinaisons de ces dispositifs. Cette mise en commun des ressources, surtout les ressources de stockage, permet l’entraînement efficace sur plusieurs nœuds de modèles ayant d’énormes quantités de paramètres sans avoir besoin d’écrire le code pour paralléliser le modèle, les pipelines ou les données. Les exemples qui suivent montrent comment profiter de DeepSpeed et des différentes implémentations de ZeRO en utilisant l’interface simple de  PyTorch Lightning.
==== ZeRO avec GPU ====
Dans l’exemple ci-dessous, nous utilisons ZeRO stage 3 pour entraîner un modèle qui utilise un groupe de 4 GPU. Le stage 3  répartit sur les 4 GPU les trois caractéristiques, soit les états de l’optimiseur, les paramètres du modèle et les gradients du modèle. Ceci est plus efficace que le parallélisme pur des données où une copie complète du modèle est chargée sur chacun des GPU. Avec l’optimiseur DeepSpeed <code>FusedAdam</code> plutôt qu’un optimiseur natif de PyTorch, la performance se compare au parallélisme pur des données. Les optimiseurs DeepSpeed sont compilés en temps réel à l’exécution et vous devez charger un module <code>cuda/<version></code> où ''version'' correspond à la version utilisée pour construire le paquet PyTorch que vous utilisez.
{{File
  |name=deepspeed-stage3.sh
  |lang="bash"
  |contents=
#!/bin/bash
#SBATCH --nodes 1           
#SBATCH --gres=gpu:2          # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2    # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G     
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python cuda # CUDA must be loaded if using a DeepSpeed optimizer
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index
export TORCH_NCCL_ASYNC_HANDLING=1
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python deepspeed-stage3.py  --batch_size 256
}}
{{File
  |name=deepspeed-stage3.py
  |lang="python"
  |contents=
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from deepspeed.ops.adam import FusedAdam
from pytorch_lightning.strategies import DeepSpeedStrategy
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models deep seed stage 3 test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
    print("Starting...")
    args = parser.parse_args()
    class ConvPart(nn.Module):
      def __init__(self):
          super(ConvPart, self).__init__()
          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.relu = nn.ReLU()
      def forward(self, x):
          x = self.pool(self.relu(self.conv1(x)))
          x = self.pool(self.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          return x
    # Dense feedforward part of the model
    class MLPPart(nn.Module):
      def __init__(self):
          super(MLPPart, self).__init__()
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
          self.relu = nn.ReLU()
      def forward(self, x):
          x = self.relu(self.fc1(x))
          x = self.relu(self.fc2(x))
          x = self.fc3(x)
          return x
    class Net(pl.LightningModule):
      def __init__(self):
          super(Net, self).__init__()
          self.conv_part = ConvPart()
          self.mlp_part = MLPPart()
      def configure_sharded_model(self):
          self.block = nn.Sequential(self.conv_part, self.mlp_part)
      def forward(self, x):
          x = self.block(x)
          return x
      def training_step(self, batch, batch_idx):
          x, y = batch
          y_hat = self(x)
          loss = F.cross_entropy(y_hat, y)
          return loss
      def configure_optimizers(self):
          return FusedAdam(self.parameters())
    net = Net()
    """ Here we initialize a Trainer() explicitly with 1 node and 2 GPU.
        To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
        and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
        We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
        which can cause issues due to updating logs too frequently."""
    trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy="deepspeed_stage_3", max_epochs = args.max_epochs)
    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
    trainer.fit(net,train_loader)
if __name__=='__main__':
  main()
}}
==== ZeRO avec CPU ====
Dans cet autre exemple, nous utilisons aussi ZeRO stage 3, mais cette fois-ci nous utilisons le CPU pour les états de l’optimiseur et les paramètres du modèle.  Ceci signifie que la mémoire du nœud de calcul sera disponible pour stocker ces tenseurs quand ils ne sont pas requis par les calculs effectués par le GPU et de plus, les pas de l’optimiseur seront calculés sur le CPU. Pour des raisons pratiques, ce serait comme ajouter 32Go de mémoire additionnelle à la mémoire du GPU. Comme la mémoire du GPU est moins sollicitée, vous pouvez par exemple augmenter la taille de vos lots ou de votre modèle. Avec l’optimiseur DeepSpeed <code>DeepSpeedCPUAdam</code> plutôt qu’un optimiseur natif de PyTorch, la performance se compare au parallélisme pur des données. Les optimiseurs DeepSpeed sont compilés en temps réel à l’exécution et vous devez charger un module <code>cuda/<version></code> où ''version'' correspond à la version utilisée pour construire le paquet PyTorch que vous utilisez.
{{File
  |name=deepspeed-stage3-offload-cpu.sh
  |lang="bash"
  |contents=
#!/bin/bash
#SBATCH --nodes 1           
#SBATCH --gres=gpu:2          # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2    # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G     
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python cuda # CUDA must be loaded if using ZeRO offloading to CPU or NVMe. Version must be the same used to compile PyTorch.
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index
export TORCH_NCCL_ASYNC_HANDLING=1
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python deepspeed-stage3-offload-cpu.py  --batch_size 256
}}
{{File
  |name=deepspeed-stage3-offload-cpu.py
  |lang="python"
  |contents=
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from deepspeed.ops.adam import DeepSpeedCPUAdam
from pytorch_lightning.strategies import DeepSpeedStrategy
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, deepspeed offload to cpu test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
    print("Starting...")
    args = parser.parse_args()
    class ConvPart(nn.Module):
      def __init__(self):
          super(ConvPart, self).__init__()
          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.relu = nn.ReLU()
      def forward(self, x):
          x = self.pool(self.relu(self.conv1(x)))
          x = self.pool(self.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          return x
    # Dense feedforward part of the model
    class MLPPart(nn.Module):
      def __init__(self):
          super(MLPPart, self).__init__()
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
          self.relu = nn.ReLU()
      def forward(self, x):
          x = self.relu(self.fc1(x))
          x = self.relu(self.fc2(x))
          x = self.fc3(x)
          return x
    class Net(pl.LightningModule):
      def __init__(self):
          super(Net, self).__init__()
          self.conv_part = ConvPart()
          self.mlp_part = MLPPart()
      def configure_sharded_model(self):
          self.block = nn.Sequential(self.conv_part, self.mlp_part)
      def forward(self, x):
          x = self.block(x)
          return x
      def training_step(self, batch, batch_idx):
          x, y = batch
          y_hat = self(x)
          loss = F.cross_entropy(y_hat, y)
          return loss
      def configure_optimizers(self):
          return DeepSpeedCPUAdam(self.parameters())
    net = Net()
    """ Here we initialize a Trainer() explicitly with 1 node and 2 GPU.
        To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
        and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
        We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
        which can cause issues due to updating logs too frequently."""
    trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy=DeepSpeedStrategy(
        stage=3,
        offload_optimizer=True,
        offload_parameters=True,
        ), max_epochs = args.max_epochs)
    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
    trainer.fit(net,train_loader)
if __name__=='__main__':
  main()
}}
====ZeRO avec utilisation de disques NVMe====
ZeRO stage 3 nous sert encore, cette fois-ci en utilisant le CPU pour les états de l’optimiseur et les paramètres du modèle. Ceci signifie que l’espace de stockage local du nœud de calcul sera disponible pour stocker ces tenseurs quand ils ne sont pas requis par les calculs effectués par le GPU. Ici encore,  les pas de l’optimiseur seront calculés sur le CPU. De même, pour des raisons pratiques, ce serait comme ajouter à la mémoire du GPU autant d’espace de stockage que sur le disque local, mais par contre nous aurons une forte perte de performance. Cette approche peut être utilisée avec tous les types de stockage, mais elle est à privilégier avec les disques NVMe qui sont plus rapides et ont des temps de réponse plus courts, ce qui compense pour la baisse de performance.
{{File
  |name=deepspeed-stage3-offload-nvme.sh
  |lang="bash"
  |contents=
#!/bin/bash
#SBATCH --nodes 1           
#SBATCH --gres=gpu:2          # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2    # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G     
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
module load python cuda # CUDA must be loaded if using ZeRO offloading to CPU or NVMe. Version must be the same used to compile PyTorch.
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index
export TORCH_NCCL_ASYNC_HANDLING=1
# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!
srun python deepspeed-stage3-offload-nvme.py  --batch_size 256
}}
{{File
  |name=deepspeed-stage3-offload-nvme.py
  |lang="python"
  |contents=
import os
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from deepspeed.ops.adam import DeepSpeedCPUAdam
from pytorch_lightning.strategies import DeepSpeedStrategy
import argparse
parser = argparse.ArgumentParser(description='cifar10 classification models, deepspeed offload to nvme test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
def main():
    print("Starting...")
    args = parser.parse_args()
    class ConvPart(nn.Module):
      def __init__(self):
          super(ConvPart, self).__init__()
          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.relu = nn.ReLU()
      def forward(self, x):
          x = self.pool(self.relu(self.conv1(x)))
          x = self.pool(self.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          return x
    # Dense feedforward part of the model
    class MLPPart(nn.Module):
      def __init__(self):
          super(MLPPart, self).__init__()
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
          self.relu = nn.ReLU()
      def forward(self, x):
          x = self.relu(self.fc1(x))
          x = self.relu(self.fc2(x))
          x = self.fc3(x)
          return x
    class Net(pl.LightningModule):
      def __init__(self):
          super(Net, self).__init__()
          self.conv_part = ConvPart()
          self.mlp_part = MLPPart()
      def configure_sharded_model(self):
          self.block = nn.Sequential(self.conv_part, self.mlp_part)
      def forward(self, x):
          x = self.block(x)
          return x
      def training_step(self, batch, batch_idx):
          x, y = batch
          y_hat = self(x)
          loss = F.cross_entropy(y_hat, y)
          return loss
      def configure_optimizers(self):
          return DeepSpeedCPUAdam(self.parameters())
    net = Net()
    """ Here we initialize a Trainer() explicitly with 1 node and 2 GPU.
        To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
        and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes.
        We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs,
        which can cause issues due to updating logs too frequently."""
    local_scratch = os.environ['SLURM_TMPDIR'] # Get path where local storage is mounted
    print(f'Offloading to: {local_scratch}')
    trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy=DeepSpeedStrategy(
        stage=3,
        offload_optimizer=True,
        offload_parameters=True,
        remote_device="nvme",
        offload_params_device="nvme",
        offload_optimizer_device="nvme",
        nvme_path="local_scratch",
        ), max_epochs = args.max_epochs)
    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)
    trainer.fit(net,train_loader)
if __name__=='__main__':
  main()
}}
=Créer des points de contrôle=
Peu importe si vous pensez que la durée d'exécution de votre code sera longue ou non, il est bon de prendre l'habitude de créer des points de contrôle pendant l'entraînement. Un point de contrôle est un portrait de votre modèle à un moment précis du processus d'entraînement (après un certain nombre d'itérations ou un certain nombre d'époques) que vous pouvez sauvegarder sur disque et utiliser plus tard. C'est un moyen pratique de diviser les tâches qui devraient être de longue durée en de multiples petites tâches auxquelles l'ordonnanceur peut allouer des ressources plus rapidement. C'est aussi une bonne façon de ne pas perdre le progrès réalisé au cas où des erreurs de code inattendues surviendraient ou que les nœuds ne soient pas disponibles pour quelconque raison.
==Avec PyTorch Lightning==
Nous recommandons d'utiliser le paramètre de rappels (''callbacks parameter'') de la classe <code>Trainer()</code>. Dans l'exemple suivant, on demande à PyTorch de créer un point de contrôle à la fin de chacune des époques d'entraînement. Vérifiez que le chemin où créer le point de contrôle existe.
callbacks = [pl.callbacks.ModelCheckpoint(dirpath="./ckpt",every_n_epochs=1)]
trainer = pl.Trainer(callbacks=callbacks)
trainer.fit(model)
Ce bout de code chargera un point de contrôle de <code>./ckpt</code> (s'il en existe) et poursuivra l'entraînement à partir de ce point. Pour plus d'information, consultez la [https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.callbacks.model_checkpoint.html documentation PyTorch Lightning].
==Avec des boucles d'entraînement personnalisées==
Pour des exemples, consultez [https://pytorch.org/tutorials/recipes/recipes/saving_and_loading_a_general_checkpoint.html la documentation PyTorch].
== Pendant l’entraînement distribué ==
Les points de contrôle peuvent être utilisés pendant l’exécution d’un programme d’entraînement distribué. Avec PyTorch Lightning, aucun code supplémentaire n’est requis, autre que d’insérer le paramètre de rappels (''callbacks parameter'') mentionné ci-dessus. Cependant, si vous utilisez  DistributedDataParallel ou Horovod, les points de contrôle devront être créés par un seul processus (''rank'') de votre programme puisque tous les processus auront le même état après chaque itération. Dans cet exemple, le premier processus (''rank 0'') crée un point de contrôle.
if global_rank == 0:
        torch.save(ddp_model.state_dict(), "./checkpoint_path")
Faites attention aux points de contrôle ainsi créés. Si un processus tente de charger un point de contrôle qui n’a pas encore été sauvegardé par un autre, des erreurs peuvent survenir ou de mauvais résultats peuvent être produits. Pour éviter ceci, vous pouvez ajouter une barrière à votre code pour faire en sorte que le processus qui crée le point de contrôle a terminé son écriture sur le disque avant que d’autres processus tentent de le charger. Remarquez aussi que <code>torch.load</code> essaiera par défaut de charger les tenseurs sur le GPU sur lequel ils étaient initialement sauvegardés, dans notre cas <code>cuda:0</code>. Pour éviter les problèmes, passez <code>map_location</code> à <code>torch.load</code> pour charger les tenseurs sur le GPU identifié par chaque processus.
torch.distributed.barrier()
map_location = f"cuda:{local_rank}" 
ddp_model.load_state_dict(
torch.load("./checkpoint_path", map_location=map_location))


<!-- this section is hidden until results are verified.
<!-- this section is hidden until results are verified.
Line 101: Line 1,835:


== Fuites de mémoire ==
== Fuites de mémoire ==
Sur le matériel AVX512 (nœuds V100, Skylake ou Béluga), les versions PyTorch  antérieures à v1.0.1 qui utilisent des bibliothèques moins récentes (cuDNN < v7.5 ou MAGMA < v2.5) peuvent avoir des fuites de mémoire importantes et créer des exceptions de mémoire insuffisante et terminer vos tâches. Pour contrer ceci, utilisez la plus récente version de <tt>torch</tt>.
Sur le matériel AVX512 (nœuds V100, Skylake ou Béluga), les versions PyTorch  antérieures à v1.0.1 qui utilisent des bibliothèques moins récentes (cuDNN < v7.5 ou MAGMA < v2.5) peuvent avoir des fuites de mémoire importantes et créer des exceptions de mémoire insuffisante et terminer vos tâches. Pour contrer ceci, utilisez la plus récente version de <code>torch</code>.
 
== c10::Error ==
 
Dans certains cas, vous pouvez obtenir un erreur comme
 
  terminate called after throwing an instance of 'c10::Error'
    what():  Given groups=1, weight of size [256, 1, 3, 3], expected input[16, 10, 16, 16] to have 1 channels, but got 10 channels instead
  Exception raised from check_shape_forward at /tmp/coulombc/pytorch_build_2021-11-09_14-57-01/avx2/python3.8/pytorch/aten/src/ATen/native/Convolution.cpp:496 (most recent call first):
  ...
 
Une exception C++ est émise plutôt qu'une exception Python. Ceci peut se produire quand vous programmez en C++ avec libtorch, mais ne devrait pas se produire quand vous programmez en Python. Il n'est pas possible de suivre la trace des appels (''traceback'')du programme Python, ce qui ne permet pas d'identifier facilement la cause de l'erreur dans le script Python. Nous avons constaté que le fait d'utiliser PyTorch 1.9.1 plutôt que 1.10.x permet le ''traceback'' du programme Python.
 
= LibTorch =
 
LibTorch permet d'implémenter à PyTorch des extensions C++ et des <b>applications d'apprentissage machine en C++ pur</b>.  La distribution LibTorch possède les en-têtes, bibliothèques et fichiers de configuration CMake nécessaires pour travailler avec PyTorch, tel que décrit dans la [https://pytorch.org/cppdocs/installing.html documentation].
 
=== Utiliser LibTorch ===
 
==== Configurer l'environnement ====
 
Chargez les modules requis par LibTorch, puis installez PyTorch dans un environnement virtuel Python.
 
<tabs>
<tab name="StdEnv/2023">
module load StdEnv/2023 gcc cuda/12.2 cmake protobuf cudnn python/3.11 abseil  cusparselt  opencv/4.8.1
virtualenv --no-download --clear ~/ENV && source ~/ENV/bin/activate
pip install --no-index torch numpy
 
Vous devrez peut-être ajuster les versions des modules abseil, cusparselt et opencv, dépendant du paquet torch que vous utilisez. Pour savoir quelle version d'un module a été utilisée pour compiler le wheel Python, lancez la commande
 
{{Command
|prompt=$
|ldd $VIRTUAL_ENV/lib/python3.11/site-packages/torch/lib/libtorch_cuda.so {{!}} sed -n 's&^.*/\(\(opencv\{{!}}abseil\{{!}}cusparselt\)/[^/]*\).*&\1&p' {{!}} sort -u
|result=
abseil/20230125.3
cusparselt/0.5.0.1
opencv/4.8.1
}}
</tab>
<tab name="StdEnv/2020">
module load gcc cuda/11.4 cmake protobuf cudnn python/3.10
virtualenv --no-download --clear ~/ENV && source ~/ENV/bin/activate
pip install --no-index torch numpy
</tab>
</tabs>
 
==== Compiler un exemple simple ====
 
Créez les deux fichiers suivants :
 
{{File
  |name=example.cpp
  |lang="cpp"
  |contents=
#include <torch/torch.h>
#include <iostream>
 
int main()
{
    torch::Device device(torch::kCPU);
    if (torch::cuda::is_available())
    {
        std::cout << "CUDA is available! Using GPU." << std::endl;
        device = torch::Device(torch::kCUDA);
    }
 
    torch::Tensor tensor = torch::rand({2, 3}).to(device);
    std::cout << tensor << std::endl;
}
}}
 
{{File
  |name=CMakeLists.txt
  |lang="txt"
  |contents=
cmake_minimum_required(VERSION 3.0 FATAL_ERROR)
project(example)
 
find_package(Torch REQUIRED)
 
add_executable(example example.cpp)
target_link_libraries(example "${TORCH_LIBRARIES}")
set_property(TARGET example PROPERTY CXX_STANDARD 14)
}}
 
Activez l'environnement virtuel Python, configurez le projet et compilez le programme.
<tabs>
<tab name="StdEnv/2023">
cmake -B build -S . -DCMAKE_PREFIX_PATH=$VIRTUAL_ENV/lib/python3.11/site-packages \
                    -DCMAKE_EXE_LINKER_FLAGS=-Wl,-rpath=$VIRTUAL_ENV/lib/python3.11/site-packages/torch/lib,-L$EBROOTCUDA/extras/CUPTI/lib64 \
                    -DCMAKE_SKIP_RPATH=ON -DTORCH_CUDA_ARCH_LIST="6.0;7.0;7.5;8.0;9.0"
cmake --build build
</tab>
<tab name="StdEnv/2020">
cmake -B build -S . -DCMAKE_PREFIX_PATH=$VIRTUAL_ENV/lib/python3.10/site-packages \
                    -DCMAKE_EXE_LINKER_FLAGS=-Wl,-rpath=$VIRTUAL_ENV/lib/python3.10/site-packages/torch/lib \
                    -DCMAKE_SKIP_RPATH=ON
cmake --build build
</tab>
</tabs>
 
Lancez le programme avec
build/example
 
Pour tester une application avec CUDA, demandez une [[Running_jobs/fr#Tâches_interactives|tâche interactive]] avec [[Using_GPUs_with_Slurm/fr|GPU]].
 
= Ressources =
 
https://pytorch.org/cppdocs/

Latest revision as of 17:15, 2 October 2024

Other languages:

PyTorch est un paquet Python qui offre deux fonctionnalités de haut niveau :

  • le calcul tensoriel (semblable à celui effectué par NumPy) avec forte accélération de GPU,
  • des réseaux de neurones d’apprentissage profond dans un système de gradients conçu sur le modèle d’un magnétophone.

Si vous voulez porter un programme PyTorch sur une de nos grappes, il serait bon de prendre connaissance de ce tutoriel.

Clarification

Il y a une certaine ressemblance entre PyTorch et Torch, mais pour des raisons pratiques vous pouvez considérer que ce sont des projets différents.

Les développeurs PyTorch offrent aussi LibTorch qui permet d'implémenter des extensions à PyTorch à l'aide de C++ et d'implémenter des applications d'apprentissage machine en C++ pur. Les modèles Python écrits avec PyTorch peuvent être convertis et utilisés en C++ avec TorchScript.

Installation

Wheels récemment ajoutés

Pour connaître la dernière version de PyTorch, utilisez

Question.png
[name@server ~]$ avail_wheels "torch*"

Pour plus d'information, voyez Wheels disponibles.

Installation du wheel

La meilleure option est d'installer avec Python wheels comme suit :

1. Chargez un module Python avec module load python.
2. Créez et démarrez un environnement virtuel.
3. Installez PyTorch dans l'environnement virtuel avec pip install.

GPU et CPU

Question.png
(venv) [name@server ~] pip install --no-index torch

Remarque : PyTorch 1.10 cause des problèmes connus sur nos grappes (à l'exception de Narval). Si l'entraînement distribué produit des erreurs ou si vous obtenez une erreur qui inclut c10::Error, nous vous recommandons d'installer PyTorch 1.9.1 avec pip install --no-index torch==1.9.1.

En supplément

En plus de torch, vous pouvez aussi installer torchvision, torchtext et torchaudio.

Question.png
(venv) [name@server ~] pip install --no-index torch torchvision torchtext torchaudio

Soumettre une tâche

Le script suivant est un exemple de soumission d'une tâche utilisant le wheel Python avec un environnement virtuel.


File : pytorch-test.sh

#!/bin/bash
#SBATCH --gres=gpu:1       # Request GPU "generic resources"
#SBATCH --cpus-per-task=6  # Cores proportional to GPUs: 6 on Cedar, 16 on Graham.
#SBATCH --mem=32000M       # Memory proportional to GPUs: 32000 Cedar, 64000 Graham.
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out

module load python/<select version> # Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch --no-index

python pytorch-test.py


Le script Python pytorch-ddp-test.py a la forme suivante :


File : pytorch-test.py

import torch
x = torch.Tensor(5, 3)
print(x)
y = torch.rand(5, 3)
print(y)
# let us run the following only if CUDA is available
if torch.cuda.is_available():
    x = x.cuda()
    y = y.cuda()
    print(x + y)


Vous pouvez alors soumettre une tâche PyTorch avec

Question.png
[name@server ~]$ sbatch pytorch-test.sh

Haute performance

TF32 : Performance vs précision

Avec sa version 1.7.0, PyTorch a ajouté le support pour le mode TensorFloat-32 (TF32) de Nvidia et est seulement disponible pour les architectures GPU d'Ampere et de Nvidia. Avec ce mode qui est offert par défaut dans les versions 1.7.x à 1.11.x, les opérations tensorielles se font jusqu'à 20x plus rapidement que les opérations équivalentes en simple précision (FP32). Cependant, ce gain en performance peut engendrer une baisse dans la précision du résultat des opérations, ce qui pose problème avec les modèles d'apprentissage profond qui utilisent à l'occasion des matrices mal conditionnées ou qui effectuent de longues séquences d'opérations tensorielles. Suite aux commentaires de la communauté des utilisateurs, TF32 est désactivé par défaut pour les multiplications matricielle et activé par défaut pour les convolutions à partir de la version 1.12.0.

En date d'octobre 2022, notre seule grappe qui offre des GPU Ampere est Narval. Quand vous utilisez PyTorch sur Narval,

  • Vous pourriez remarquer un fort ralentissement dans l'exécution sur GPU du même code avec torch < 1.12.0 et torch >= 1.12.0.
  • Vous pourriez obtenir des résultats différents dans l'exécution sur GPU du même code avec torch < 1.12.0 et torch >= 1.12.0.

Pour activer ou désactiver TF32 pour torch >= 1.12.0, donnez la valeur True ou False aux indicateurs suivants :

torch.backends.cuda.matmul.allow_tf32 = False # Enable/disable TF32 for matrix multiplications
torch.backends.cudnn.allow_tf32 = False # Enable/disable TF32 for convolutions

Pour plus d'information, consultez ce paragraphe de la documentation PyTorch.

Travailler avec plusieurs CPU

Par défaut, PyTorch permet le parallélisme avec plusieurs CPU de deux façons :

  • intra-op, par l’implémentation parallèle d’opérateurs souvent utilisés en apprentissage profond comme le produit matriciel ou le produit de convolution, en utilisant OpenMP directement ou avec des bibliothèques de bas niveau comme MKL et OneDNN. Quand du code PyTorch doit effectuer de telles opérations, elles utilisent automatiquement de multiples fils avec tous les cœurs CPU disponibles.
  • inter-op, par la capacité d’exécuter différentes parties de code de manière concurrente. Ce mode de parallélisme nécessite habituellement que le programme soit conçu de manière à exécuter plusieurs parties en parallèle, par exemple en faisant usage du compilateur en temps réel torch.jit pour exécuter des tâches asynchrones dans un programme TorchScript.

Pour les petits modèles, nous recommandons fortement d’utiliser plusieurs CPU plutôt qu’un GPU. L’entraînement sera certainement plus rapide avec un GPU (sauf dans les cas de très petits modèles), mais si le modèle et le jeu de données ne sont pas assez grands, la vitesse gagnée avec le GPU ne sera probablement pas très importante et la tâche n’utilisera qu’une petite part de la capacité de calcul. Ce n’est peut-être pas grave sur votre propre ordinateur, mais dans un environnement partagé comme sur nos grappes, vous bloqueriez une ressource qui pourrait servir à effectuer de calculs de grande échelle par un autre projet. De plus, l’utilisation d’un GPU contribuerait à la diminution de l’allocation de votre groupe et aurait une incidence sur la priorité accordée aux tâches de vos collègues.

Dans le code suivant, il y a plusieurs occasions d’utiliser le parallélisme intra-op. En demandant plus de CPU et sans changer le code, on peut constater l’effet sur la performance.


File : pytorch-multi-cpu.sh

#!/bin/bash
#SBATCH --nodes 1
#SBATCH --tasks-per-node=1 
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... to see the effect on performance

#SBATCH --mem=8G      
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>

module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index

echo "starting training..."

time python cifar10-cpu.py



File : cifar10-cpu.py

import numpy as np
import time

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

import argparse
import os

parser = argparse.ArgumentParser(description='cifar10 classification models, cpu performance test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')

def main():

    args = parser.parse_args()
    torch.set_num_threads(int(os.environ['SLURM_CPUS_PER_TASK']))
    class Net(nn.Module):

       def __init__(self):
          super(Net, self).__init__()

          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)

       def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x

    net = Net()

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=args.lr)

    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    ### This next line will attempt to download the CIFAR10 dataset from the internet if you don't already have it stored in ./data 
    ### Run this line on a login node with "download=True" prior to submitting your job, or manually download the data from 
    ### https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz and place it under ./data

    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)

    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)

    perf = []

    total_start = time.time()

    for batch_idx, (inputs, targets) in enumerate(train_loader):

       start = time.time()

       outputs = net(inputs)
       loss = criterion(outputs, targets)

       optimizer.zero_grad()
       loss.backward()
       optimizer.step()

       batch_time = time.time() - start

       images_per_sec = args.batch_size/batch_time

       perf.append(images_per_sec)

    total_time = time.time() - total_start

if __name__=='__main__':
   main()


Travailler avec un seul GPU

On entend souvent dire qu’il faut absolument entraîner un modèle avec un GPU s’il y en a un à notre disposition. Ceci est presque toujours vrai (l'entraînement de très petits modèles est souvent plus rapide avec un ou plusieurs CPU) sur un poste de travail local, mais ce n’est pas le cas sur nos grappes.

Autrement dit, vous ne devriez pas demander un GPU si votre code ne peut pas faire un usage raisonnable de sa capacité de calcul.

La performance avantageuse des GPU pour les tâches d’apprentissage profond provient de deux sources :

  1. La capacité de paralléliser l’exécution de certaines opérations clés, par exemple le multiplieur-accumulateur, sur plusieurs milliers de cœurs de calcul, en comparaison du très petit nombre de cœurs disponibles avec la plupart des CPU.
  2. Une bande passante de mémoire beaucoup plus grande que pour un CPU, ce qui permet aux GPU d’utiliser efficacement leur très grand nombre de cœurs pour traiter une plus grande quantité de données par cycle de calcul.

Comme c’est le cas avec plusieurs CPU, PyTorch offre des implémentations parallèles d’opérateurs souvent utilisés en apprentissage profond, comme le produit matriciel et le produit de convolution et utilise des bibliothèques spécialisées pour les GPU comme CUDNN ou MIOpen, selon la plateforme matérielle. Ceci signifie que pour qu’il vaille la peine d’utiliser un GPU pour une tâche d’apprentissage, elle doit être composée d’éléments qui peuvent être élargis à une application massive du parallélisme de par le nombre d’opérations pouvant être parallélisées, de par la quantité des données à traiter ou idéalement de par les deux. Un exemple concret serait un grand modèle qui a un grand nombre d’unités et de couches ou qui a beaucoup de données en entrée, et idéalement qui présente ces deux caractéristiques.

Dans l’exemple ci-dessous, nous adaptons le code de la section précédente pour utiliser un GPU et nous examinons la performance. Nous observons que deux paramètres jouent un rôle important : batch_size et num_workers. Le premier paramètre améliore la performance en augmentant la taille des entrées à chaque itération et en utilisant mieux la capacité du GPU. Dans le cas du second paramètre, la performance est améliorée en facilitant le mouvement des données entre la mémoire de l’hôte (le CPU) et la mémoire du GPU, ce qui réduit la durée d’inactivité du GPU en attente de données à traiter.

Nous pouvons tirer deux conclusions :

  1. Augmenter la valeur de batch_size au maximum qu’il est possible pour la mémoire du GPU optimise la performance.
  2. Utiliser un DataLoader avec autant de workers que cpus-per-task facilite l’apport de données au GPU.

Bien entendu, le paramètre batch_size a aussi un impact sur la performance d’un modèle dans une tâche(c.à.d. l’exactitude, l’erreur, etc.) et il existe différentes écoles de pensée sur l’utilisation de grands lots. Nous n’abordons pas le sujet ici, mais si vous croyez qu’un petit lot conviendrait mieux à votre application, allez à la section Travailler avec un seul GPU pour savoir comment maximiser l’utilisation du GPU avec de petites entrées de données.


File : pytorch-single-gpu.sh

#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:1 # request a GPU
#SBATCH --tasks-per-node=1 
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G      
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>

module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index

echo "starting training..."
time python cifar10-gpu.py --batch_size=512 --num_workers=0



File : cifar10-gpu.py

import numpy as np
import time

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

import argparse

parser = argparse.ArgumentParser(description='cifar10 classification models, single gpu performance test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')


def main():

    args = parser.parse_args()

    class Net(nn.Module):

       def __init__(self):
          super(Net, self).__init__()

          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)

       def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x

    net = Net().cuda() # Load model on the GPU

    criterion = nn.CrossEntropyLoss().cuda() # Load the loss function on the GPU
    optimizer = optim.SGD(net.parameters(), lr=args.lr)

    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)

    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)

    perf = []

    total_start = time.time()

    for batch_idx, (inputs, targets) in enumerate(train_loader):

       start = time.time()
       
       inputs = inputs.cuda() 
       targets = targets.cuda()

       outputs = net(inputs)
       loss = criterion(outputs, targets)

       optimizer.zero_grad()
       loss.backward()
       optimizer.step()

       batch_time = time.time() - start

       images_per_sec = args.batch_size/batch_time

       perf.append(images_per_sec)

    total_time = time.time() - total_start

if __name__=='__main__':
   main()


Parallélisme des données avec un seul GPU

Il n’est pas conseillé d’utiliser un GPU avec un modèle de taille relativement petite qui n’utilise pas une grande part de la mémoire du GPU et une part raisonnable de sa capacité de calcul; utilisez plutôt un ou plusieurs CPU. Par contre, profiter du parallélisme du GPU devient une bonne option si vous avez un tel modèle avec un très grand jeu de données et que vous voulez effectuer l’entraînement avec des lots de petite taille.

Dans ce contexte, la parallélisation des données réfère à des méthodes pour entraîner en parallèle plusieurs copies d’un modèle où chaque copie reçoit un morceau des données d’entraînement à chaque itération. À la fin d’une itération, les gradients sont agrégés et les paramètres de chaque copie sont mis à jour de façon synchrone ou asynchrone, dépendant de la méthode. Cette approche peut augmenter la vitesse d’exécution de façon importante avec une itération qui se fait environ N fois plus rapidement avec un grand jeu de données, N étant le nombre de copies du modèle. Pour utiliser cette approche, un avertissement s’impose : pour que le modèle entraîné soit équivalent au même modèle entraîné sans parallélisme, vous devez adapter le taux d’apprentissage ou la taille du lot désirée en fonction du nombre de copies. Pour plus d’information, voyez ces échanges.

PyTorch offre des implémentations de méthodes de parallélisme des données, la classe DistributedDataParallel étant celle recommandée par les développeurs de PyTorch pour donner la meilleure performance. Conçue pour le travail avec plusieurs GPU, elle peut aussi être employée avec un seul GPU.

Dans l’exemple ci-dessous, nous adaptons le code pour un seul GPU pour utiliser le parallélisme des données. La tâche est relativement petite; la taille du lot est de 512 images, le modèle occupe environ 1Go de la mémoire du GPU et l’entraînement n’utilise qu’environ 6 % de sa capacité de calcul. Ce modèle ne devrait pas être entraîné sur un GPU sur nos grappes. Cependant, en parallélisant les données, un GPU V100 avec 16Go de mémoire peut contenir 14 ou 15 copies du modèle et augmenter l'utilisation de la ressource en plus d’obtenir une bonne augmentation de vitesse. Nous utilisons Multi-Process Service (MPS) de NVIDIA avec MPI pour placer plusieurs copies du modèle sur un GPU de façon efficace.


File : pytorch-gpu-mps.sh

#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:1 # request a GPU
#SBATCH --tasks-per-node=8 # This is the number of model replicas we will place on the GPU. Change this to 10,12,14,... to see the effect on performance  
#SBATCH --cpus-per-task=1 # increase this parameter and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G      
#SBATCH --time=0:05:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>

module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index

# Activate Nvidia MPS:
export CUDA_MPS_PIPE_DIRECTORY=/tmp/nvidia-mps
export CUDA_MPS_LOG_DIRECTORY=/tmp/nvidia-log
nvidia-cuda-mps-control -d

echo "starting training..."
time srun --cpus-per-task=$SLURM_CPUS_PER_TASK python cifar10-gpu-mps.py --batch_size=512 --num_workers=0



File : cifar10-gpu-mps.py

import os
import time
import datetime
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

import torch.distributed as dist
import torch.utils.data.distributed

import argparse

parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel maps test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')
parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')

def main():
    print("Starting...")

    args = parser.parse_args()

    rank = os.environ.get("SLURM_LOCALID")

    current_device = 0
    torch.cuda.set_device(current_device)

    """ this block initializes a process group and initiate communications
                between all processes that will run a model replica """

    print('From Rank: {}, ==> Initializing Process Group...'.format(rank))

    dist.init_process_group(backend="mpi", init_method=args.init_method) # Use backend="mpi" or "gloo". NCCL does not work on a single GPU due to a hard-coded multi-GPU topology check.
    print("process group ready!")

    print('From Rank: {}, ==> Making model..'.format(rank))

    class Net(nn.Module):

       def __init__(self):
          super(Net, self).__init__()

          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)

       def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x

    net = Net()

    net.cuda()
    net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device]) # Wrap the model with DistributedDataParallel

    criterion = nn.CrossEntropyLoss().cuda()
    optimizer = optim.SGD(net.parameters(), lr=args.lr)

    print('From Rank: {}, ==> Preparing data..'.format(rank))

    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    dataset_train = CIFAR10(root='~/data', train=True, download=False, transform=transform_train)

    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)

    perf = []

    total_start = time.time()

    for batch_idx, (inputs, targets) in enumerate(train_loader):

       start = time.time()
       
       inputs = inputs.cuda() 
       targets = targets.cuda()

       outputs = net(inputs)
       loss = criterion(outputs, targets)

       optimizer.zero_grad()
       loss.backward()
       optimizer.step()

       batch_time = time.time() - start

       images_per_sec = args.batch_size/batch_time

       perf.append(images_per_sec)

    total_time = time.time() - total_start

if __name__=='__main__':
   main()


Travailler avec plusieurs GPU

Problème avec DistributedDataParallel et PyTorch 1.10

Avec notre wheel PyTorch 1.10 torch-1.10.0+computecanada, le code pour travailler avec plusieurs GPU et qui utilise DistributedDataParallel pourrait échouer de façon imprévisible si le backend est défini comme étant 'nccl' ou 'gloo'. Nous vous recommandons d'utiliser la version la plus récente de PyTorch plutôt que la version 1.10 sur toutes les grappes d'usage général.

Paralléliser les données avec plusieurs GPU

Dans ce contexte, la parallélisation des données réfère à des méthodes pour entraîner en parallèle plusieurs copies d’un modèle où chaque copie reçoit une portion des données d’entraînement à chaque itération. À la fin d’une itération, les gradients sont agrégés et les paramètres de chaque copie sont mis à jour de façon synchrone ou asynchrone, dépendant de la méthode. Cette approche peut augmenter la vitesse d’exécution de façon importante avec une itération qui se fait environ N fois plus rapidement avec un grand jeu de données, N étant le nombre de copies du modèle. Pour utiliser cette approche, un avertissement s’impose : pour que le modèle entraîné soit équivalent au même modèle entraîné sans parallélisme, vous devez adapter le taux d’apprentissage ou la taille du lot désirée en fonction du nombre de copies. Pour plus d’information, voyez ces échanges. Quand plusieurs GPU sont utilisés, chacun reçoit une copie du modèle; il doit donc être assez petit pour être contenu dans la mémoire d’un GPU. Pour entraîner un modèle qui dépasse la quantité de mémoire d’un GPU, voyez la section Paralléliser un modèle avec plusieurs GPU.

Il existe plusieurs manières de paralléliser les données avec PyTorch. Nous présentons ici des tutoriels avec la classe DistributedDataParallel, avec le paquet PyTorch Lightning et avec le paquet Horovod.

DistributedDataParallel

Avec plusieurs GPU, la classe DistributedDataParallel est recommandée par les développeurs PyTorch, que ce soit avec un nœud unique ou avec plusieurs nœuds. Dans le cas qui suit, plusieurs GPU sont distribués sur deux nœuds.


File : pytorch-ddp-test.sh

#!/bin/bash
#SBATCH --nodes 1             
#SBATCH --gres=gpu:2          # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2   # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G      
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out

module load python # Using Default Python version - Make sure to choose a version that suits your application
srun -N $SLURM_NNODES -n $SLURM_NNODES bash << EOF
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision --no-index
EOF

export TORCH_NCCL_ASYNC_HANDLING=1
export MASTER_ADDR=$(hostname) #Store the master node’s IP address in the MASTER_ADDR environment variable.

echo "r$SLURM_NODEID master: $MASTER_ADDR"
echo "r$SLURM_NODEID Launching python script"

# The $((SLURM_NTASKS_PER_NODE * SLURM_JOB_NUM_NODES)) variable tells the script how many processes are available for this execution. “srun” executes the script <tasks-per-node * nodes> times

source $SLURM_TMPDIR/env/bin/activate

srun python pytorch-ddp-test.py --init_method tcp://$MASTER_ADDR:3456 --world_size $((SLURM_NTASKS_PER_NODE * SLURM_JOB_NUM_NODES))  --batch_size 256


Le script Python pytorch-ddp-test.py a la forme suivante :


File : pytorch-ddp-test.py

import os
import time
import datetime

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

import torch.distributed as dist
import torch.utils.data.distributed

import argparse

parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')

parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
parser.add_argument('--dist-backend', default='gloo', type=str, help='')
parser.add_argument('--world_size', default=1, type=int, help='')
parser.add_argument('--distributed', action='store_true', help='')

def main():
    print("Starting...")

    args = parser.parse_args()

    ngpus_per_node = torch.cuda.device_count()

    """ This next line is the key to getting DistributedDataParallel working on SLURM:
		SLURM_NODEID is 0 or 1 in this example, SLURM_LOCALID is the id of the 
 		current process inside a node and is also 0 or 1 in this example."""

    local_rank = int(os.environ.get("SLURM_LOCALID")) 
    rank = int(os.environ.get("SLURM_NODEID"))*ngpus_per_node + local_rank

    current_device = local_rank

    torch.cuda.set_device(current_device)

    """ this block initializes a process group and initiate communications
		between all processes running on all nodes """

    print('From Rank: {}, ==> Initializing Process Group...'.format(rank))
    #init the process group
    dist.init_process_group(backend=args.dist_backend, init_method=args.init_method, world_size=args.world_size, rank=rank)
    print("process group ready!")

    print('From Rank: {}, ==> Making model..'.format(rank))

    class Net(nn.Module):

       def __init__(self):
          super(Net, self).__init__()

          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)

       def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x

    net = Net()

    net.cuda()
    net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[current_device])

    print('From Rank: {}, ==> Preparing data..'.format(rank))

    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)

    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)

    criterion = nn.CrossEntropyLoss().cuda()
    optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4)

    for epoch in range(args.max_epochs):

        train_sampler.set_epoch(epoch)

        train(epoch, net, criterion, optimizer, train_loader, rank)

def train(epoch, net, criterion, optimizer, train_loader, train_rank):

    train_loss = 0
    correct = 0
    total = 0

    epoch_start = time.time()

    for batch_idx, (inputs, targets) in enumerate(train_loader):

       start = time.time()

       inputs = inputs.cuda()
       targets = targets.cuda()
       outputs = net(inputs)
       loss = criterion(outputs, targets)

       optimizer.zero_grad()
       loss.backward()
       optimizer.step()

       train_loss += loss.item()
       _, predicted = outputs.max(1)
       total += targets.size(0)
       correct += predicted.eq(targets).sum().item()
       acc = 100 * correct / total

       batch_time = time.time() - start

       elapse_time = time.time() - epoch_start
       elapse_time = datetime.timedelta(seconds=elapse_time)
       print("From Rank: {}, Training time {}".format(train_rank, elapse_time))

if __name__=='__main__':
   main()


PyTorch Lightning

Ce paquet fournit des interfaces à PyTorch afin de simplifier plusieurs tâches communes exigeant beaucoup de code; ceci inclut les tâches d'entraînement de modèles avec plusieurs GPU. Dans le tutoriel suivant pour PyTorch Lightning, nous reprenons le même exemple que ci-dessus, mais sans avoir explicitement recours à la classe DistributedDataParallel.


File : pytorch-ddp-test-pl.sh

#!/bin/bash
#SBATCH --nodes 1             
#SBATCH --gres=gpu:2          # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2    # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=8G      
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out

module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning --no-index

export TORCH_NCCL_ASYNC_HANDLING=1

# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!

srun python pytorch-ddp-test-pl.py  --batch_size 256



File : pytorch-ddp-test-pl.py

import datetime

import torch
from torch import nn
import torch.nn.functional as F

import pytorch_lightning as pl

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

import argparse

parser = argparse.ArgumentParser(description='cifar10 classification models, pytorch-lightning parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')


def main():
    print("Starting...")

    args = parser.parse_args()

    class Net(pl.LightningModule):

       def __init__(self):
          super(Net, self).__init__()

          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)

       def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x

       def training_step(self, batch, batch_idx):
          x, y = batch
          y_hat = self(x)
          loss = F.cross_entropy(y_hat, y)
          return loss

       def configure_optimizers(self):
          return torch.optim.Adam(self.parameters(), lr=args.lr)

    net = Net()

    """ Here we initialize a Trainer() explicitly with 1 node and 2 GPUs per node.
        To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
        and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes. 
        We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs, 
        which can cause issues due to updating logs too frequently."""

    trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy='ddp', max_epochs = args.max_epochs, enable_progress_bar=False) 

    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)

    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)

    trainer.fit(net,train_loader)


if __name__=='__main__':
   main()


Horovod

Horovod est une plateforme d'entraînement distribué pour l'apprentissage profond, compatible avec TensorFlow, Keras, PyTorch et Apache MXNet. Son API vous permet d'avoir le même niveau de contrôle sur votre code d'entraînement qu'avec DistributedDataParallel, mais simplifie l'écriture de vos scripts en éliminant le besoin de configurer directement les groupes de processus et en prenant en charge les variables d'environnement de l'ordonnanceur. Horovod offre aussi des optimiseurs distribués qui dans certains cas améliorent la performance. L'exemple suivant est le même que le précédent, cette fois avec Horovod.


File : pytorch_horovod.sh

#!/bin/bash
#SBATCH --nodes 1            
#SBATCH --gres=gpu:2         # Request 2 GPU "generic resources”.

#SBATCH --tasks-per-node=2   # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.

#SBATCH --mem=8G      
#SBATCH --time=0-03:00
#SBATCH --output=%N-%j.out

module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision horovod --no-index

export TORCH_NCCL_ASYNC_HANDLING=1

srun python pytorch_horovod.py  --batch_size 256



File : pytorch_horovod.py

import os
import time
import datetime
import numpy as np
import horovod.torch as hvd

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

import torch.distributed as dist
import torch.utils.data.distributed

import argparse


parser = argparse.ArgumentParser(description='cifar10 classification models, horovod test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--max_epochs', type=int, default=1, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')


def main():

    args = parser.parse_args()

    hvd.init()

    print("Starting...")

    local_rank = hvd.local_rank()
    global_rank = hvd.rank()

    torch.cuda.set_device(local_rank)


    class Net(nn.Module):

       def __init__(self):
          super(Net, self).__init__()

          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)

       def forward(self, x):
          x = self.pool(F.relu(self.conv1(x)))
          x = self.pool(F.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)
          x = F.relu(self.fc1(x))
          x = F.relu(self.fc2(x))
          x = self.fc3(x)
          return x

    net = Net()

    net.cuda()

    print('From Rank: {}, ==> Preparing data..'.format(global_rank))

    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)

    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train, num_replicas=hvd.size(),rank=global_rank)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)


    criterion = nn.CrossEntropyLoss().cuda()
    optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=1e-4)

    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=net.named_parameters())

    hvd.broadcast_parameters(net.state_dict(), root_rank=0)

    for epoch in range(args.max_epochs):

        train_sampler.set_epoch(epoch)

        train(args,epoch, net, criterion, optimizer, train_loader, global_rank)


def train(args,epoch, net, criterion, optimizer, train_loader, train_rank):

    train_loss = 0
    correct = 0
    total = 0

    epoch_start = time.time()

    for batch_idx, (inputs, targets) in enumerate(train_loader):

       start = time.time()

       inputs = inputs.cuda()
       targets = targets.cuda()
       outputs = net(inputs)
       loss = criterion(outputs, targets)

       optimizer.zero_grad()
       loss.backward()
       optimizer.step()

       train_loss += loss.item()
       _, predicted = outputs.max(1)
       total += targets.size(0)
       correct += predicted.eq(targets).sum().item()
       acc = 100 * correct / total

       batch_time = time.time() - start

       elapse_time = time.time() - epoch_start
       elapse_time = datetime.timedelta(seconds=elapse_time)
       print("From Rank: {}, Training time {}".format(train_rank, elapse_time))

if __name__=='__main__':
   main()


Paralléliser un modèle avec plusieurs GPU

Quand un modèle est trop grand pour être contenu dans un seul GPU, vous pouvez le diviser en portions et charger chacune sur des GPU distincts. Dans l’exemple suivant, nous reprenons le code des sections précédentes pour illustrer le fonctionnement. Nous divisons un réseau de neurones convolutifs en deux : d’une part les couches convolutionnelles/de regroupement et d'autre part les couches acycliques entièrement connectées. La tâche demande deux GPU, chacun d’eux étant chargé avec une des parts. De plus, nous ajoutons du code pour paralléliser les pipelines et ainsi réduire au maximum le temps pendant lequel le deuxième GPU est inactif dans l’attente des résultats du premier. Pour ce faire, nous créons un nn.Module distinct pour chacune des portions du modèle, puis créons une séquence de modules en enveloppant les portions avec nn.Sequential, et ensuite utilisons torch.distributed.pipeline.sync.Pipe pour morceler chacun des lots en entrée et les passer en parallèle aux deux portions du modèle.


File : pytorch-modelpar-pipelined-rpc.sh

#!/bin/bash
#SBATCH --nodes 1
#SBATCH --gres=gpu:2 # request 2 GPUs
#SBATCH --tasks-per-node=1 
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=8G      
#SBATCH --time=0:10:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>

module load python # Using Default Python version - Make sure to choose a version that suits your application
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index

# This is needed to initialize pytorch's RPC module, required for the Pipe class which we'll use for Pipeline Parallelism
export MASTER_ADDR=$(hostname)
export MASTER_PORT=34567
 
echo "starting training..."
time python pytorch-modelpar-pipelined-rpc.py --batch_size=512 --num_workers=0



File : pytorch-modelpar-pipelined-rpc.py

import time

import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributed.pipeline.sync import Pipe

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

import argparse

parser = argparse.ArgumentParser(description='cifar10 classification models, single node model parallelism test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=512, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')


def main():

    args = parser.parse_args()

    # Convolutional + pooling part of the model
    class ConvPart(nn.Module):

       def __init__(self):
          super(ConvPart, self).__init__()

          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.relu = nn.ReLU()

       def forward(self, x):
          x = self.pool(self.relu(self.conv1(x)))
          x = self.pool(self.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)

          return x

    # Dense feedforward part of the model
    class MLPPart(nn.Module):

       def __init__(self):
          super(MLPPart, self).__init__()

          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
          self.relu = nn.ReLU()

       def forward(self, x):
          x = self.relu(self.fc1(x))
          x = self.relu(self.fc2(x))
          x = self.fc3(x)

          return x

    torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) # initializing RPC is required by Pipe we use below

    part1 = ConvPart().to('cuda:0') # Load part1 on the first GPU
    part2 = MLPPart().to('cuda:1') # Load part2 on the second GPU

    net = nn.Sequential(part1,part2) # Pipe requires all modules be wrapped with nn.Sequential()

    net = Pipe(net, chunks=32) # Wrap with Pipe to perform Pipeline Parallelism

    criterion = nn.CrossEntropyLoss().to('cuda:1') # Load the loss function on the last GPU
    optimizer = optim.SGD(net.parameters(), lr=args.lr)

    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)

    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)

    perf = []

    total_start = time.time()

    for batch_idx, (inputs, targets) in enumerate(train_loader):

       start = time.time()

       inputs = inputs.to('cuda:0')
       targets = targets.to('cuda:1')

       # Models wrapped with Pipe() return a RRef object. Since the example is single node, all values are local to the node and we can grab them
       outputs = net(inputs).local_value()
       loss = criterion(outputs, targets)

       optimizer.zero_grad()
       loss.backward()
       optimizer.step()
       print(f"Loss: {loss.item()}")

       batch_time = time.time() - start

       images_per_sec = args.batch_size/batch_time

       perf.append(images_per_sec)

    total_time = time.time() - total_start

if __name__=='__main__':
   main()


Paralléliser modèle et données avec plusieurs GPU

Quand un modèle est trop grand pour être contenu dans un seul GPU et que son entraînement doit se faire avec un très grand ensemble de données, le fait de combiner le parallélisme du modèle et celui des données permet d’obtenir une bonne performance. Le principe est simple : le modèle est divisé en portions chacune attribuée à un GPU; le parallélisme des pipelines est fait avec les résultats; puis des copies du processus sont faites et les copies du modèle sont entraînées en parallèle avec des sous-ensembles distincts de l’ensemble de données d’entraînement. Comme décrit ci-dessus, les gradients sont calculés indépendamment dans chacune des copies et agrégés pour modifier toutes les copies de façon synchrone ou asynchrone, dépendant de la méthode. La différence principale ici est que chaque copie du modèle se trouve sur plus d’un GPU.

Utiliser Torch RPC et DDP

Toujours avec le même exemple, nous combinons maintenant Torch RPC et DistributedDataParallel pour séparer le modèle en deux portions et entraîner quatre copies du modèle distribuées en parallèle sur deux nœuds. Autrement dit, nous avons deux copies sur deux GPU de chaque nœud. Cependant, un avertissement s’impose : à ce jour, Torch RPC prend en charge la division d’un modèle dans un seul nœud. Pour entraîner un modèle qui dépasse la quantité de mémoire de tous les GPU dans un nœud de calcul, voyez la section DeepSpeed.


File : pytorch-model-data-par.sh

#!/bin/bash
#SBATCH --nodes 2
#SBATCH --gres=gpu:4 # Request 4 GPUs per node
#SBATCH --tasks-per-node=2 # Request one task per MODEL per node
#SBATCH --cpus-per-task=1 # change this parameter to 2,4,6,... and increase "--num_workers" accordingly to see the effect on performance
#SBATCH --mem=16G      
#SBATCH --time=0:10:00
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>

module load StdEnv/2020 gcc/11.3.0
module load python # Using Default Python version - Make sure to choose a version that suits your application, python/3.10.2 works with this demo
module load cuda/11.8.0
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torch torchvision --no-index

export MAIN_NODE=$(hostname)

echo "starting training..."

srun python pytorch-model-data-par.py --init_method tcp://$MAIN_NODE:3456 --world_size $SLURM_NTASKS  --batch_size 512



File : pytorch-model-data-par.py

import time
import os

import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributed.pipeline.sync import Pipe

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

import torch.distributed as dist
import torch.utils.data.distributed

import argparse

parser = argparse.ArgumentParser(description='cifar10 classification models, distributed data & model parallel test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--max_epochs', type=int, default=4, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')

parser.add_argument('--init_method', default='tcp://127.0.0.1:3456', type=str, help='')
parser.add_argument('--dist-backend', default='mpi', type=str, help='')
parser.add_argument('--world_size', default=1, type=int, help='')
parser.add_argument('--distributed', action='store_true', help='')


def main():

    args = parser.parse_args()

    # Convolutional + pooling part of the model
    class ConvPart(nn.Module):

       def __init__(self):
          super(ConvPart, self).__init__()

          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.relu = nn.ReLU()

       def forward(self, x):
          x = self.pool(self.relu(self.conv1(x)))
          x = self.pool(self.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)

          return x

    # Dense feedforward part of the model
    class MLPPart(nn.Module):

       def __init__(self):
          super(MLPPart, self).__init__()

          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
          self.relu = nn.ReLU()

       def forward(self, x):
          x = self.relu(self.fc1(x))
          x = self.relu(self.fc2(x))
          x = self.fc3(x)

          return x

    ngpus_per_node = torch.cuda.device_count()
    local_rank = int(os.environ.get("SLURM_LOCALID"))
    rank = int(os.environ.get("SLURM_NODEID"))*(ngpus_per_node//2) + local_rank  # Divide ngpus_per_node by the number of model parts

    os.environ['MASTER_ADDR'] = '127.0.0.1' # Each model replica will run its own RPC server to run pipeline parallelism
    os.environ['MASTER_PORT'] = str(34567 + local_rank) # Make sure each RPC server starts on a different port
    torch.distributed.rpc.init_rpc('worker', rank=0, world_size=1) # Different replicas won't communicate through RPC, but through DDP

    dist.init_process_group(backend=args.dist_backend, init_method=args.init_method, world_size=args.world_size, rank=rank) # Initialize Data Parallelism communications

    part1 = ConvPart().cuda(local_rank) # First part of the model goes on the first GPU of each process
    part2 = MLPPart().cuda(local_rank + 1) # Second part goes on the second GPU of each process

    net = nn.Sequential(part1,part2)

    net = Pipe(net, chunks=32, checkpoint="never")

    net = torch.nn.parallel.DistributedDataParallel(net)

    criterion = nn.CrossEntropyLoss().cuda(local_rank + 1) # Loss function goes on the second GPU of each process
    optimizer = optim.SGD(net.parameters(), lr=args.lr)

    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)

    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.num_workers, sampler=train_sampler)


    for epoch in range(args.max_epochs):

        train_sampler.set_epoch(epoch)

        train(epoch, net, criterion, optimizer, train_loader, rank, local_rank)

def train(epoch, net, criterion, optimizer, train_loader, train_rank, model_rank):

    train_loss = 0
    correct = 0
    total = 0

    epoch_start = time.time()

    for batch_idx, (inputs, targets) in enumerate(train_loader):

        start = time.time()

        inputs = inputs.cuda(model_rank)
        targets = targets.cuda(model_rank + 1)

        outputs = net(inputs).local_value()
        loss = criterion(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f"From Rank {train_rank} - Loss: {loss.item()}")

        batch_time = time.time() - start

if __name__=='__main__':
   main()


DeepSpeed

DeepSpeed est une bibliothèque qui permet d'optimiser l’entraînement de l’apprentissage de modèles ayant des milliards de paramètres à l’échelle. Parfaitement compatible avec PyTorch, DeepSpeed offre des implémentations de nouvelles méthodes d’entraînement distribué qui font un usage efficace de la mémoire en appliquant le concept de Zero Redundancy Optimizer (ZeRO). Avec ZeRO, DeepSpeed peut distribuer le stockage et le traitement des divers éléments d’une tâche d’entraînement (états de l’optimiseur, poids du modèle, gradients et activations) sur plusieurs dispositifs comme les GPU, les CPU, les disques durs locaux et/ou les combinaisons de ces dispositifs. Cette mise en commun des ressources, surtout les ressources de stockage, permet l’entraînement efficace sur plusieurs nœuds de modèles ayant d’énormes quantités de paramètres sans avoir besoin d’écrire le code pour paralléliser le modèle, les pipelines ou les données. Les exemples qui suivent montrent comment profiter de DeepSpeed et des différentes implémentations de ZeRO en utilisant l’interface simple de PyTorch Lightning.

ZeRO avec GPU

Dans l’exemple ci-dessous, nous utilisons ZeRO stage 3 pour entraîner un modèle qui utilise un groupe de 4 GPU. Le stage 3 répartit sur les 4 GPU les trois caractéristiques, soit les états de l’optimiseur, les paramètres du modèle et les gradients du modèle. Ceci est plus efficace que le parallélisme pur des données où une copie complète du modèle est chargée sur chacun des GPU. Avec l’optimiseur DeepSpeed FusedAdam plutôt qu’un optimiseur natif de PyTorch, la performance se compare au parallélisme pur des données. Les optimiseurs DeepSpeed sont compilés en temps réel à l’exécution et vous devez charger un module cuda/<version>version correspond à la version utilisée pour construire le paquet PyTorch que vous utilisez.


File : deepspeed-stage3.sh

#!/bin/bash
#SBATCH --nodes 1             
#SBATCH --gres=gpu:2          # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2    # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G      
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>

module load python cuda # CUDA must be loaded if using a DeepSpeed optimizer
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index

export TORCH_NCCL_ASYNC_HANDLING=1

# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!

srun python deepspeed-stage3.py  --batch_size 256



File : deepspeed-stage3.py

import torch
from torch import nn
import torch.nn.functional as F

import pytorch_lightning as pl

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

from deepspeed.ops.adam import FusedAdam
from pytorch_lightning.strategies import DeepSpeedStrategy

import argparse

parser = argparse.ArgumentParser(description='cifar10 classification models deep seed stage 3 test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')


def main():
    print("Starting...")

    args = parser.parse_args()

    class ConvPart(nn.Module):

       def __init__(self):
          super(ConvPart, self).__init__()

          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.relu = nn.ReLU()

       def forward(self, x):
          x = self.pool(self.relu(self.conv1(x)))
          x = self.pool(self.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)

          return x

    # Dense feedforward part of the model
    class MLPPart(nn.Module):

       def __init__(self):
          super(MLPPart, self).__init__()

          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
          self.relu = nn.ReLU()

       def forward(self, x):
          x = self.relu(self.fc1(x))
          x = self.relu(self.fc2(x))
          x = self.fc3(x)

          return x

    class Net(pl.LightningModule):

       def __init__(self):
          super(Net, self).__init__()

          self.conv_part = ConvPart()
          self.mlp_part = MLPPart()

       def configure_sharded_model(self):

          self.block = nn.Sequential(self.conv_part, self.mlp_part)

       def forward(self, x):
          x = self.block(x)

          return x

       def training_step(self, batch, batch_idx):
          x, y = batch
          y_hat = self(x)
          loss = F.cross_entropy(y_hat, y)
          return loss

       def configure_optimizers(self):
          return FusedAdam(self.parameters())

    net = Net()

    """ Here we initialize a Trainer() explicitly with 1 node and 2 GPU.
        To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
        and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes. 
        We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs, 
        which can cause issues due to updating logs too frequently."""

    trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy="deepspeed_stage_3", max_epochs = args.max_epochs)

    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)

    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)

    trainer.fit(net,train_loader)


if __name__=='__main__':
   main()


ZeRO avec CPU

Dans cet autre exemple, nous utilisons aussi ZeRO stage 3, mais cette fois-ci nous utilisons le CPU pour les états de l’optimiseur et les paramètres du modèle. Ceci signifie que la mémoire du nœud de calcul sera disponible pour stocker ces tenseurs quand ils ne sont pas requis par les calculs effectués par le GPU et de plus, les pas de l’optimiseur seront calculés sur le CPU. Pour des raisons pratiques, ce serait comme ajouter 32Go de mémoire additionnelle à la mémoire du GPU. Comme la mémoire du GPU est moins sollicitée, vous pouvez par exemple augmenter la taille de vos lots ou de votre modèle. Avec l’optimiseur DeepSpeed DeepSpeedCPUAdam plutôt qu’un optimiseur natif de PyTorch, la performance se compare au parallélisme pur des données. Les optimiseurs DeepSpeed sont compilés en temps réel à l’exécution et vous devez charger un module cuda/<version>version correspond à la version utilisée pour construire le paquet PyTorch que vous utilisez.


File : deepspeed-stage3-offload-cpu.sh

#!/bin/bash
#SBATCH --nodes 1             
#SBATCH --gres=gpu:2          # Request 2 GPU "generic resources”.
#SBATCH --tasks-per-node=2    # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G      
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>

module load python cuda # CUDA must be loaded if using ZeRO offloading to CPU or NVMe. Version must be the same used to compile PyTorch. 
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index

export TORCH_NCCL_ASYNC_HANDLING=1

# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!

srun python deepspeed-stage3-offload-cpu.py  --batch_size 256



File : deepspeed-stage3-offload-cpu.py

import torch
from torch import nn
import torch.nn.functional as F

import pytorch_lightning as pl

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

from deepspeed.ops.adam import DeepSpeedCPUAdam
from pytorch_lightning.strategies import DeepSpeedStrategy

import argparse

parser = argparse.ArgumentParser(description='cifar10 classification models, deepspeed offload to cpu test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')


def main():
    print("Starting...")

    args = parser.parse_args()

    class ConvPart(nn.Module):

       def __init__(self):
          super(ConvPart, self).__init__()

          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.relu = nn.ReLU()

       def forward(self, x):
          x = self.pool(self.relu(self.conv1(x)))
          x = self.pool(self.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)

          return x

    # Dense feedforward part of the model
    class MLPPart(nn.Module):

       def __init__(self):
          super(MLPPart, self).__init__()

          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
          self.relu = nn.ReLU()

       def forward(self, x):
          x = self.relu(self.fc1(x))
          x = self.relu(self.fc2(x))
          x = self.fc3(x)

          return x

    class Net(pl.LightningModule):

       def __init__(self):
          super(Net, self).__init__()

          self.conv_part = ConvPart()
          self.mlp_part = MLPPart()

       def configure_sharded_model(self):

          self.block = nn.Sequential(self.conv_part, self.mlp_part)

       def forward(self, x):
          x = self.block(x)

          return x

       def training_step(self, batch, batch_idx):
          x, y = batch
          y_hat = self(x)
          loss = F.cross_entropy(y_hat, y)
          return loss

       def configure_optimizers(self):
          return DeepSpeedCPUAdam(self.parameters())

    net = Net()

    """ Here we initialize a Trainer() explicitly with 1 node and 2 GPU.
        To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
        and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes. 
        We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs, 
        which can cause issues due to updating logs too frequently."""

    trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy=DeepSpeedStrategy(
        stage=3,
        offload_optimizer=True,
        offload_parameters=True,
        ), max_epochs = args.max_epochs)

    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)

    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)

    trainer.fit(net,train_loader)


if __name__=='__main__':
   main()


ZeRO avec utilisation de disques NVMe

ZeRO stage 3 nous sert encore, cette fois-ci en utilisant le CPU pour les états de l’optimiseur et les paramètres du modèle. Ceci signifie que l’espace de stockage local du nœud de calcul sera disponible pour stocker ces tenseurs quand ils ne sont pas requis par les calculs effectués par le GPU. Ici encore, les pas de l’optimiseur seront calculés sur le CPU. De même, pour des raisons pratiques, ce serait comme ajouter à la mémoire du GPU autant d’espace de stockage que sur le disque local, mais par contre nous aurons une forte perte de performance. Cette approche peut être utilisée avec tous les types de stockage, mais elle est à privilégier avec les disques NVMe qui sont plus rapides et ont des temps de réponse plus courts, ce qui compense pour la baisse de performance.


File : deepspeed-stage3-offload-nvme.sh

#!/bin/bash
#SBATCH --nodes 1             
#SBATCH --gres=gpu:2          # Request 2 GPU "generic resources”. 
#SBATCH --tasks-per-node=2    # Request 1 process per GPU. You will get 1 CPU per process by default. Request more CPUs with the "cpus-per-task" parameter to enable multiple data-loader workers to load data in parallel.
#SBATCH --mem=32G      
#SBATCH --time=0-00:20
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>

module load python cuda # CUDA must be loaded if using ZeRO offloading to CPU or NVMe. Version must be the same used to compile PyTorch. 
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install torchvision pytorch-lightning deepspeed --no-index

export TORCH_NCCL_ASYNC_HANDLING=1

# PyTorch Lightning will query the environment to figure out if it is running inside a SLURM batch job
# If it is, it expects the user to have requested one task per GPU.
# If you do not ask for 1 task per GPU, and you do not run your script with "srun", your job will fail!

srun python deepspeed-stage3-offload-nvme.py  --batch_size 256



File : deepspeed-stage3-offload-nvme.py

import os

import torch
from torch import nn
import torch.nn.functional as F

import pytorch_lightning as pl

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

from deepspeed.ops.adam import DeepSpeedCPUAdam
from pytorch_lightning.strategies import DeepSpeedStrategy

import argparse

parser = argparse.ArgumentParser(description='cifar10 classification models, deepspeed offload to nvme test')
parser.add_argument('--lr', default=0.1, help='')
parser.add_argument('--max_epochs', type=int, default=2, help='')
parser.add_argument('--batch_size', type=int, default=768, help='')
parser.add_argument('--num_workers', type=int, default=0, help='')


def main():
    print("Starting...")

    args = parser.parse_args()

    class ConvPart(nn.Module):

       def __init__(self):
          super(ConvPart, self).__init__()

          self.conv1 = nn.Conv2d(3, 6, 5)
          self.pool = nn.MaxPool2d(2, 2)
          self.conv2 = nn.Conv2d(6, 16, 5)
          self.relu = nn.ReLU()

       def forward(self, x):
          x = self.pool(self.relu(self.conv1(x)))
          x = self.pool(self.relu(self.conv2(x)))
          x = x.view(-1, 16 * 5 * 5)

          return x

    # Dense feedforward part of the model
    class MLPPart(nn.Module):

       def __init__(self):
          super(MLPPart, self).__init__()

          self.fc1 = nn.Linear(16 * 5 * 5, 120)
          self.fc2 = nn.Linear(120, 84)
          self.fc3 = nn.Linear(84, 10)
          self.relu = nn.ReLU()

       def forward(self, x):
          x = self.relu(self.fc1(x))
          x = self.relu(self.fc2(x))
          x = self.fc3(x)

          return x

    class Net(pl.LightningModule):

       def __init__(self):
          super(Net, self).__init__()

          self.conv_part = ConvPart()
          self.mlp_part = MLPPart()

       def configure_sharded_model(self):

          self.block = nn.Sequential(self.conv_part, self.mlp_part)

       def forward(self, x):
          x = self.block(x)

          return x

       def training_step(self, batch, batch_idx):
          x, y = batch
          y_hat = self(x)
          loss = F.cross_entropy(y_hat, y)
          return loss

       def configure_optimizers(self):
          return DeepSpeedCPUAdam(self.parameters())

    net = Net()

    """ Here we initialize a Trainer() explicitly with 1 node and 2 GPU.
        To make this script more generic, you can use torch.cuda.device_count() to set the number of GPUs
        and you can use int(os.environ.get("SLURM_JOB_NUM_NODES")) to set the number of nodes. 
        We also set progress_bar_refresh_rate=0 to avoid writing a progress bar to the logs, 
        which can cause issues due to updating logs too frequently."""

    local_scratch = os.environ['SLURM_TMPDIR'] # Get path where local storage is mounted

    print(f'Offloading to: {local_scratch}')

    trainer = pl.Trainer(accelerator="gpu", devices=2, num_nodes=1, strategy=DeepSpeedStrategy(
        stage=3,
        offload_optimizer=True,
        offload_parameters=True,
        remote_device="nvme",
        offload_params_device="nvme",
        offload_optimizer_device="nvme",
        nvme_path="local_scratch",
        ), max_epochs = args.max_epochs)

    transform_train = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    dataset_train = CIFAR10(root='./data', train=True, download=False, transform=transform_train)

    train_loader = DataLoader(dataset_train, batch_size=args.batch_size, num_workers=args.num_workers)

    trainer.fit(net,train_loader)


if __name__=='__main__':
   main()


Créer des points de contrôle

Peu importe si vous pensez que la durée d'exécution de votre code sera longue ou non, il est bon de prendre l'habitude de créer des points de contrôle pendant l'entraînement. Un point de contrôle est un portrait de votre modèle à un moment précis du processus d'entraînement (après un certain nombre d'itérations ou un certain nombre d'époques) que vous pouvez sauvegarder sur disque et utiliser plus tard. C'est un moyen pratique de diviser les tâches qui devraient être de longue durée en de multiples petites tâches auxquelles l'ordonnanceur peut allouer des ressources plus rapidement. C'est aussi une bonne façon de ne pas perdre le progrès réalisé au cas où des erreurs de code inattendues surviendraient ou que les nœuds ne soient pas disponibles pour quelconque raison.

Avec PyTorch Lightning

Nous recommandons d'utiliser le paramètre de rappels (callbacks parameter) de la classe Trainer(). Dans l'exemple suivant, on demande à PyTorch de créer un point de contrôle à la fin de chacune des époques d'entraînement. Vérifiez que le chemin où créer le point de contrôle existe.

callbacks = [pl.callbacks.ModelCheckpoint(dirpath="./ckpt",every_n_epochs=1)]
trainer = pl.Trainer(callbacks=callbacks) 
trainer.fit(model)

Ce bout de code chargera un point de contrôle de ./ckpt (s'il en existe) et poursuivra l'entraînement à partir de ce point. Pour plus d'information, consultez la documentation PyTorch Lightning.

Avec des boucles d'entraînement personnalisées

Pour des exemples, consultez la documentation PyTorch.

Pendant l’entraînement distribué

Les points de contrôle peuvent être utilisés pendant l’exécution d’un programme d’entraînement distribué. Avec PyTorch Lightning, aucun code supplémentaire n’est requis, autre que d’insérer le paramètre de rappels (callbacks parameter) mentionné ci-dessus. Cependant, si vous utilisez DistributedDataParallel ou Horovod, les points de contrôle devront être créés par un seul processus (rank) de votre programme puisque tous les processus auront le même état après chaque itération. Dans cet exemple, le premier processus (rank 0) crée un point de contrôle.

if global_rank == 0:
       torch.save(ddp_model.state_dict(), "./checkpoint_path")

Faites attention aux points de contrôle ainsi créés. Si un processus tente de charger un point de contrôle qui n’a pas encore été sauvegardé par un autre, des erreurs peuvent survenir ou de mauvais résultats peuvent être produits. Pour éviter ceci, vous pouvez ajouter une barrière à votre code pour faire en sorte que le processus qui crée le point de contrôle a terminé son écriture sur le disque avant que d’autres processus tentent de le charger. Remarquez aussi que torch.load essaiera par défaut de charger les tenseurs sur le GPU sur lequel ils étaient initialement sauvegardés, dans notre cas cuda:0. Pour éviter les problèmes, passez map_location à torch.load pour charger les tenseurs sur le GPU identifié par chaque processus.

torch.distributed.barrier()
map_location = f"cuda:{local_rank}"  
ddp_model.load_state_dict(
torch.load("./checkpoint_path", map_location=map_location))


Dépannage

Fuites de mémoire

Sur le matériel AVX512 (nœuds V100, Skylake ou Béluga), les versions PyTorch antérieures à v1.0.1 qui utilisent des bibliothèques moins récentes (cuDNN < v7.5 ou MAGMA < v2.5) peuvent avoir des fuites de mémoire importantes et créer des exceptions de mémoire insuffisante et terminer vos tâches. Pour contrer ceci, utilisez la plus récente version de torch.

c10::Error

Dans certains cas, vous pouvez obtenir un erreur comme

 terminate called after throwing an instance of 'c10::Error'
   what():  Given groups=1, weight of size [256, 1, 3, 3], expected input[16, 10, 16, 16] to have 1 channels, but got 10 channels instead
 Exception raised from check_shape_forward at /tmp/coulombc/pytorch_build_2021-11-09_14-57-01/avx2/python3.8/pytorch/aten/src/ATen/native/Convolution.cpp:496 (most recent call first):
 ...

Une exception C++ est émise plutôt qu'une exception Python. Ceci peut se produire quand vous programmez en C++ avec libtorch, mais ne devrait pas se produire quand vous programmez en Python. Il n'est pas possible de suivre la trace des appels (traceback)du programme Python, ce qui ne permet pas d'identifier facilement la cause de l'erreur dans le script Python. Nous avons constaté que le fait d'utiliser PyTorch 1.9.1 plutôt que 1.10.x permet le traceback du programme Python.

LibTorch

LibTorch permet d'implémenter à PyTorch des extensions C++ et des applications d'apprentissage machine en C++ pur. La distribution LibTorch possède les en-têtes, bibliothèques et fichiers de configuration CMake nécessaires pour travailler avec PyTorch, tel que décrit dans la documentation.

Utiliser LibTorch

Configurer l'environnement

Chargez les modules requis par LibTorch, puis installez PyTorch dans un environnement virtuel Python.

module load StdEnv/2023 gcc cuda/12.2 cmake protobuf cudnn python/3.11 abseil  cusparselt  opencv/4.8.1
virtualenv --no-download --clear ~/ENV && source ~/ENV/bin/activate 
pip install --no-index torch numpy 

Vous devrez peut-être ajuster les versions des modules abseil, cusparselt et opencv, dépendant du paquet torch que vous utilisez. Pour savoir quelle version d'un module a été utilisée pour compiler le wheel Python, lancez la commande

Question.png
$ ldd $VIRTUAL_ENV/lib/python3.11/site-packages/torch/lib/libtorch_cuda.so | sed -n 's&^.*/\(\(opencv\|abseil\|cusparselt\)/[^/]*\).*&\1&p' | sort -u
abseil/20230125.3
cusparselt/0.5.0.1
opencv/4.8.1
module load gcc cuda/11.4 cmake protobuf cudnn python/3.10
virtualenv --no-download --clear ~/ENV && source ~/ENV/bin/activate 
pip install --no-index torch numpy 

Compiler un exemple simple

Créez les deux fichiers suivants :


File : example.cpp

#include <torch/torch.h>
#include <iostream>

int main() 
{
    torch::Device device(torch::kCPU);
    if (torch::cuda::is_available()) 
    {
        std::cout << "CUDA is available! Using GPU." << std::endl;
        device = torch::Device(torch::kCUDA);
    }

    torch::Tensor tensor = torch::rand({2, 3}).to(device);
    std::cout << tensor << std::endl;
}



File : CMakeLists.txt

cmake_minimum_required(VERSION 3.0 FATAL_ERROR)
project(example)

find_package(Torch REQUIRED)

add_executable(example example.cpp)
target_link_libraries(example "${TORCH_LIBRARIES}")
set_property(TARGET example PROPERTY CXX_STANDARD 14)


Activez l'environnement virtuel Python, configurez le projet et compilez le programme.

cmake -B build -S . -DCMAKE_PREFIX_PATH=$VIRTUAL_ENV/lib/python3.11/site-packages \
                    -DCMAKE_EXE_LINKER_FLAGS=-Wl,-rpath=$VIRTUAL_ENV/lib/python3.11/site-packages/torch/lib,-L$EBROOTCUDA/extras/CUPTI/lib64 \
                    -DCMAKE_SKIP_RPATH=ON -DTORCH_CUDA_ARCH_LIST="6.0;7.0;7.5;8.0;9.0"
cmake --build build
cmake -B build -S . -DCMAKE_PREFIX_PATH=$VIRTUAL_ENV/lib/python3.10/site-packages \
                    -DCMAKE_EXE_LINKER_FLAGS=-Wl,-rpath=$VIRTUAL_ENV/lib/python3.10/site-packages/torch/lib \
                    -DCMAKE_SKIP_RPATH=ON
cmake --build build 

Lancez le programme avec

build/example

Pour tester une application avec CUDA, demandez une tâche interactive avec GPU.

Ressources

https://pytorch.org/cppdocs/