Skip to Content
TutorialsRun a job on Delta

Run a job on Delta

This is a tutorial on how to run jobs on Delta, primarily GPU intensive tasks.

Choosing your client

In this tutorial we will be using the Dask Python library, which has some support for running jobs locally, on an HPC cluster like Delta, or remotely via SSH. For the purposes of this tutorial, we will be running our jobs from a login node on Delta.

Login to Delta

SSH, connect via VS Code, or start a new web VS Code session on Delta. Documentation for loggon methods can be found here .

Tip

Both compute and login nodes can be used to enqueue jobs. A good strategy for prototyping GPU workloads is use a local Dask cluster for testing on a compute node, then use a login node to run that code on the cluster.

Cluster Setup

Below is an example of a cluster configuration for Delta. If you have an Nvidia GPU, you can use a LocalCudaCluster here instead and it will function the same. You can find an example of one here .

# delta.py """ Everything before this is "server" code - meanining it only runs on the node you run this script from """ from dask_jobqueue import SLURMCluster from distributed import Client, as_completed import pandas as pd cluster = SLURMCluster( queue="gpuA40x4-interactive", # use gpuA40x4 for production runs, as this costs twice as much account="bddu-delta-gpu", cores=1, memory="16GB", processes=1, name="privacy-pipeline", walltime="00:30:00", job_extra_directives=[ "--gpus-per-node=1", ], python=VENV_PATH, worker_command="distributed.cli.dask_worker", worker_extra_args=[ "--preload", "dask_cuda.initialize", # enables Dask-CUDA setup on the worker "--resources", "GPU=1", # so we can schedule GPU-tagged tasks ], ) # This outputs the print(cluster.job_script()) print("Scaling cluster...") cluster.scale(jobs=NUM_WORKERS) cluster.wait_for_workers(NUM_WORKERS) client = Client(cluster) print("Uploading dependencies...") # IMPORTANT: client code does not ship by default with Dask, and must be uploaded like so to each worker node client.upload_file("src/pipeline/worker.py", load=True)
Warning

Be sure to check which job queue you are submitting to. Interactive jobs can cost up to 6x more SU while standard and preempt jobs can have wait times of up to an hour. A full list of Delta job queues and their pricing can be found here .

Let’s unpack this just a bit. cores corresponds to the number of virtual CPU cores that need to run on the server, alongside memory for RAM and processes for the number of threads. walltime is the total amount of time any client will run for - e.g., if your job sits in the queue for an hour, that is excluded.

The total amount charged to our account is equal to iwic\sum_{i}w_i*c, where wiw_i and cc correspond to the walltime and the charge factor. Dask Jobqueue supports both dynamic and statically sized clusters, hence why this only generalizes to ncn*c if you only use ever use nn number of nodes for the entire duration of your job.

In general, it’s best to assign one pipeline per GPU and max out its VRAM usage to avoid the overhead and use fixed size clusters for predictability. Walltime actually refers to the amount of time the GPU is in use for GPU nodes, meaning idle nodes aren’t going to contribute to the total walltime. While it may seem that one worker can do roughly the same amount of work in 8 hours as 8 workers in 1 hour, there are many effeciency benefits that are baked in by Dask’s scheduler.

Assigning two GPUs to one and dividing client code (eg inference and hardware-accelerated video editing) among them may seem intuitive, but this negates the benefit of not being charged for walltime when a GPU is not in use. Since the hardware is virtualized, there are no guarantees that requesting two GPUs per node will result in physical colocation, or a lower networking overhead. Lastly, host (RAM) and device (VRAM) memory is notoriously difficult to work with across multiple devices. Dask actually cannot serialize Python objects that are held in device memory.

”Client” / Worker Code Setup

This is apart of the same script, and since we want all of the workers to import this pipeline code, we import it AFTER it’s been uploaded to the clients.

# delta.py from worker import process_partition import csv print("Paritioning videos...") partitions = [[] for _ in range(NUM_WORKERS)] for i,video in enumerate(videos): partitions[i % NUM_WORKERS].append(video) print("Submitting tasks...") futures = [] for partition in partitions: futures.append(client.submit(process_partition, MODELS, partition, resources={'GPU': 1})) print("Collecting results...") client.gather(futures)

In Dask, futures are values that need to be computed in parallel (concurrent when running on one machine). Simular to an OS, Dask will schedule tasks based on the resources requested and what’s available. You can read more about futures here .

Run Your Job

Simply run python delta.py or whatever the name of your job script is, and you should see…nothing. The sample script includes a few print statements to aid in debugging, but generally Dask will only output slurm.out for each worker node that might contain any errors your job encountered. If you are running via VS Code locally, you can access the Dask Dashboard to monitor task execution and node resource usage at http://localhost:8787/status . Unfortunately the dashboard is served via a compute node, and we have no access to its ports to enable a remote job to be monitored as such.

Reference

Last updated on