Skip to content

Commit e7ecaec

Browse files
committed
feat: add option to validate k8s spec (#1152)
1 parent 1d26b39 commit e7ecaec

File tree

2 files changed

+130
-1
lines changed

2 files changed

+130
-1
lines changed

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ class KubernetesOpts(TypedDict, total=False):
471471
image_repo: Optional[str]
472472
service_account: Optional[str]
473473
priority_class: Optional[str]
474+
validate_spec: Optional[bool]
474475

475476

476477
class KubernetesScheduler(
@@ -659,6 +660,36 @@ def _submit_dryrun(
659660
), "priority_class must be a str"
660661

661662
resource = app_to_resource(app, queue, service_account, priority_class)
663+
664+
if cfg.get("validate_spec"):
665+
try:
666+
self._custom_objects_api().create_namespaced_custom_object(
667+
group="batch.volcano.sh",
668+
version="v1alpha1",
669+
namespace=cfg.get("namespace") or "default",
670+
plural="jobs",
671+
body=resource,
672+
dry_run="All",
673+
)
674+
except Exception as e:
675+
from kubernetes.client.rest import ApiException
676+
677+
if isinstance(e, ApiException):
678+
raise ValueError(f"Invalid job spec: {e.reason}") from e
679+
raise
680+
681+
job_name = resource["metadata"]["name"]
682+
for task in resource["spec"]["tasks"]:
683+
task_name = task["name"]
684+
replicas = task.get("replicas", 1)
685+
max_index = replicas - 1
686+
pod_name = f"{job_name}-{task_name}-{max_index}"
687+
if len(pod_name) > 63:
688+
raise ValueError(
689+
f"Pod name '{pod_name}' ({len(pod_name)} chars) exceeds 63 character limit. "
690+
f"Shorten app.name or role names"
691+
)
692+
662693
req = KubernetesJob(
663694
resource=resource,
664695
images_to_push=images_to_push,
@@ -703,6 +734,12 @@ def _run_opts(self) -> runopts:
703734
type_=str,
704735
help="The name of the PriorityClass to set on the job specs",
705736
)
737+
opts.add(
738+
"validate_spec",
739+
type_=bool,
740+
help="Validate job spec using Kubernetes API dry-run before submission",
741+
default=False,
742+
)
706743
return opts
707744

708745
def describe(self, app_id: str) -> Optional[DescribeAppResponse]:

torchx/schedulers/test/kubernetes_scheduler_test.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import sys
1212
import unittest
1313
from datetime import datetime
14-
from typing import Any, Dict
14+
from typing import Any, cast, Dict
1515
from unittest.mock import MagicMock, patch
1616

1717
import torchx
@@ -726,6 +726,7 @@ def test_runopts(self) -> None:
726726
"image_repo",
727727
"service_account",
728728
"priority_class",
729+
"validate_spec",
729730
},
730731
)
731732

@@ -929,6 +930,97 @@ def test_min_replicas(self) -> None:
929930
]
930931
self.assertEqual(min_available, [1, 1, 0])
931932

933+
@patch(
    "torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api"
)
def test_validate_spec_invalid_name(self, mock_api: MagicMock) -> None:
    """
    With ``validate_spec`` enabled, an ``ApiException`` raised by the
    dry-run create call surfaces as a ``ValueError`` mentioning
    "Invalid job spec", and the request is sent with ``dry_run="All"``.
    """
    from kubernetes.client.rest import ApiException

    # Stand-in API client whose create call always rejects the spec.
    fake_client = MagicMock()
    create_call = fake_client.create_namespaced_custom_object
    create_call.side_effect = ApiException(status=422, reason="Invalid")
    mock_api.return_value = fake_client

    app = _test_app()
    app.name = "Invalid_Name"
    scheduler = create_scheduler("test")
    cfg = cast(KubernetesOpts, {"queue": "testqueue", "validate_spec": True})

    with self.assertRaises(ValueError) as raised:
        scheduler.submit_dryrun(app, cfg)

    self.assertIn("Invalid job spec", str(raised.exception))

    # Exactly one request, and it must have been a server-side dry run.
    create_call.assert_called_once()
    _, passed_kwargs = create_call.call_args
    self.assertEqual(passed_kwargs["dry_run"], "All")
959+
960+
def test_validate_spec_disabled_by_default(self) -> None:
    """
    Without ``validate_spec`` in the config, the dry run performs no
    validation: no Kubernetes API client is requested and an over-long
    generated name is accepted.
    """
    scheduler = create_scheduler("test")
    app = _test_app()
    app.name = "x" * 60

    cfg = KubernetesOpts({"queue": "testqueue"})

    # Patching the API accessor keeps the test hermetic and lets it prove
    # that no dry-run request is attempted when validation is off.
    with patch(
        "torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api"
    ) as mock_api, patch(
        "torchx.schedulers.kubernetes_scheduler.make_unique"
    ) as make_unique_ctx:
        # 70 characters is past the 63-character pod-name limit that the
        # validation path checks.
        make_unique_ctx.return_value = "x" * 70
        info = scheduler.submit_dryrun(app, cfg)

    self.assertIsNotNone(info)
    mock_api.assert_not_called()
974+
975+
@patch(
    "torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api"
)
def test_validate_spec_invalid_task_name(self, mock_api: MagicMock) -> None:
    """
    With ``validate_spec`` enabled, an ``ApiException`` raised by the
    dry-run create call for an app whose role is named
    "Invalid-Task-Name" surfaces as a ``ValueError`` mentioning
    "Invalid job spec".
    """
    from kubernetes.client.rest import ApiException

    # Stand-in API client whose create call always rejects the spec.
    fake_client = MagicMock()
    fake_client.create_namespaced_custom_object.side_effect = ApiException(
        status=422, reason="Invalid"
    )
    mock_api.return_value = fake_client

    app = _test_app()
    app.roles[0].name = "Invalid-Task-Name"
    scheduler = create_scheduler("test")
    cfg = cast(KubernetesOpts, {"queue": "testqueue", "validate_spec": True})

    with self.assertRaises(ValueError) as raised:
        scheduler.submit_dryrun(app, cfg)

    self.assertIn("Invalid job spec", str(raised.exception))
998+
999+
@patch(
    "torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api"
)
def test_validate_spec_long_pod_name(self, mock_api: MagicMock) -> None:
    """
    With ``validate_spec`` enabled and the dry-run create call succeeding,
    app and role names long enough to push a pod name past the limit are
    rejected with a ``ValueError`` naming the 63 character limit.
    """
    # The API accepts the spec, so any failure comes from the name check.
    fake_client = MagicMock()
    fake_client.create_namespaced_custom_object.return_value = {}
    mock_api.return_value = fake_client

    app = _test_app()
    app.name = "x" * 50
    app.roles[0].name = "y" * 20
    scheduler = create_scheduler("test")
    cfg = cast(KubernetesOpts, {"queue": "testqueue", "validate_spec": True})

    # Pin the generated name so the resulting length is deterministic.
    with patch(
        "torchx.schedulers.kubernetes_scheduler.make_unique",
        return_value="x" * 50,
    ), self.assertRaises(ValueError) as raised:
        scheduler.submit_dryrun(app, cfg)

    message = str(raised.exception)
    self.assertIn("Pod name", message)
    self.assertIn("exceeds 63 character limit", message)
1023+
9321024

9331025
class KubernetesSchedulerNoImportTest(unittest.TestCase):
9341026
"""

0 commit comments

Comments
 (0)