Dask
Dask is a flexible library for parallel computing in Python. It provides parallelized NumPy array and Pandas DataFrame objects, and it enables distributed computing in pure Python with access to the PyData stack.
Installing our wheel
The preferred option is to install it using our provided Python wheel as follows:
1. Load a Python module, e.g. module load python/3.11
2. Create and start a virtual environment.
3. Install dask, and optionally dask-distributed, in the virtual environment with pip install:
(venv) [name@server ~] pip install --no-index dask distributed
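As an optional sanity check (not part of the original instructions), you can confirm from inside the activated virtual environment that both packages import and report a version:

# Optional check, run inside the activated virtual environment.
import dask
import dask.distributed

print(dask.__version__)
print(dask.distributed.__version__)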
Job Submission
Single Node
Below is an example of a job that spawns a single-node Dask cluster with 6 CPUs and computes the mean of a column of a parallelized dataframe.
#!/bin/bash
#SBATCH --account=<your account>
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=6
#SBATCH --mem=8000M
#SBATCH --time=0-00:05
#SBATCH --output=%N-%j.out
module load python/3.11
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install dask distributed pandas --no-index
python dask-example.py
In the script dask-example.py, we launch a Dask cluster with as many worker processes as there are cores in our job. This means each worker will spawn at most one CPU thread. For a complete discussion of how to reason about the number of worker processes and the number of threads per worker, see the official Dask documentation. In this example, we split a pandas dataframe into 6 chunks, so each worker will process a part of the dataframe using one CPU:
import numpy as np
import pandas as pd
from dask import dataframe as dd
from dask.distributed import Client, LocalCluster
import os
n_workers = int(os.environ['SLURM_CPUS_PER_TASK'])
cluster = LocalCluster(n_workers=n_workers, threads_per_worker=1)
client = Client(cluster)
index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400)}, index=index)
ddf = dd.from_pandas(df, npartitions=n_workers) # split the pandas data frame into "n_workers" chunks
result = ddf.a.mean().compute()
print(f"The mean is {result}")
Multiple Nodes
In the example that follows, we reproduce the single-node example, but this time with a two-node Dask cluster with 6 CPUs on each node. We also spawn 2 workers per node, each with 3 cores.
#!/bin/bash
#SBATCH --nodes 2
#SBATCH --tasks-per-node=2
#SBATCH --mem=16000M
#SBATCH --cpus-per-task=3
#SBATCH --time=0-00:30
#SBATCH --output=%N-%j.out
#SBATCH --account=<your account>
export DASK_SCHEDULER_ADDR=$(hostname)
export DASK_SCHEDULER_PORT=34567
srun -N 2 -n 2 config_virtualenv.sh # set both -N and -n to the number of nodes
source $SLURM_TMPDIR/env/bin/activate
dask scheduler --host $DASK_SCHEDULER_ADDR --port $DASK_SCHEDULER_PORT &
sleep 10
srun launch_dask_workers.sh &
dask_cluster_pid=$!
sleep 10
python test_dask.py
kill $dask_cluster_pid # shut down Dask workers after the python process exits
Where the script config_virtualenv.sh is:
#!/bin/bash
echo "From node ${SLURM_NODEID}: installing virtualenv..."
module load python/3.11
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install --no-index dask distributed pandas
echo "Done installing virtualenv!"
deactivate
And the script launch_dask_workers.sh is:
#!/bin/bash
source $SLURM_TMPDIR/env/bin/activate
SCHEDULER_CONNECTION_STRING="tcp://$DASK_SCHEDULER_ADDR:$DASK_SCHEDULER_PORT"
if [[ "$SLURM_PROCID" -eq "0" ]]; then
## On the SLURM task with Rank 0, where the Dask scheduler process has already been launched, we launch a smaller worker,
## with 40% of the job's memory and we subtract one core from the task to leave it for the scheduler.
DASK_WORKER_MEM=0.4
DASK_WORKER_THREADS=$(($SLURM_CPUS_PER_TASK-1))
else
## On all other SLURM tasks, each worker gets half of the job's allocated memory and all the cores allocated to its task.
DASK_WORKER_MEM=0.5
DASK_WORKER_THREADS=$SLURM_CPUS_PER_TASK
fi
dask worker "tcp://$DASK_SCHEDULER_ADDR:$DASK_SCHEDULER_PORT" --no-dashboard --nworkers=1 \
--nthreads=$DASK_WORKER_THREADS --memory-limit=$DASK_WORKER_MEM --local-directory=$SLURM_TMPDIR
sleep 5
echo "dask worker started!"
And, finally, the script test_dask.py is:
import numpy as np
import pandas as pd
from dask import dataframe as dd
from dask.distributed import Client
import os
client = Client(f"tcp://{os.environ['DASK_SCHEDULER_ADDR']}:{os.environ['DASK_SCHEDULER_PORT']}")
index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400)}, index=index)
ddf = dd.from_pandas(df, npartitions=6)
result = ddf.a.mean().compute()
print(f"The mean is {result}")