Run a job on Delta
This tutorial covers how to run jobs on Delta, with a focus on GPU-intensive tasks.
Choosing your client
In this tutorial we will be using the Dask Python library, which has some support for running jobs locally, on an HPC cluster like Delta, or remotely via SSH. For the purposes of this tutorial, we will be running our jobs from a login node on Delta.
Login to Delta
SSH, connect via VS Code, or start a new web VS Code session on Delta. Documentation for login methods can be found here.
Both compute and login nodes can be used to enqueue jobs. A good strategy for prototyping GPU workloads is to use a local Dask cluster for testing on a compute node, then use a login node to run that code on the cluster.
Cluster Setup
Below is an example of a cluster configuration for Delta. If you have an NVIDIA GPU, you can use a LocalCUDACluster here instead and it will function the same. You can find an example of one here.
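For reference, a minimal local sketch might look like the following. This assumes dask-cuda is installed and an NVIDIA GPU is present, so it is only runnable on such a machine; the rest of the script would stay the same.

```python
# local prototyping stand-in for the SLURMCluster setup below
from dask_cuda import LocalCUDACluster
from distributed import Client

cluster = LocalCUDACluster()  # by default, starts one worker per visible GPU
client = Client(cluster)
print(client.dashboard_link)  # handy for watching tasks while you test
```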
# delta.py
"""
Everything before this is "server" code - meanining it only runs on the node you run this script from
"""
from dask_jobqueue import SLURMCluster
from distributed import Client, as_completed
import pandas as pd

VENV_PATH = "/path/to/venv/bin/python"  # placeholder: your virtual environment's Python interpreter
NUM_WORKERS = 4  # placeholder: how many Dask workers (SLURM jobs) to request
cluster = SLURMCluster(
    queue="gpuA40x4-interactive",  # interactive queues have a higher charge factor - use gpuA40x4 for production runs
    account="bddu-delta-gpu",
    cores=1,
    memory="16GB",
    processes=1,
    name="privacy-pipeline",
    walltime="00:30:00",
    job_extra_directives=[
        "--gpus-per-node=1",
    ],
    python=VENV_PATH,
    worker_command="distributed.cli.dask_worker",
    worker_extra_args=[
        "--preload", "dask_cuda.initialize",  # enables Dask-CUDA setup on the worker
        "--resources", "GPU=1",  # so we can schedule GPU-tagged tasks
    ],
)
# Print the generated SLURM job script so you can inspect it before submitting
print(cluster.job_script())
print("Scaling cluster...")
cluster.scale(jobs=NUM_WORKERS)
cluster.wait_for_workers(NUM_WORKERS)
client = Client(cluster)
print("Uploading dependencies...")
# IMPORTANT: client code does not ship by default with Dask, and must be uploaded like so to each worker node
client.upload_file("src/pipeline/worker.py", load=True)Be sure to check which job queue you are submitting to. Interactive jobs can cost up to 6x more SU while standard and preempt jobs can have wait times of up to an hour. A full list of Delta job queues and their pricing can be found here .
Let’s unpack this just a bit. cores corresponds to the number of CPU cores requested per worker job, memory to its RAM, and processes to the number of worker processes each job starts. walltime is the maximum time each worker will run once it starts - time spent waiting in the queue is excluded, e.g., if your job sits in the queue for an hour, that hour doesn’t count.
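For intuition, these keywords map onto SLURM directives in the generated submission script. A rough sketch of what print(cluster.job_script()) might emit - illustrative only, as the exact flags vary by dask-jobqueue version:

```shell
#!/usr/bin/env bash
#SBATCH -J privacy-pipeline
#SBATCH -p gpuA40x4-interactive
#SBATCH -A bddu-delta-gpu
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=16G
#SBATCH -t 00:30:00
#SBATCH --gpus-per-node=1
```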
The total amount charged to our account is equal to N × T × C, where N is the number of nodes, and T and C correspond to the walltime and the charge factor. Dask Jobqueue supports both dynamically and statically sized clusters, hence this formula only holds exactly if you use the same N nodes for the entire duration of your job.
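As a sanity check, the charge model can be sketched as a tiny helper. The charge factor used here is made up for illustration - consult Delta’s pricing page for real values.

```python
def sus_charged(nodes: int, walltime_hours: float, charge_factor: float) -> float:
    """Service units charged: N nodes x T hours of walltime x C charge factor."""
    return nodes * walltime_hours * charge_factor

# e.g. 4 nodes for 30 minutes on a queue with a (hypothetical) charge factor of 2
print(sus_charged(4, 0.5, 2.0))  # → 4.0
```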
In general, it’s best to assign one pipeline per GPU and max out its VRAM usage to avoid scheduling overhead, and to use fixed-size clusters for predictability. For GPU nodes, walltime actually refers to the amount of time the GPU is in use, meaning idle nodes aren’t going to contribute to the total walltime. While it may seem that one worker can do roughly the same amount of work in 8 hours as 8 workers can in 1 hour, Dask’s scheduler bakes in many efficiency benefits.
Assigning two GPUs to one worker and dividing client code (e.g., inference and hardware-accelerated video editing) between them may seem intuitive, but this negates the benefit of not being charged for walltime when a GPU is not in use. Since the hardware is virtualized, there is no guarantee that requesting two GPUs per node will result in physical colocation or lower networking overhead. Lastly, host (RAM) and device (VRAM) memory are notoriously difficult to work with across multiple devices - Dask cannot serialize Python objects that are held in device memory.
"Client" / Worker Code Setup
This is a part of the same script, and since we want all of the workers to import this pipeline code, we import it AFTER it’s been uploaded to the workers.
# delta.py
from worker import process_partition
import csv
print("Paritioning videos...")
# `videos` is assumed to be a list of video paths built earlier in the script
partitions = [[] for _ in range(NUM_WORKERS)]
for i, video in enumerate(videos):
    partitions[i % NUM_WORKERS].append(video)
print("Submitting tasks...")
futures = []
for partition in partitions:
    # MODELS is assumed to be defined earlier in the script (e.g., shared model weights/config)
    futures.append(client.submit(process_partition, MODELS, partition, resources={"GPU": 1}))
print("Collecting results...")
client.gather(futures)

In Dask, futures are values that will be computed in parallel (concurrently when running on one machine). Similar to an OS, Dask schedules tasks based on the resources requested and what’s available. You can read more about futures here.
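The submit/gather pattern above mirrors Python’s own concurrent.futures API. Here is a standard-library-only sketch of the same round-robin fan-out; process_partition is a stand-in for the real pipeline, and the file names are made up.

```python
from concurrent.futures import ThreadPoolExecutor

def process_partition(partition):
    # stand-in for the real GPU pipeline: just count the videos handled
    return len(partition)

videos = ["a.mp4", "b.mp4", "c.mp4", "d.mp4", "e.mp4"]
NUM_WORKERS = 2

# same round-robin partitioning as the Dask script
partitions = [[] for _ in range(NUM_WORKERS)]
for i, video in enumerate(videos):
    partitions[i % NUM_WORKERS].append(video)

with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    futures = [executor.submit(process_partition, p) for p in partitions]
    results = [f.result() for f in futures]  # analogous to client.gather(futures)

print(results)  # → [3, 2]
```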
Run Your Job
Simply run python delta.py (or whatever the name of your job script is), and you should see…nothing. The sample script includes a few print statements to aid in debugging, but generally you will only get a slurm-<jobid>.out file for each worker job, which may contain any errors your job encountered. If you are running via VS Code, its port forwarding lets you access the Dask Dashboard at http://localhost:8787/status to monitor task execution and node resource usage. Unfortunately, without that forwarding the dashboard’s port is not reachable from outside the cluster, so a remote job cannot otherwise be monitored this way.
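If you launch the script from a login node, the scheduler (and therefore the dashboard) runs there, and an SSH local forward can expose it without VS Code. The username is a placeholder, and the hostname is assumed to be Delta’s login address - adjust both for your account.

```shell
# forward local port 8787 to the dashboard port on the Delta login node
ssh -L 8787:localhost:8787 username@login.delta.ncsa.illinois.edu
# then open http://localhost:8787/status in a local browser
```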
Reference
- Example Dask SLURM deployments
- Dask Jobqueue for Jupyter Notebooks
- dask-cuda docs - LocalCUDACluster, networking, etc.
- 10 Minutes to Dask - dask tutorial