Skip to content

Commit 40b89f2

Browse files
committed
feat: add option to validate k8s spec (#1152)
1 parent 5053c87 commit 40b89f2

File tree

2 files changed

+140
-8
lines changed

2 files changed

+140
-8
lines changed

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 40 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -369,7 +369,7 @@ def app_to_resource(
369369
queue: str,
370370
service_account: Optional[str],
371371
priority_class: Optional[str] = None,
372-
) -> Dict[str, object]:
372+
) -> Dict[str, Any]:
373373
"""
374374
app_to_resource creates a volcano job kubernetes resource definition from
375375
the provided AppDef. The resource definition can be used to launch the
@@ -444,7 +444,7 @@ def app_to_resource(
444444
if priority_class is not None:
445445
job_spec["priorityClassName"] = priority_class
446446

447-
resource: Dict[str, object] = {
447+
resource: Dict[str, Any] = {
448448
"apiVersion": "batch.volcano.sh/v1alpha1",
449449
"kind": "Job",
450450
"metadata": {"name": f"{unique_app_id}"},
@@ -456,7 +456,7 @@ def app_to_resource(
456456
@dataclass
457457
class KubernetesJob:
458458
images_to_push: Dict[str, Tuple[str, str]]
459-
resource: Dict[str, object]
459+
resource: Dict[str, Any]
460460

461461
def __str__(self) -> str:
462462
return yaml.dump(sanitize_for_serialization(self.resource))
@@ -471,6 +471,7 @@ class KubernetesOpts(TypedDict, total=False):
471471
image_repo: Optional[str]
472472
service_account: Optional[str]
473473
priority_class: Optional[str]
474+
validate_spec: Optional[bool]
474475

475476

476477
class KubernetesScheduler(
@@ -659,6 +660,36 @@ def _submit_dryrun(
659660
), "priority_class must be a str"
660661

661662
resource = app_to_resource(app, queue, service_account, priority_class)
663+
664+
if cfg.get("validate_spec"):
665+
try:
666+
self._custom_objects_api().create_namespaced_custom_object(
667+
group="batch.volcano.sh",
668+
version="v1alpha1",
669+
namespace=cfg.get("namespace") or "default",
670+
plural="jobs",
671+
body=resource,
672+
dry_run="All",
673+
)
674+
except Exception as e:
675+
from kubernetes.client.rest import ApiException
676+
677+
if isinstance(e, ApiException):
678+
raise ValueError(f"Invalid job spec: {e.reason}") from e
679+
raise
680+
681+
job_name = resource["metadata"]["name"]
682+
for task in resource["spec"]["tasks"]:
683+
task_name = task["name"]
684+
replicas = task.get("replicas", 1)
685+
max_index = replicas - 1
686+
pod_name = f"{job_name}-{task_name}-{max_index}"
687+
if len(pod_name) > 63:
688+
raise ValueError(
689+
f"Pod name '{pod_name}' ({len(pod_name)} chars) exceeds 63 character limit. "
690+
f"Shorten app.name or role names"
691+
)
692+
662693
req = KubernetesJob(
663694
resource=resource,
664695
images_to_push=images_to_push,
@@ -703,6 +734,12 @@ def _run_opts(self) -> runopts:
703734
type_=str,
704735
help="The name of the PriorityClass to set on the job specs",
705736
)
737+
opts.add(
738+
"validate_spec",
739+
type_=bool,
740+
help="Validate job spec using Kubernetes API dry-run before submission",
741+
default=True,
742+
)
706743
return opts
707744

708745
def describe(self, app_id: str) -> Optional[DescribeAppResponse]:

torchx/schedulers/test/kubernetes_scheduler_test.py

Lines changed: 100 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
import sys
1212
import unittest
1313
from datetime import datetime
14-
from typing import Any, Dict
14+
from typing import Any, cast, Dict
1515
from unittest.mock import MagicMock, patch
1616

1717
import torchx
@@ -111,7 +111,6 @@ def test_app_to_resource_resolved_macros(self) -> None:
111111
make_unique_ctx.return_value = unique_app_name
112112
resource = app_to_resource(app, "test_queue", service_account=None)
113113
actual_cmd = (
114-
# pyre-ignore [16]
115114
resource["spec"]["tasks"][0]["template"]
116115
.spec.containers[0]
117116
.command
@@ -135,7 +134,6 @@ def test_retry_policy_not_set(self) -> None:
135134
{"event": "PodEvicted", "action": "RestartJob"},
136135
{"event": "PodFailed", "action": "RestartJob"},
137136
],
138-
# pyre-ignore [16]
139137
resource["spec"]["tasks"][0]["policies"],
140138
)
141139
for role in app.roles:
@@ -517,7 +515,7 @@ def test_rank0_env(self) -> None:
517515
make_unique_ctx.return_value = "app-name-42"
518516
info = scheduler.submit_dryrun(app, cfg)
519517

520-
tasks = info.request.resource["spec"]["tasks"] # pyre-ignore[16]
518+
tasks = info.request.resource["spec"]["tasks"]
521519
container0 = tasks[0]["template"].spec.containers[0]
522520
self.assertIn("TORCHX_RANK0_HOST", container0.command)
523521
self.assertIn(
@@ -726,6 +724,7 @@ def test_runopts(self) -> None:
726724
"image_repo",
727725
"service_account",
728726
"priority_class",
727+
"validate_spec",
729728
},
730729
)
731730

@@ -925,10 +924,106 @@ def test_min_replicas(self) -> None:
925924
resource = app_to_resource(app, "test_queue", service_account=None)
926925
min_available = [
927926
task["minAvailable"]
928-
for task in resource["spec"]["tasks"] # pyre-ignore[16]
927+
for task in resource["spec"]["tasks"]
929928
]
930929
self.assertEqual(min_available, [1, 1, 0])
931930

931+
@patch(
932+
"torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api"
933+
)
934+
def test_validate_spec_invalid_name(self, mock_api: MagicMock) -> None:
935+
from kubernetes.client.rest import ApiException
936+
937+
scheduler = create_scheduler("test")
938+
app = _test_app()
939+
app.name = "Invalid_Name"
940+
941+
mock_api_instance = MagicMock()
942+
mock_api_instance.create_namespaced_custom_object.side_effect = ApiException(
943+
status=422,
944+
reason="Invalid",
945+
)
946+
mock_api.return_value = mock_api_instance
947+
948+
cfg = cast(KubernetesOpts, {"queue": "testqueue", "validate_spec": True})
949+
950+
with self.assertRaises(ValueError) as ctx:
951+
scheduler.submit_dryrun(app, cfg)
952+
953+
self.assertIn("Invalid job spec", str(ctx.exception))
954+
mock_api_instance.create_namespaced_custom_object.assert_called_once()
955+
call_kwargs = mock_api_instance.create_namespaced_custom_object.call_args[1]
956+
self.assertEqual(call_kwargs["dry_run"], "All")
957+
958+
def test_validate_spec_enabled_by_default(self) -> None:
959+
scheduler = create_scheduler("test")
960+
app = _test_app()
961+
962+
cfg = KubernetesOpts({"queue": "testqueue"})
963+
964+
with patch(
965+
"torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api"
966+
) as mock_api:
967+
mock_api_instance = MagicMock()
968+
mock_api_instance.create_namespaced_custom_object.return_value = {}
969+
mock_api.return_value = mock_api_instance
970+
971+
info = scheduler.submit_dryrun(app, cfg)
972+
973+
self.assertIsNotNone(info)
974+
mock_api_instance.create_namespaced_custom_object.assert_called_once()
975+
call_kwargs = mock_api_instance.create_namespaced_custom_object.call_args[1]
976+
self.assertEqual(call_kwargs["dry_run"], "All")
977+
978+
@patch(
979+
"torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api"
980+
)
981+
def test_validate_spec_invalid_task_name(self, mock_api: MagicMock) -> None:
982+
from kubernetes.client.rest import ApiException
983+
984+
scheduler = create_scheduler("test")
985+
app = _test_app()
986+
app.roles[0].name = "Invalid-Task-Name"
987+
988+
mock_api_instance = MagicMock()
989+
mock_api_instance.create_namespaced_custom_object.side_effect = ApiException(
990+
status=422,
991+
reason="Invalid",
992+
)
993+
mock_api.return_value = mock_api_instance
994+
995+
cfg = cast(KubernetesOpts, {"queue": "testqueue", "validate_spec": True})
996+
997+
with self.assertRaises(ValueError) as ctx:
998+
scheduler.submit_dryrun(app, cfg)
999+
1000+
self.assertIn("Invalid job spec", str(ctx.exception))
1001+
1002+
@patch(
1003+
"torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api"
1004+
)
1005+
def test_validate_spec_long_pod_name(self, mock_api: MagicMock) -> None:
1006+
scheduler = create_scheduler("test")
1007+
app = _test_app()
1008+
app.name = "x" * 50
1009+
app.roles[0].name = "y" * 20
1010+
1011+
mock_api_instance = MagicMock()
1012+
mock_api_instance.create_namespaced_custom_object.return_value = {}
1013+
mock_api.return_value = mock_api_instance
1014+
1015+
cfg = cast(KubernetesOpts, {"queue": "testqueue", "validate_spec": True})
1016+
1017+
with patch(
1018+
"torchx.schedulers.kubernetes_scheduler.make_unique"
1019+
) as make_unique_ctx:
1020+
make_unique_ctx.return_value = "x" * 50
1021+
with self.assertRaises(ValueError) as ctx:
1022+
scheduler.submit_dryrun(app, cfg)
1023+
1024+
self.assertIn("Pod name", str(ctx.exception))
1025+
self.assertIn("exceeds 63 character limit", str(ctx.exception))
1026+
9321027

9331028
class KubernetesSchedulerNoImportTest(unittest.TestCase):
9341029
"""

0 commit comments

Comments
 (0)