Skip to content

Commit 3d6fc6a

Browse files
committed
RHOAIENG-39073: Add priority class support
1 parent d202cd4 commit 3d6fc6a

File tree

2 files changed

+204
-22
lines changed

2 files changed

+204
-22
lines changed

src/codeflare_sdk/ray/rayjobs/rayjob.py

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ class RayJob:
5656
5757
This class provides a simplified interface for submitting and managing
5858
RayJob CRs (using the KubeRay RayJob python client).
59+
60+
Kueue Integration:
61+
This class supports Kueue queue management and priority-based preemption:
62+
- Queue selection via 'local_queue' parameter routes jobs to specific LocalQueues
63+
- Priority class via 'priority_class' parameter enables preemption control
64+
- Kueue labels work with both new clusters (via cluster_config) and existing clusters
65+
- When a Kueue queue-name label is present, jobs start suspended and Kueue manages admission
5966
"""
6067

6168
def __init__(
@@ -69,6 +76,7 @@ def __init__(
6976
ttl_seconds_after_finished: int = 0,
7077
active_deadline_seconds: Optional[int] = None,
7178
local_queue: Optional[str] = None,
79+
priority_class: Optional[str] = None,
7280
):
7381
"""
7482
Initialize a RayJob instance.
@@ -86,11 +94,16 @@ def __init__(
8694
ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
8795
active_deadline_seconds: Maximum time the job can run before being terminated (optional)
8896
local_queue: The Kueue LocalQueue to submit the job to (optional)
97+
priority_class: The Kueue WorkloadPriorityClass name for preemption control (optional).
98+
When specified, adds the 'kueue.x-k8s.io/priority-class' label to enable
99+
priority-based preemption. Requires cluster admin to create WorkloadPriorityClass
100+
resources. If the priority class name is invalid, Kueue will reject the job.
89101
90102
Note:
91103
- True if cluster_config is provided (new cluster will be cleaned up)
92104
- False if cluster_name is provided (existing cluster will not be shut down)
93105
- User can explicitly set this value to override auto-detection
106+
- Kueue labels (queue and priority) can be applied to both new and existing clusters
94107
"""
95108
if cluster_name is None and cluster_config is None:
96109
raise ValueError(
@@ -124,6 +137,7 @@ def __init__(
124137
self.ttl_seconds_after_finished = ttl_seconds_after_finished
125138
self.active_deadline_seconds = active_deadline_seconds
126139
self.local_queue = local_queue
140+
self.priority_class = priority_class
127141

128142
if namespace is None:
129143
detected_namespace = get_current_namespace()
@@ -243,26 +257,35 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
243257
# Extract files once and use for both runtime_env and submitter pod
244258
files = extract_all_local_files(self)
245259

260+
# Build Kueue labels for all jobs (new and existing clusters)
246261
labels = {}
247-
# If cluster_config is provided, use the local_queue from the cluster_config
248-
if self._cluster_config is not None:
249-
if self.local_queue:
250-
labels["kueue.x-k8s.io/queue-name"] = self.local_queue
262+
263+
# Queue name label - apply to all jobs when explicitly specified
264+
# For new clusters, also auto-detect default queue if not specified
265+
if self.local_queue:
266+
labels["kueue.x-k8s.io/queue-name"] = self.local_queue
267+
elif self._cluster_config is not None:
268+
# Only auto-detect default queue for new clusters
269+
default_queue = get_default_kueue_name(self.namespace)
270+
if default_queue:
271+
labels["kueue.x-k8s.io/queue-name"] = default_queue
251272
else:
252-
default_queue = get_default_kueue_name(self.namespace)
253-
if default_queue:
254-
labels["kueue.x-k8s.io/queue-name"] = default_queue
255-
else:
256-
# No default queue found, use "default" as fallback
257-
labels["kueue.x-k8s.io/queue-name"] = "default"
258-
logger.warning(
259-
f"No default Kueue LocalQueue found in namespace '{self.namespace}'. "
260-
f"Using 'default' as the queue name. If a LocalQueue named 'default' "
261-
f"does not exist, the RayJob submission will fail. "
262-
f"To fix this, please explicitly specify the 'local_queue' parameter."
263-
)
273+
# No default queue found, use "default" as fallback
274+
labels["kueue.x-k8s.io/queue-name"] = "default"
275+
logger.warning(
276+
f"No default Kueue LocalQueue found in namespace '{self.namespace}'. "
277+
f"Using 'default' as the queue name. If a LocalQueue named 'default' "
278+
f"does not exist, the RayJob submission will fail. "
279+
f"To fix this, please explicitly specify the 'local_queue' parameter."
280+
)
281+
282+
# Priority class label - apply when specified
283+
if self.priority_class:
284+
labels["kueue.x-k8s.io/priority-class"] = self.priority_class
264285

265-
rayjob_cr["metadata"]["labels"] = labels
286+
# Apply labels to metadata
287+
if labels:
288+
rayjob_cr["metadata"]["labels"] = labels
266289

267290
# When using Kueue (queue label present), start with suspend=true
268291
# Kueue will unsuspend the job once the workload is admitted

src/codeflare_sdk/ray/rayjobs/test/test_rayjob.py

Lines changed: 164 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,26 +1155,185 @@ def test_rayjob_kueue_explicit_local_queue(auto_mock_setup):
11551155
)
11561156

11571157

1158-
def test_rayjob_no_kueue_label_for_existing_cluster(auto_mock_setup):
1158+
def test_rayjob_queue_label_explicit_vs_default(auto_mock_setup, mocker):
11591159
"""
1160-
Test RayJob doesn't add Kueue label for existing clusters.
1160+
Test queue label behavior: explicit queue vs default queue auto-detection.
1161+
"""
1162+
# Mock default queue detection
1163+
mock_get_default = mocker.patch(
1164+
"codeflare_sdk.ray.rayjobs.rayjob.get_default_kueue_name",
1165+
return_value="default-queue",
1166+
)
1167+
1168+
config = ManagedClusterConfig(num_workers=1)
1169+
1170+
# Test 1: Explicit queue should be used (no default queue lookup)
1171+
mock_api_instance1 = auto_mock_setup["rayjob_api"]
1172+
mock_api_instance1.submit_job.return_value = {"metadata": {"name": "test-job-1"}}
1173+
rayjob1 = RayJob(
1174+
job_name="test-job-1",
1175+
entrypoint="python -c 'print()'",
1176+
cluster_config=config,
1177+
local_queue="explicit-queue",
1178+
)
1179+
rayjob1.submit()
1180+
call_args1 = mock_api_instance1.submit_job.call_args
1181+
submitted_job1 = call_args1.kwargs["job"]
1182+
assert (
1183+
submitted_job1["metadata"]["labels"]["kueue.x-k8s.io/queue-name"]
1184+
== "explicit-queue"
1185+
)
1186+
# Should not call get_default_kueue_name when explicit queue is provided
1187+
mock_get_default.assert_not_called()
1188+
1189+
# Reset mock for next test
1190+
mock_get_default.reset_mock()
1191+
mock_get_default.return_value = "default-queue"
1192+
1193+
# Test 2: Default queue should be auto-detected for new clusters
1194+
mock_api_instance2 = auto_mock_setup["rayjob_api"]
1195+
mock_api_instance2.submit_job.return_value = {"metadata": {"name": "test-job-2"}}
1196+
rayjob2 = RayJob(
1197+
job_name="test-job-2",
1198+
entrypoint="python -c 'print()'",
1199+
cluster_config=config,
1200+
# No local_queue specified
1201+
)
1202+
rayjob2.submit()
1203+
call_args2 = mock_api_instance2.submit_job.call_args
1204+
submitted_job2 = call_args2.kwargs["job"]
1205+
assert (
1206+
submitted_job2["metadata"]["labels"]["kueue.x-k8s.io/queue-name"]
1207+
== "default-queue"
1208+
)
1209+
# Should call get_default_kueue_name when no explicit queue
1210+
mock_get_default.assert_called_once()
1211+
1212+
# Test 3: Existing cluster without explicit queue should not have queue label
1213+
mock_api_instance3 = auto_mock_setup["rayjob_api"]
1214+
mock_api_instance3.submit_job.return_value = {"metadata": {"name": "test-job-3"}}
1215+
mock_get_default.reset_mock()
1216+
rayjob3 = RayJob(
1217+
job_name="test-job-3",
1218+
cluster_name="existing-cluster",
1219+
entrypoint="python -c 'print()'",
1220+
# No local_queue specified
1221+
)
1222+
rayjob3.submit()
1223+
call_args3 = mock_api_instance3.submit_job.call_args
1224+
submitted_job3 = call_args3.kwargs["job"]
1225+
assert "kueue.x-k8s.io/queue-name" not in submitted_job3["metadata"].get(
1226+
"labels", {}
1227+
)
1228+
# Should not call get_default_kueue_name for existing clusters
1229+
mock_get_default.assert_not_called()
1230+
1231+
1232+
def test_rayjob_priority_class(auto_mock_setup):
1233+
"""
1234+
Test RayJob adds priority class label when specified.
1235+
"""
1236+
mock_api_instance = auto_mock_setup["rayjob_api"]
1237+
mock_api_instance.submit_job.return_value = {"metadata": {"name": "test-job"}}
1238+
1239+
config = ManagedClusterConfig(num_workers=1)
1240+
rayjob = RayJob(
1241+
job_name="test-job",
1242+
entrypoint="python -c 'print()'",
1243+
cluster_config=config,
1244+
priority_class="high-priority",
1245+
)
1246+
1247+
rayjob.submit()
1248+
1249+
call_args = mock_api_instance.submit_job.call_args
1250+
submitted_job = call_args.kwargs["job"]
1251+
assert (
1252+
submitted_job["metadata"]["labels"]["kueue.x-k8s.io/priority-class"]
1253+
== "high-priority"
1254+
)
1255+
1256+
1257+
def test_rayjob_priority_class_not_added_when_none(auto_mock_setup):
1258+
"""
1259+
Test RayJob doesn't add priority class label when not specified.
1260+
"""
1261+
mock_api_instance = auto_mock_setup["rayjob_api"]
1262+
mock_api_instance.submit_job.return_value = {"metadata": {"name": "test-job"}}
1263+
1264+
config = ManagedClusterConfig(num_workers=1)
1265+
rayjob = RayJob(
1266+
job_name="test-job",
1267+
entrypoint="python -c 'print()'",
1268+
cluster_config=config,
1269+
)
1270+
1271+
rayjob.submit()
1272+
1273+
call_args = mock_api_instance.submit_job.call_args
1274+
submitted_job = call_args.kwargs["job"]
1275+
# Priority class label should not be present
1276+
assert "kueue.x-k8s.io/priority-class" not in submitted_job["metadata"].get(
1277+
"labels", {}
1278+
)
1279+
1280+
1281+
def test_rayjob_kueue_labels_with_existing_cluster(auto_mock_setup):
1282+
"""
1283+
Test RayJob adds Kueue queue and priority-class labels, and starts suspended, when using an existing cluster with an explicit queue.
11611284
"""
11621285
mock_api_instance = auto_mock_setup["rayjob_api"]
11631286
mock_api_instance.submit_job.return_value = {"metadata": {"name": "test-job"}}
11641287

1165-
# Using existing cluster (no cluster_config)
11661288
rayjob = RayJob(
11671289
job_name="test-job",
11681290
cluster_name="existing-cluster",
11691291
entrypoint="python -c 'print()'",
1292+
local_queue="my-queue",
1293+
priority_class="medium-priority",
11701294
)
11711295

11721296
rayjob.submit()
11731297

1174-
# Verify no Kueue label was added
11751298
call_args = mock_api_instance.submit_job.call_args
11761299
submitted_job = call_args.kwargs["job"]
1177-
assert "kueue.x-k8s.io/queue-name" not in submitted_job["metadata"]["labels"]
1300+
1301+
# Verify Kueue labels are present
1302+
assert "kueue.x-k8s.io/queue-name" in submitted_job["metadata"]["labels"]
1303+
assert (
1304+
submitted_job["metadata"]["labels"]["kueue.x-k8s.io/queue-name"] == "my-queue"
1305+
)
1306+
assert (
1307+
submitted_job["metadata"]["labels"]["kueue.x-k8s.io/priority-class"]
1308+
== "medium-priority"
1309+
)
1310+
1311+
# Verify suspend is True when Kueue is used
1312+
assert submitted_job["spec"]["suspend"] is True
1313+
1314+
1315+
def test_rayjob_no_kueue_label_for_existing_cluster_without_queue(auto_mock_setup):
1316+
"""
1317+
Test RayJob doesn't add Kueue label for existing clusters when no queue specified.
1318+
"""
1319+
mock_api_instance = auto_mock_setup["rayjob_api"]
1320+
mock_api_instance.submit_job.return_value = {"metadata": {"name": "test-job"}}
1321+
1322+
# Using existing cluster (no cluster_config) and no explicit queue
1323+
rayjob = RayJob(
1324+
job_name="test-job",
1325+
cluster_name="existing-cluster",
1326+
entrypoint="python -c 'print()'",
1327+
)
1328+
1329+
rayjob.submit()
1330+
1331+
# Verify no Kueue label was added (no auto-detection for existing clusters)
1332+
call_args = mock_api_instance.submit_job.call_args
1333+
submitted_job = call_args.kwargs["job"]
1334+
assert "kueue.x-k8s.io/queue-name" not in submitted_job["metadata"].get(
1335+
"labels", {}
1336+
)
11781337

11791338

11801339
def test_rayjob_with_ttl_and_deadline(auto_mock_setup):

0 commit comments

Comments
 (0)