Dask

From Alliance Doc
Jump to navigation Jump to search
This page is a translated version of the page Dask and the translation is 100% complete.
Other languages:

Dask est une bibliothèque polyvalente pour Python. Elle fournit des tableaux NumPy et des DataFrame Pandas permettant le calcul distribué en Python pur avec accès à la pile PyData.

Installer le wheel

La meilleure option est d'installer avec Python wheels comme suit :

1. Chargez un module Python avec module load python.
2. Créez et démarrez un environnement virtuel.
3. Dans l'environnement virtuel, utilisez pip install pour installer dask et en option dask-distributed.
Question.png
(venv) [name@server ~] pip install --no-index dask distributed

Soumettre une tâche

Nœud simple

L’exemple suivant démarre une grappe Dask avec un nœud simple de 6 CPU et calcule la moyenne d’une colonne pour l'ensemble des données.

File : dask-example.sh

#!/bin/bash
#SBATCH --account=<your account>
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=6  
#SBATCH --mem=8000M       
#SBATCH --time=0-00:05
#SBATCH --output=%N-%j.out

module load python/3.11
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate

pip install dask distributed pandas --no-index

python dask-example.py


Ce script démarre une grappe Dask ayant autant de processus de travail que de coeurs dans la tâche. Chacun des processus crée au moins un fil d’exécution. Pour déterminer le nombre de processus et de fils, consultez la documentation officielle de Dask. Ici, le dataframe Pandas est divisé en 6 parts et chaque processus en traitera une avec un CPU

File : dask-example.py

import pandas as pd

from dask import dataframe as dd
from dask.distributed import Client, LocalCluster

import os

n_workers = int(os.environ['SLURM_CPUS_PER_TASK'])

cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1)
client = Client(cluster)

index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400)}, index=index)
ddf = dd.from_pandas(df, npartitions=n_workers) # split the pandas data frame into "n_workers" chunks

result = ddf.a.mean().compute()

print(f"The mean is {result}")


Nœuds multiples

Nous utilisons l’exemple précédent mais cette fois-ci avec une grappe Dask avec deux nœuds de 6 CPU chacun. Deux processus sont créés sur chaque nœud, chacun avec 3 cœurs.

File : dask-example.sh

#!/bin/bash
#SBATCH --nodes 2
#SBATCH --tasks-per-node=2
#SBATCH --mem=16000M
#SBATCH --cpus-per-task=3
#SBATCH --time=0-00:30
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>

export DASK_SCHEDULER_ADDR=$(hostname)
export DASK_SCHEDULER_PORT=34567

srun -N 2 -n 2 config_virtualenv.sh # set both -N and -n to the number of nodes

source $SLURM_TMPDIR/ENV/bin/activate

dask scheduler --host $DASK_SCHEDULER_ADDR --port $DASK_SCHEDULER_PORT &
sleep 10

srun launch_dask_workers.sh &
dask_cluster_pid=$!
sleep 10

python test_dask.py

kill $dask_cluster_pid # shut down Dask workers after the python process exits


où le script config_env.sh est

File : config_env.sh

#!/bin/bash

echo "From node ${SLURM_NODEID}: installing virtualenv..."

module load python/3.11
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate

pip install --no-index dask distributed pandas

echo "Done installing virtualenv!"

deactivate


et le script launch_dask_workers.sh est

File : launch_dask_workers.sh

#!/bin/bash

source $SLURM_TMPDIR/env/bin/activate

SCHEDULER_CONNECTION_STRING="tcp://$DASK_SCHEDULER_ADDR:$DASK_SCHEDULER_PORT"

if [[ "$SLURM_PROCID" -eq "0" ]]; then
## On the SLURM task with Rank 0, where the Dask scheduler process has already been launched, we launch a smaller worker,
## with 40% of the job's memory and we subtract one core from the task to leave it for the scheduler.
        DASK_WORKER_MEM=0.4
        DASK_WORKER_THREADS=$(($SLURM_CPUS_PER_TASK-1))

else
## On all other SLURM tasks, each worker gets half of the job's allocated memory and all the cores allocated to its task.
        DASK_WORKER_MEM=0.5
        DASK_WORKER_THREADS=$SLURM_CPUS_PER_TASK
fi

dask worker "tcp://$DASK_SCHEDULER_ADDR:$DASK_SCHEDULER_PORT" --no-dashboard --nworkers=1 \
--nthreads=$DASK_WORKER_THREADS --memory-limit=$DASK_WORKER_MEM --local-directory=$SLURM_TMPDIR

sleep 5
echo "dask worker started!"


Enfin, le script test_dask.py est

File : test_dask.py

import pandas as pd

from dask import dataframe as dd
from dask.distributed import Client

import os

client = Client(f"tcp://{os.environ['DASK_SCHEDULER_ADDR']}:{os.environ['DASK_SCHEDULER_PORT']}")

index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400)}, index=index)
ddf = dd.from_pandas(df, npartitions=6)

result = ddf.a.mean().compute()

print(f"The mean is {result}")