This guide is for batch users who have a basic understanding of interacting with Kubernetes from Python. For more information, see Kueue’s overview.
Before you begin
Check administer cluster quotas for details on the initial cluster setup.
You’ll also need the kubernetes Python client installed. We recommend a virtual environment.
Note that the following versions were used for developing these examples:
Python: 3.9.12
kubernetes: 26.1.0
requests: 2.31.0
You can either follow the install instructions for Kueue, or use the install example below.
Kueue in Python
Kueue is, at its core, a controller for a Custom Resource, so to interact with it from Python we don’t need a custom SDK; we can use the generic functions provided by the
Kubernetes Python library. In this guide, we provide several examples
of interacting with Kueue in this fashion. If you would like to request a new example or would like help with a specific use
case, please open an issue.
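Since everything goes through the generic CustomObjectsApi, the only Kueue-specific knowledge you need is the API group, version, and plural of each kind. As a sketch (the group and version below match the v1beta1 API used throughout this guide; the helper name is our own):

```python
# The Kueue API group/version used by the examples in this guide.
KUEUE_GROUP = "kueue.x-k8s.io"
KUEUE_VERSION = "v1beta1"


def kueue_api_kwargs(plural, namespace="default"):
    """Keyword arguments for a namespaced CustomObjectsApi call against Kueue."""
    return {
        "group": KUEUE_GROUP,
        "version": KUEUE_VERSION,
        "namespace": namespace,
        "plural": plural,
    }
```

For example, client.CustomObjectsApi().list_namespaced_custom_object(**kueue_api_kwargs("localqueues")) lists LocalQueues in the default namespace.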
Examples
The following examples demonstrate different use cases for using Kueue in Python.
Install Kueue
This example demonstrates installing Kueue to an existing cluster. You can save this
script to your local machine as install-kueue-queues.py.
#!/usr/bin/env python3

import argparse
import tempfile

import requests
from kubernetes import utils, config, client

# install-kueue-queues.py will:
# 1. install Kueue from the latest or a specific version on GitHub
# This example will demonstrate installing Kueue and applying a YAML file (local) to install Kueue
# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--version",
        help="Version of Kueue to install (if undefined, will install from the main branch)",
        default=None,
    )
    return parser


def main():
    """
    Install Kueue and the Queue components.

    This will error if they are already installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()
    install_kueue(args.version)


def get_install_url(version):
    """
    Get the install URL.

    If a version is specified, use it. Otherwise install
    from the main branch.
    """
    if version is not None:
        return f"https://github.com/kubernetes-sigs/kueue/releases/download/v{version}/manifests.yaml"
    return "https://github.com/kubernetes-sigs/kueue/config/default?ref=main"


def install_kueue(version):
    """
    Install Kueue of a particular version.
    """
    print("⭐️ Installing Kueue...")
    url = get_install_url(version)
    with tempfile.NamedTemporaryFile(delete=True) as install_yaml:
        res = requests.get(url)
        assert res.status_code == 200
        install_yaml.write(res.content)
        # Flush so create_from_yaml sees the full manifest on disk
        install_yaml.flush()
        utils.create_from_yaml(api_client, install_yaml.name)


if __name__ == "__main__":
    main()
And then run as follows:
python install-kueue-queues.py
⭐️ Installing Kueue...
⭐️ Applying queues from single-clusterqueue-setup.yaml...
You can also target a specific version:
python install-kueue-queues.py --version 0.9.1
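Note that the script interpolates the version into a URL of the form .../download/v{version}/manifests.yaml, so pass a bare version such as 0.9.1 rather than v0.9.1. A small, hypothetical helper can accept either form:

```python
def normalize_version(version):
    # Accept "0.9.1" or "v0.9.1" and return the bare version string.
    if version and version.startswith("v"):
        return version[1:]
    return version


def get_install_url(version):
    # Mirrors the logic in install-kueue-queues.py, with the hypothetical
    # normalization added so a leading "v" does not break the URL.
    if version is not None:
        version = normalize_version(version)
        return f"https://github.com/kubernetes-sigs/kueue/releases/download/v{version}/manifests.yaml"
    return "https://github.com/kubernetes-sigs/kueue/config/default?ref=main"
```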
Sample Job
For the next example, let’s start with a cluster with Kueue installed, and first create our queues:
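The queues are themselves custom objects, so if you would rather create them from Python than with kubectl apply, you can build them as plain dicts and submit them with the generic API. A minimal sketch, assuming the single-ClusterQueue setup from administer cluster quotas (the names user-queue and cluster-queue are that setup's defaults):

```python
def local_queue(name="user-queue", cluster_queue="cluster-queue", namespace="default"):
    """A minimal LocalQueue manifest as a plain dict (a sketch; names are assumptions)."""
    return {
        "apiVersion": "kueue.x-k8s.io/v1beta1",
        "kind": "LocalQueue",
        "metadata": {"name": name, "namespace": namespace},
        # A LocalQueue points at the ClusterQueue that holds the quota.
        "spec": {"clusterQueue": cluster_queue},
    }
```

Submit it with crd_api.create_namespaced_custom_object(group="kueue.x-k8s.io", version="v1beta1", namespace="default", plural="localqueues", body=local_queue()).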
#!/usr/bin/env python3

import argparse

from kubernetes import config, client

# sample-job.py
# This example will demonstrate full steps to submit a Job.
# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job",
        default="sample-job-",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="gcr.io/k8s-staging-perf-tests/sleep:v0.1.0",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["30s"],
    )
    return parser


def generate_job_crd(job_name, image, args):
    """
    Generate an equivalent job CRD to sample-job.yaml
    """
    metadata = client.V1ObjectMeta(
        generate_name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # Job container
    container = client.V1Container(
        image=image,
        name="dummy-job",
        args=args,
        resources={
            "requests": {
                "cpu": 1,
                "memory": "200Mi",
            }
        },
    )

    # Job template
    template = {"spec": {"containers": [container], "restartPolicy": "Never"}}
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=metadata,
        spec=client.V1JobSpec(
            parallelism=1, completions=3, suspend=True, template=template
        ),
    )


def main():
    """
    Run a job.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.args)
    batch_api = client.BatchV1Api()
    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    batch_api.create_namespaced_job("default", crd)
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()
And run as follows:
python sample-job.py
📦️ Container image selected is gcr.io/k8s-staging-perf-tests/sleep:v0.1.0...
⭐️ Creating sample job with prefix sample-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
or try changing the name (generateName) of the job:
python sample-job.py --job-name sleep-job-
📦️ Container image selected is gcr.io/k8s-staging-perf-tests/sleep:v0.1.0...
⭐️ Creating sample job with prefix sleep-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
You can also change the container image with --image and args with --args.
For more customization, you can edit the example script.
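As a reference while customizing, the client models above serialize to roughly the following batch/v1 Job structure. This sketch builds the same Job as a plain dict so the Kueue-relevant fields are visible in one place (the suspend comment reflects Kueue's documented admission behavior):

```python
def sample_job_dict(
    job_name="sample-job-",
    image="gcr.io/k8s-staging-perf-tests/sleep:v0.1.0",
    args=None,
):
    """Plain-dict equivalent of the Job built by generate_job_crd (a sketch)."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "generateName": job_name,
            # This label is what assigns the Job to the Kueue LocalQueue.
            "labels": {"kueue.x-k8s.io/queue-name": "user-queue"},
        },
        "spec": {
            "parallelism": 1,
            "completions": 3,
            # Created suspended; Kueue flips this to False when the workload is admitted.
            "suspend": True,
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": "dummy-job",
                            "image": image,
                            "args": args or ["30s"],
                            "resources": {"requests": {"cpu": 1, "memory": "200Mi"}},
                        }
                    ],
                    "restartPolicy": "Never",
                }
            },
        },
    }
```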
Interact with Queues and Jobs
If you are developing an application that submits jobs and needs to interact
with and check on them, you likely want to interact with queues or jobs directly.
After running the example above, you can test the following example to interact
with the results. Write the following to a script called sample-queue-control.py.
#!/usr/bin/env python3

import argparse

from kubernetes import config, client

# sample-queue-control.py
# This will show how to interact with queues
# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Interact with Queues",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--namespace",
        help="namespace to list for",
        default="default",
    )
    return parser


def main():
    """
    Get a listing of jobs in the queue
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()
    listing = crd_api.list_namespaced_custom_object(
        group="kueue.x-k8s.io",
        version="v1beta1",
        namespace=args.namespace,
        plural="localqueues",
    )
    list_queues(listing)

    listing = crd_api.list_namespaced_custom_object(
        group="batch",
        version="v1",
        namespace=args.namespace,
        plural="jobs",
    )
    list_jobs(listing)


def list_jobs(listing):
    """
    Iterate and show job metadata.
    """
    if not listing:
        print("💼️ There are no jobs.")
        return

    print("\n💼️ Jobs")
    for job in listing["items"]:
        jobname = job["metadata"]["name"]
        status = (
            "TBA" if "succeeded" not in job["status"] else job["status"]["succeeded"]
        )
        ready = job["status"]["ready"]
        print(f"Found job {jobname}")
        print(f"  Succeeded: {status}")
        print(f"  Ready: {ready}")


def list_queues(listing):
    """
    Helper function to iterate over and list queues.
    """
    if not listing:
        print("⛑️ There are no queues.")
        return

    print("\n⛑️ Local Queues")

    # This is listing queues
    for q in listing["items"]:
        print(f'Found queue {q["metadata"]["name"]}')
        print(f"  Admitted workloads: {q['status']['admittedWorkloads']}")
        print(f"  Pending workloads: {q['status']['pendingWorkloads']}")

        # And flavors with resources
        for f in q["status"]["flavorUsage"]:
            print(f'    Flavor {f["name"]} has resources {f["resources"]}')


if __name__ == "__main__":
    main()
To make the output more interesting, we can run a few random jobs first, for example by running python sample-job.py a few times.
Then run the script to see your queues and the sample jobs that you submitted previously.
python sample-queue-control.py
⛑️ Local Queues
Found queue user-queue
Admitted workloads: 3
Pending workloads: 0
Flavor default-flavor has resources [{'name': 'cpu', 'total': '3'}, {'name': 'memory', 'total': '600Mi'}]
💼️ Jobs
Found job sample-job-8n5sb
Succeeded: 3
Ready: 0
Found job sample-job-gnxtl
Succeeded: 1
Ready: 0
Found job tacos46bqw
Succeeded: 1
Ready: 1
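The flavorUsage entries printed above are a list of {name, resources} records. If you want totals keyed by resource name instead, a small helper (our own sketch over the structure shown in the output) can reshape them:

```python
def flavor_totals(flavor_usage):
    """Collapse a LocalQueue status.flavorUsage list into {flavor: {resource: total}}."""
    totals = {}
    for flavor in flavor_usage:
        totals[flavor["name"]] = {r["name"]: r["total"] for r in flavor["resources"]}
    return totals


usage = [
    {
        "name": "default-flavor",
        "resources": [
            {"name": "cpu", "total": "3"},
            {"name": "memory", "total": "600Mi"},
        ],
    }
]
print(flavor_totals(usage))
# {'default-flavor': {'cpu': '3', 'memory': '600Mi'}}
```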
If you want to filter jobs to a specific queue, you can do this via the job label
under `job["metadata"]["labels"]["kueue.x-k8s.io/queue-name"]`. To list a specific job by
name, you can do:
from kubernetes import client, config

# Interact with batch
config.load_kube_config()
batch_api = client.BatchV1Api()

# This is providing the name, and namespace
job = batch_api.read_namespaced_job("tacos46bqw", "default")
print(job)
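If you have a dict listing of jobs (as returned by list_namespaced_custom_object for the batch group), a small helper can do that label filtering for you. This is a sketch; user-queue is the queue name used throughout this guide:

```python
def jobs_in_queue(listing, queue_name="user-queue"):
    """Return jobs from a listing whose kueue.x-k8s.io/queue-name label matches."""
    matched = []
    for job in listing.get("items", []):
        labels = job["metadata"].get("labels", {})
        if labels.get("kueue.x-k8s.io/queue-name") == queue_name:
            matched.append(job)
    return matched
```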
Flux Operator Job
For this example, we will be using the Flux Operator
to submit a job, and specifically using the Python SDK to do this easily. Given our Python environment created in the setup, we can install this Python SDK (the package that provides the fluxoperator module used below) into it with pip.
Write the following script to sample-flux-operator-job.py:
#!/usr/bin/env python3

import argparse

from kubernetes import config, client
import fluxoperator.models as models

# sample-flux-operator-job.py
# This example will demonstrate full steps to submit a Job via the Flux Operator.
# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Flux Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job (job prefix does not work here)",
        default="hello-world",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="ghcr.io/flux-framework/flux-restful-api",
    )
    parser.add_argument(
        "--tasks",
        help="Number of tasks",
        default=1,
        type=int,
    )
    parser.add_argument(
        "--quiet",
        help="Do not show extra flux output (only hello worlds!)",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--command",
        help="command to run",
        default="echo",
    )
    parser.add_argument(
        "--args", nargs="+", help="args for container", default=["hello", "world"]
    )
    return parser


def generate_minicluster_crd(job_name, image, command, args, quiet=False, tasks=1):
    """
    Generate a MiniCluster CRD
    """
    container = models.MiniClusterContainer(
        image=image,
        command=command + " " + " ".join(args),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "2Gi",
            }
        },
    )

    # 4 pods and 4 tasks will echo hello-world x 4
    spec = models.MiniClusterSpec(
        job_labels={"kueue.x-k8s.io/queue-name": "user-queue"},
        containers=[container],
        size=4,
        tasks=tasks,
        logging={"quiet": quiet},
    )

    return models.MiniCluster(
        kind="MiniCluster",
        api_version="flux-framework.org/v1alpha1",
        metadata=client.V1ObjectMeta(
            generate_name=job_name,
            namespace="default",
        ),
        spec=spec,
    )


def main():
    """
    Run an example job using the Flux Operator.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    minicluster = generate_minicluster_crd(
        args.job_name, args.image, args.command, args.args, args.quiet, args.tasks
    )
    crd_api = client.CustomObjectsApi()
    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="flux-framework.org",
        version="v1alpha1",
        namespace="default",
        plural="miniclusters",
        body=minicluster,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get pods" to see pods'
    )


if __name__ == "__main__":
    main()
Now try running the example:
python sample-flux-operator-job.py
📦️ Container image selected is ghcr.io/flux-framework/flux-restful-api...
⭐️ Creating sample job with prefix hello-world...
Use:
"kubectl get queue" to see queue assignment
"kubectl get pods" to see pods
You’ll be able to almost immediately see the MiniCluster job admitted to the local queue:
kubectl get queue
NAME CLUSTERQUEUE PENDING WORKLOADS ADMITTED WORKLOADS
user-queue cluster-queue 0 1
And the 4 pods running (we are creating a networked cluster with 4 nodes):
kubectl get pods
NAME READY STATUS RESTARTS AGE
hello-world7qgqd-0-wp596 1/1 Running 0 7s
hello-world7qgqd-1-d7r87 1/1 Running 0 7s
hello-world7qgqd-2-rfn4t 1/1 Running 0 7s
hello-world7qgqd-3-blvtn 1/1 Running 0 7s
If you look at the logs of the main broker pod (index 0 of the job above), there is a lot of
output for debugging, and you can see “hello world” at the end.
If you submit and ask for four tasks, you’ll see “hello world” four times:
python sample-flux-operator-job.py --tasks 4
...
broker.info[0]: quorum-full: quorum->run 23.5812s
hello world
hello world
hello world
hello world
You can further customize the job, and can ask questions on the Flux Operator issues board.
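If you would rather not depend on the SDK models at all, the same MiniCluster can be written as a plain dict and submitted with create_namespaced_custom_object. This is only a sketch: the camelCase field names below (jobLabels, logging, and so on) are assumptions about how the models above serialize under the v1alpha1 schema:

```python
def minicluster_dict(
    job_name="hello-world",
    image="ghcr.io/flux-framework/flux-restful-api",
    command="echo hello world",
    tasks=1,
    quiet=False,
):
    """Plain-dict MiniCluster mirroring generate_minicluster_crd (a sketch)."""
    return {
        "apiVersion": "flux-framework.org/v1alpha1",
        "kind": "MiniCluster",
        "metadata": {"generateName": job_name, "namespace": "default"},
        "spec": {
            # This label routes the resulting job to the Kueue LocalQueue.
            "jobLabels": {"kueue.x-k8s.io/queue-name": "user-queue"},
            "size": 4,
            "tasks": tasks,
            "logging": {"quiet": quiet},
            "containers": [
                {
                    "image": image,
                    "command": command,
                    "resources": {"limits": {"cpu": 1, "memory": "2Gi"}},
                }
            ],
        },
    }
```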
Finally, for instructions for how to do this with YAML outside of Python, see Run A Flux MiniCluster.
MPI Operator Job
For this example, we will be using the MPI Operator
to submit a job, and specifically using the Python SDK to do this easily. Given our Python environment created in the setup, we can install this Python SDK (the package that provides the mpijob module used below) into it with pip.
Importantly, the MPI Operator must be installed before Kueue for this to work! Let’s start from scratch with a new kind cluster. We will also need to install the MPI Operator and Kueue; install the exact versions tested with this example.
Write the following script to sample-mpijob.py. Running it with python sample-mpijob.py produces:
📦️ Container image selected is mpioperator/mpi-pi:openmpi...
⭐️ Creating sample job with prefix pi...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
#!/usr/bin/env python3

import argparse

from kubernetes import config, client
import mpijob.models as models

# sample-mpijob.py
# This example will demonstrate full steps to submit a Job via the MPI Operator
# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue MPI Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="name to set for job (a generateName prefix does not work here)",
        default="pi",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="mpioperator/mpi-pi:openmpi",
    )
    parser.add_argument(
        "--command",
        help="command to run",
        default="mpirun",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["-n", "2", "/home/mpiuser/pi"],
    )
    return parser


def generate_job_crd(job_name, image, command, args):
    """
    Generate an equivalent job CRD to sample-job.yaml
    """
    metadata = client.V1ObjectMeta(
        name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # Containers for launcher and worker
    launcher_container = client.V1Container(
        image=image,
        name="mpi-launcher",
        command=[command],
        args=args,
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    worker_container = client.V1Container(
        image=image,
        name="mpi-worker",
        command=["/usr/sbin/sshd"],
        args=["-De", "-f", "/home/mpiuser/.sshd_config"],
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    # Create the launcher and worker replica specs
    launcher = models.V2beta1ReplicaSpec(
        replicas=1,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[launcher_container])
        ),
    )

    worker = models.V2beta1ReplicaSpec(
        replicas=2,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[worker_container])
        ),
    )

    # runPolicy for the jobspec
    policy = models.V2beta1RunPolicy(
        clean_pod_policy="Running", ttl_seconds_after_finished=60
    )

    # Create the jobspec
    jobspec = models.V2beta1MPIJobSpec(
        slots_per_worker=1,
        run_policy=policy,
        ssh_auth_mount_path="/home/mpiuser/.ssh",
        mpi_replica_specs={"Launcher": launcher, "Worker": worker},
    )
    return models.V2beta1MPIJob(
        metadata=metadata,
        api_version="kubeflow.org/v2beta1",
        kind="MPIJob",
        spec=jobspec,
    )


def main():
    """
    Run an MPIJob. This requires the MPI Operator to be installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.command, args.args)
    crd_api = client.CustomObjectsApi()
    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="kubeflow.org",
        version="v2beta1",
        namespace="default",
        plural="mpijobs",
        body=crd,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()
After submitting, you can see that the queue has an admitted workload!
$ kubectl get queue
NAME CLUSTERQUEUE PENDING WORKLOADS ADMITTED WORKLOADS
user-queue cluster-queue 0 1
And that the job “pi-launcher” has started:
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
pi-launcher 0/1 9s 9s
The MPI Operator works by way of a central launcher interacting with nodes via ssh. We can inspect
a worker and the launcher to get a glimpse of how both work:
$ kubectl logs pods/pi-worker-1
Server listening on 0.0.0.0 port 22.
Server listening on :: port 22.
Accepted publickey for mpiuser from 10.244.0.8 port 51694 ssh2: ECDSA SHA256:rgZdwufXolOkUPA1w0bf780BNJC8e4/FivJb1/F7OOI
Received disconnect from 10.244.0.8 port 51694:11: disconnected by user
Disconnected from user mpiuser 10.244.0.8 port 51694
Received signal 15; terminating.
The job is fairly quick, and we can see the output of pi in the launcher:
$ kubectl logs pods/pi-launcher-f4gqv
Warning: Permanently added 'pi-worker-0.pi-worker.default.svc,10.244.0.7' (ECDSA) to the list of known hosts.
Warning: Permanently added 'pi-worker-1.pi-worker.default.svc,10.244.0.9' (ECDSA) to the list of known hosts.
Rank 1 on host pi-worker-1
Workers: 2
Rank 0 on host pi-worker-0
pi is approximately 3.1410376000000002
That looks like pi! 🎉️🥧️
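If you are orchestrating this from Python, you can fetch that launcher log with CoreV1Api.read_namespaced_pod_log and extract the value with a small parser. The helper below is our own sketch over the output format shown above:

```python
import re


def parse_pi(log_text):
    """Pull the approximation out of a line like 'pi is approximately 3.14...'."""
    match = re.search(r"pi is approximately ([0-9.]+)", log_text)
    return float(match.group(1)) if match else None
```

For example, parse_pi applied to the launcher log above returns the float value of the "pi is approximately" line, and None when no such line is present.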
If you are interested in running this same example with YAML outside of Python, see Run an MPIJob.