Ray
Ray is a unified framework for scaling AI and Python applications. It consists of a core distributed runtime and a set of libraries that simplify running parallel and distributed workloads, in particular machine learning jobs.
Installation
Latest available wheels
To see the latest version of Ray that we have built:
[name@server ~]$ avail_wheels "ray"
For more information, see Available wheels.
Installing our wheel
The preferred option is to install Ray from our Python wheel, as follows:
1. Load a Python module: module load python
2. Create and start a virtual environment.
3. Install Ray in the virtual environment with pip install:
(venv) [name@server ~]$ pip install --no-index ray
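To confirm that the wheel landed in the active virtual environment, you can query the installed distribution from Python. The following is a minimal sketch using only the standard library (Python 3.8+). The helper name installed_version is hypothetical and not part of Ray.

```python
# Hypothetical helper (not part of Ray): report whether a distribution such as
# "ray" is installed in the current (virtual) environment, and its version.
from importlib.metadata import PackageNotFoundError, version


def installed_version(dist_name: str):
    """Return the installed version string of dist_name, or None if absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    v = installed_version("ray")
    print(f"ray {v}" if v else "ray is not installed in this environment")
```

Run it with the virtual environment activated. If it reports that Ray is missing, the pip install step ran in a different environment.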
Job submission
Single Node
Below is an example of a job script that starts a single-node Ray cluster with 6 CPUs and 1 GPU.
#!/bin/bash
#SBATCH --ntasks=1
#SBATCH --gpus-per-task=1
#SBATCH --cpus-per-task=6
#SBATCH --mem=32000M
#SBATCH --time=0-00:05
#SBATCH --output=%N-%j.out
export HEAD_NODE=$(hostname) # store head node's address
export RAY_PORT=34567 # choose a port to start Ray on the head node
module load python gcc/9.3.0 arrow
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate
pip install ray --no-index
# Launch a single-node Ray cluster as a background process; the only node is the head node
ray start --head --node-ip-address=$HEAD_NODE --port=$RAY_PORT --num-cpus=$SLURM_CPUS_PER_TASK --num-gpus=1 --block &
sleep 10 # give the head node time to start before connecting to it
python ray-example.py
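The job script passes the head node's address to Python through the exported HEAD_NODE and RAY_PORT variables. If the Python script is accidentally run outside the job, a bare os.environ lookup fails with a KeyError that is not very informative. Below is a minimal sketch of a hypothetical helper, ray_address (not part of Ray), that builds the "host:port" string and fails with a clearer message.

```python
import os


def ray_address(env=None) -> str:
    """Hypothetical helper (not part of Ray): build "host:port" from the
    HEAD_NODE and RAY_PORT variables exported by the job script."""
    env = os.environ if env is None else env
    missing = [name for name in ("HEAD_NODE", "RAY_PORT") if not env.get(name)]
    if missing:
        # Most likely cause: the script is not running inside the Slurm job
        # that started the Ray head node.
        raise RuntimeError(
            f"{', '.join(missing)} not set; run this inside the Slurm job "
            "that started the Ray head node"
        )
    return f"{env['HEAD_NODE']}:{env['RAY_PORT']}"
```

Inside the job you would then call ray.init(address=ray_address(), _node_ip_address=os.environ["HEAD_NODE"]).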
In this simple example, ray-example.py connects to the single-node Ray cluster launched by the job script, then checks that Ray sees the resources allocated to the job.
import ray
import os
# Connect to Ray cluster
ray.init(address=f"{os.environ['HEAD_NODE']}:{os.environ['RAY_PORT']}", _node_ip_address=os.environ['HEAD_NODE'])
# Check that Ray sees 6 CPUs and 1 GPU
print(ray.available_resources())
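Rather than eyeballing the printed dictionary, the script could compare it with what was requested from Slurm. ray.available_resources() returns a dict keyed by resource name (such as "CPU" and "GPU") with float counts. Note that it reports resources not currently in use, whereas ray.cluster_resources() reports totals. The following is a minimal sketch. The helper matches_allocation is hypothetical and not part of Ray.

```python
def matches_allocation(resources, expected_cpus, expected_gpus) -> bool:
    """Hypothetical helper (not part of Ray): compare a Ray resource dict,
    e.g. from ray.available_resources(), with the CPUs/GPUs requested
    from Slurm. Ray reports counts as floats, so 6.0 == 6 holds."""
    cpus = resources.get("CPU", 0.0)
    gpus = resources.get("GPU", 0.0)
    return cpus == expected_cpus and gpus == expected_gpus
```

In ray-example.py this might be used as matches_allocation(ray.available_resources(), int(os.environ["SLURM_CPUS_PER_TASK"]), 1), where the 1 matches --gpus-per-task=1 in the job script.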
Multiple Nodes
In the example that follows, we submit a job that spawns a two-node Ray cluster with 6 CPUs and 1 GPU per node.
#!/bin/bash
#SBATCH --nodes 2
#SBATCH --ntasks-per-node=1
#SBATCH --gpus-per-task=1
#SBATCH --cpus-per-task=6
#SBATCH --mem=32000M
#SBATCH --time=0-00:10
#SBATCH --output=%N-%j.out
## Create a virtualenv and install Ray on all nodes ##
srun -N $SLURM_NNODES -n $SLURM_NNODES config_env.sh
export HEAD_NODE=$(hostname) # store head node's address
export RAY_PORT=34567 # choose a port to start Ray on the head node
module load python gcc/9.3.0 arrow
source $SLURM_TMPDIR/ENV/bin/activate
## Start Ray cluster Head Node ##
ray start --head --node-ip-address=$HEAD_NODE --port=$RAY_PORT --num-cpus=$SLURM_CPUS_PER_TASK --num-gpus=1 --block &
sleep 10
## Launch worker nodes on all the other nodes allocated by the job ##
srun launch_ray.sh &
ray_cluster_pid=$!
python test_ray.py
## Shut down Ray worker nodes after the Python script exits ##
kill $ray_cluster_pid
Where the script config_env.sh is:
#!/bin/bash
module load python
virtualenv --no-download $SLURM_TMPDIR/ENV
source $SLURM_TMPDIR/ENV/bin/activate
pip install --upgrade pip --no-index
pip install ray --no-index
And the script launch_ray.sh is:
#!/bin/bash
source $SLURM_TMPDIR/ENV/bin/activate
module load gcc/9.3.0 arrow
if [[ "$SLURM_PROCID" -eq "0" ]]; then
echo "Ray head node already started..."
sleep 10
else
echo "Starting Ray worker on $(hostname)..."
# --block keeps the worker in the foreground until the job kills it
ray start --address "${HEAD_NODE}:${RAY_PORT}" --num-cpus="${SLURM_CPUS_PER_TASK}" --num-gpus=1 --block
fi
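With --ntasks-per-node=1, each Slurm node runs one Ray node started with --num-cpus=$SLURM_CPUS_PER_TASK and --num-gpus=1. The cluster totals are therefore the number of nodes times the per-node amounts: 2 × 6 = 12 CPUs and 2 × 1 = 2 GPUs here. The following is a minimal sketch of that arithmetic from the Slurm environment. The helper expected_totals is hypothetical, and its gpus_per_node parameter is an assumption mirroring the --num-gpus=1 used in the scripts.

```python
import os


def expected_totals(env=None, gpus_per_node=1):
    """Hypothetical helper (not part of Ray): (CPUs, GPUs) that a Ray cluster
    with one Ray node per Slurm node should report in total."""
    env = os.environ if env is None else env
    nodes = int(env["SLURM_NNODES"])
    cpus_per_node = int(env["SLURM_CPUS_PER_TASK"])
    return nodes * cpus_per_node, nodes * gpus_per_node
```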
In this simple example, test_ray.py connects to the two-node Ray cluster launched by the job script, then checks that Ray sees the resources allocated to the job.
import ray
import os
# Connect to Ray cluster
ray.init(address=f"{os.environ['HEAD_NODE']}:{os.environ['RAY_PORT']}", _node_ip_address=os.environ['HEAD_NODE'])
# Check that Ray sees two nodes and their status is 'Alive'
print("Nodes in the Ray cluster:")
print(ray.nodes())
# Check that Ray sees 12 CPUs and 2 GPUs over 2 nodes
print(ray.available_resources())
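The raw output of ray.nodes() is a list of per-node dictionaries that is verbose to read. Each entry includes an "Alive" flag and a "Resources" dict. Below is a minimal sketch of a hypothetical helper, summarize_nodes (not part of Ray), that counts the live nodes and sums their resources so the two-node, 12-CPU, 2-GPU expectation can be checked directly.

```python
def summarize_nodes(nodes):
    """Hypothetical helper (not part of Ray): given the list returned by
    ray.nodes(), return (number of alive nodes, summed resources of
    those alive nodes)."""
    alive = [n for n in nodes if n.get("Alive")]
    totals = {}
    for node in alive:
        for name, amount in node.get("Resources", {}).items():
            totals[name] = totals.get(name, 0.0) + amount
    return len(alive), totals
```

In test_ray.py one might call count, totals = summarize_nodes(ray.nodes()) and expect count == 2, totals["CPU"] == 12.0 and totals["GPU"] == 2.0.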