Ray


Ray is a unified framework for scaling AI and Python applications. It consists of a core distributed runtime and a set of libraries that simplify running parallel and distributed workloads, in particular machine learning jobs.

Installation

Latest available wheels

To see the latest version of Ray that we have built:

[name@server ~]$ avail_wheels "ray"

For more information, see Available wheels.

Installing our wheel

The preferred option is to install it using the Python wheel as follows:

1. Load a Python module with module load python.
2. Create and start a virtual environment.
3. Install Ray in the virtual environment with pip install.
(venv) [name@server ~]$ pip install --no-index ray

Job submission

Single Node

Below is an example of a job that spawns a single-node Ray cluster with 6 CPUs and 1 GPU.


File : ray-example.sh

#!/bin/bash
#SBATCH --ntasks=1
#SBATCH --gpus-per-task=1
#SBATCH --cpus-per-task=6  
#SBATCH --mem=32000M       
#SBATCH --time=0-00:05
#SBATCH --output=%N-%j.out

export HEAD_NODE=$(hostname) # store head node's address
export RAY_PORT=34567 # choose a port to start Ray on the head node 

module load python gcc/9.3.0 arrow
virtualenv --no-download $SLURM_TMPDIR/env
source $SLURM_TMPDIR/env/bin/activate

pip install ray --no-index

# Launch a single-node Ray cluster as a background process: the only node is the head node
ray start --head --node-ip-address=$HEAD_NODE --port=$RAY_PORT --num-cpus=$SLURM_CPUS_PER_TASK --num-gpus=1 --block &
sleep 10

python ray-example.py
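
The job script above waits a fixed 10 seconds for the head node to come up. As an alternative, a Python script could poll the head node's port before connecting. The following is a minimal sketch using only the standard library; wait_for_port is a hypothetical helper, not part of Ray, and the timeout and interval defaults are arbitrary assumptions.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    Hypothetical helper (not a Ray API): it only checks that something is
    listening on the port.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means a process is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: wait and try again.
            time.sleep(interval)
    return False
```

For example, ray-example.py could call wait_for_port(os.environ['HEAD_NODE'], int(os.environ['RAY_PORT'])) before ray.init and exit with an error if it returns False. An open port only shows that a process is listening; it does not guarantee that the cluster has finished initializing.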


The Python script ray-example.py connects to the single-node Ray cluster launched by the job submission script, then checks that Ray sees the resources allocated to the job.


File : ray-example.py

import ray
import os

# Connect to Ray cluster
ray.init(address=f"{os.environ['HEAD_NODE']}:{os.environ['RAY_PORT']}",_node_ip_address=os.environ['HEAD_NODE'])

# Check that Ray can see 6 CPUs and 1 GPU
print(ray.available_resources())
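
Rather than reading the printed dictionary by eye, the check can be automated. The sketch below compares a resource dictionary against the amounts requested from the scheduler. missing_resources is a hypothetical helper, and it assumes the dictionary uses the keys "CPU" and "GPU", as ray.available_resources() normally does.

```python
def missing_resources(available, expected):
    """Return {name: (expected, found)} for each resource below its expected amount.

    Hypothetical helper: 'available' is a dict such as the one returned by
    ray.available_resources(); 'expected' maps resource names to minimums.
    """
    shortfall = {}
    for name, wanted in expected.items():
        # A resource absent from the dict is treated as zero.
        found = available.get(name, 0)
        if found < wanted:
            shortfall[name] = (wanted, found)
    return shortfall
```

In ray-example.py this could be used as missing_resources(ray.available_resources(), {"CPU": 6, "GPU": 1}); an empty result means Ray sees at least what the job requested. If tasks are already running, ray.cluster_resources() reports the cluster totals and may be the better input.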


Multiple Nodes

In the example that follows, we submit a job that spawns a two-node Ray cluster with 6 CPUs and 1 GPU per node.


File : ray-example.sh

#!/bin/bash
#SBATCH --nodes 2
#SBATCH --ntasks-per-node=1
#SBATCH --gpus-per-task=1
#SBATCH --cpus-per-task=6  
#SBATCH --mem=32000M       
#SBATCH --time=0-00:10
#SBATCH --output=%N-%j.out

## Create a virtualenv and install Ray on all nodes ##
srun -N $SLURM_NNODES -n $SLURM_NNODES config_env.sh

export HEAD_NODE=$(hostname) # store head node's address
export RAY_PORT=34567 # choose a port to start Ray on the head node 

source $SLURM_TMPDIR/ENV/bin/activate

## Start Ray cluster Head Node ##
ray start --head --node-ip-address=$HEAD_NODE --port=$RAY_PORT --num-cpus=$SLURM_CPUS_PER_TASK --num-gpus=1 --block &
sleep 10

## Launch worker nodes on all the other nodes allocated by the job ##
srun launch_ray.sh &
ray_cluster_pid=$!

python test_ray.py

## Shut down Ray worker nodes after the Python script exits ##
kill $ray_cluster_pid


The script config_env.sh is:


File : config_env.sh

#!/bin/bash

module load python

virtualenv --no-download $SLURM_TMPDIR/ENV

source $SLURM_TMPDIR/ENV/bin/activate

pip install --upgrade pip --no-index

pip install ray --no-index


And the script launch_ray.sh is:


File : launch_ray.sh

#!/bin/bash

source $SLURM_TMPDIR/ENV/bin/activate
module load gcc/9.3.0 arrow

if [[ "$SLURM_PROCID" -eq "0" ]]; then
        echo "Ray head node already started..."
        sleep 10

else
        echo "Starting Ray worker..."
        # --block keeps this process in the foreground until the worker is stopped
        ray start --address "${HEAD_NODE}:${RAY_PORT}" --num-cpus="${SLURM_CPUS_PER_TASK}" --num-gpus=1 --block
fi


The Python script test_ray.py connects to the two-node Ray cluster launched by the job submission script, then checks that Ray sees the resources allocated to the job.


File : test_ray.py

import ray
import os

# Connect to Ray cluster
ray.init(address=f"{os.environ['HEAD_NODE']}:{os.environ['RAY_PORT']}",_node_ip_address=os.environ['HEAD_NODE'])

# Check that Ray sees two nodes and their status is 'Alive'
print("Nodes in the Ray cluster:")
print(ray.nodes())

# Check that Ray sees 12 CPUs and 2 GPUs over 2 Nodes
print(ray.available_resources())
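
Because the worker is started in the background right before test_ray.py runs, it may not have joined the cluster yet when the script prints its results. One possible approach is to poll the node list until the expected number of nodes is alive. This is a minimal sketch: count_alive and wait_for_nodes are hypothetical helpers, and they assume each entry returned by ray.nodes() is a dictionary with a boolean 'Alive' key.

```python
import time


def count_alive(nodes):
    """Count entries whose 'Alive' field is true in a ray.nodes()-style list."""
    return sum(1 for node in nodes if node.get("Alive"))


def wait_for_nodes(get_nodes, expected, timeout=60.0, interval=2.0):
    """Poll get_nodes() until 'expected' nodes are alive or the timeout expires.

    'get_nodes' is any callable returning a list of node dicts (for example
    ray.nodes). Returns the number of alive nodes seen at the last poll.
    """
    deadline = time.monotonic() + timeout
    while True:
        alive = count_alive(get_nodes())
        if alive >= expected or time.monotonic() >= deadline:
            return alive
        time.sleep(interval)
```

In test_ray.py, calling wait_for_nodes(ray.nodes, expected=2) after ray.init would delay the resource check until both nodes have registered, or return a smaller count if the timeout expires first.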