Skip to content

Commit 410d2cb

Browse files
authored
Ensure k8s pod names/labels are RFC 1123 compliant (#3639)
- Modified Kubernetes pod names and labels to conform to RFC 1123 for DNS subdomain names and labels, ensuring compliance with Kubernetes naming conventions. - Modified `KubernetesProvider.submit()` to return an eight-character hex value as the job ID instead of the pod name. - Replaced the trailing timestamp in the pod name with the job ID to improve collision avoidance. - Replaced `app` pod label with `parsl-job-id`. - Updated container name to use job ID.
1 parent fec4e40 commit 410d2cb

File tree

5 files changed

+292
-29
lines changed

5 files changed

+292
-29
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ radical_local_test:
8484

8585
.PHONY: config_local_test
8686
config_local_test: $(CCTOOLS_INSTALL)
87-
pip3 install ".[monitoring,visualization,proxystore]"
87+
pip3 install ".[monitoring,visualization,proxystore,kubernetes]"
8888
PYTHONPATH=/tmp/cctools/lib/python3.8/site-packages pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10
8989

9090
.PHONY: site_test

parsl/providers/kubernetes/kube.py

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,23 @@
11
import logging
2-
import time
3-
4-
from parsl.providers.kubernetes.template import template_string
5-
6-
logger = logging.getLogger(__name__)
7-
2+
import uuid
83
from typing import Any, Dict, List, Optional, Tuple
94

105
import typeguard
116

127
from parsl.errors import OptionalModuleMissing
138
from parsl.jobs.states import JobState, JobStatus
149
from parsl.providers.base import ExecutionProvider
15-
from parsl.utils import RepresentationMixin
10+
from parsl.providers.kubernetes.template import template_string
11+
from parsl.utils import RepresentationMixin, sanitize_dns_subdomain_rfc1123
1612

1713
try:
1814
from kubernetes import client, config
1915
_kubernetes_enabled = True
2016
except (ImportError, NameError, FileNotFoundError):
2117
_kubernetes_enabled = False
2218

19+
logger = logging.getLogger(__name__)
20+
2321
translate_table = {
2422
'Running': JobState.RUNNING,
2523
'Pending': JobState.PENDING,
@@ -161,7 +159,7 @@ def __init__(self,
161159
self.resources: Dict[object, Dict[str, Any]]
162160
self.resources = {}
163161

164-
def submit(self, cmd_string, tasks_per_node, job_name="parsl"):
162+
def submit(self, cmd_string: str, tasks_per_node: int, job_name: str = "parsl.kube"):
165163
""" Submit a job
166164
Args:
167165
- cmd_string :(String) - Name of the container to initiate
@@ -173,30 +171,34 @@ def submit(self, cmd_string, tasks_per_node, job_name="parsl"):
173171
Returns:
174172
- job_id: (string) Identifier for the job
175173
"""
174+
job_id = uuid.uuid4().hex[:8]
176175

177-
cur_timestamp = str(time.time() * 1000).split(".")[0]
178-
job_name = "{0}-{1}".format(job_name, cur_timestamp)
179-
180-
if not self.pod_name:
181-
pod_name = '{}'.format(job_name)
182-
else:
183-
pod_name = '{}-{}'.format(self.pod_name,
184-
cur_timestamp)
176+
pod_name = self.pod_name or job_name
177+
try:
178+
pod_name = sanitize_dns_subdomain_rfc1123(pod_name)
179+
except ValueError:
180+
logger.warning(
181+
f"Invalid pod name '{pod_name}' for job '{job_id}', falling back to 'parsl.kube'"
182+
)
183+
pod_name = "parsl.kube"
184+
pod_name = pod_name[:253 - 1 - len(job_id)] # Leave room for the job ID
185+
pod_name = pod_name.rstrip(".-") # Remove trailing dot or hyphen after trim
186+
pod_name = f"{pod_name}.{job_id}"
185187

186188
formatted_cmd = template_string.format(command=cmd_string,
187189
worker_init=self.worker_init)
188190

189191
logger.debug("Pod name: %s", pod_name)
190192
self._create_pod(image=self.image,
191193
pod_name=pod_name,
192-
job_name=job_name,
194+
job_id=job_id,
193195
cmd_string=formatted_cmd,
194196
volumes=self.persistent_volumes,
195197
service_account_name=self.service_account_name,
196198
annotations=self.annotations)
197-
self.resources[pod_name] = {'status': JobStatus(JobState.RUNNING)}
199+
self.resources[job_id] = {'status': JobStatus(JobState.RUNNING), 'pod_name': pod_name}
198200

199-
return pod_name
201+
return job_id
200202

201203
def status(self, job_ids):
202204
""" Get the status of a list of jobs identified by the job identifiers
@@ -212,6 +214,9 @@ def status(self, job_ids):
212214
self._status()
213215
return [self.resources[jid]['status'] for jid in job_ids]
214216

217+
def _get_pod_name(self, job_id: str) -> str:
218+
return self.resources[job_id]['pod_name']
219+
215220
def cancel(self, job_ids):
216221
""" Cancels the jobs specified by a list of job ids
217222
Args:
@@ -221,7 +226,8 @@ def cancel(self, job_ids):
221226
"""
222227
for job in job_ids:
223228
logger.debug("Terminating job/pod: {0}".format(job))
224-
self._delete_pod(job)
229+
pod_name = self._get_pod_name(job)
230+
self._delete_pod(pod_name)
225231

226232
self.resources[job]['status'] = JobStatus(JobState.CANCELLED)
227233
rets = [True for i in job_ids]
@@ -242,7 +248,8 @@ def _status(self):
242248
for jid in to_poll_job_ids:
243249
phase = None
244250
try:
245-
pod = self.kube_client.read_namespaced_pod(name=jid, namespace=self.namespace)
251+
pod_name = self._get_pod_name(jid)
252+
pod = self.kube_client.read_namespaced_pod(name=pod_name, namespace=self.namespace)
246253
except Exception:
247254
logger.exception("Failed to poll pod {} status, most likely because pod was terminated".format(jid))
248255
if self.resources[jid]['status'] is JobStatus(JobState.RUNNING):
@@ -257,10 +264,10 @@ def _status(self):
257264
self.resources[jid]['status'] = JobStatus(status)
258265

259266
def _create_pod(self,
260-
image,
261-
pod_name,
262-
job_name,
263-
port=80,
267+
image: str,
268+
pod_name: str,
269+
job_id: str,
270+
port: int = 80,
264271
cmd_string=None,
265272
volumes=[],
266273
service_account_name=None,
@@ -269,7 +276,7 @@ def _create_pod(self,
269276
Args:
270277
- image (string) : Docker image to launch
271278
- pod_name (string) : Name of the pod
272-
- job_name (string) : App label
279+
- job_id (string) : Job ID
273280
KWargs:
274281
- port (integer) : Container port
275282
Returns:
@@ -299,7 +306,7 @@ def _create_pod(self,
299306
)
300307
# Configure Pod template container
301308
container = client.V1Container(
302-
name=pod_name,
309+
name=job_id,
303310
image=image,
304311
resources=resources,
305312
ports=[client.V1ContainerPort(container_port=port)],
@@ -322,7 +329,7 @@ def _create_pod(self,
322329
claim_name=volume[0])))
323330

324331
metadata = client.V1ObjectMeta(name=pod_name,
325-
labels={"app": job_name},
332+
labels={"parsl-job-id": job_id},
326333
annotations=annotations)
327334
spec = client.V1PodSpec(containers=[container],
328335
image_pull_secrets=[secret],
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import re
2+
from unittest import mock
3+
4+
import pytest
5+
6+
from parsl.providers.kubernetes.kube import KubernetesProvider
7+
from parsl.tests.test_utils.test_sanitize_dns import DNS_SUBDOMAIN_REGEX
8+
9+
_MOCK_BASE = "parsl.providers.kubernetes.kube"
10+
11+
12+
@pytest.fixture(autouse=True)
13+
def mock_kube_config():
14+
with mock.patch(f"{_MOCK_BASE}.config") as mock_config:
15+
mock_config.load_kube_config.return_value = None
16+
yield mock_config
17+
18+
19+
@pytest.fixture
20+
def mock_kube_client():
21+
mock_client = mock.MagicMock()
22+
with mock.patch(f"{_MOCK_BASE}.client.CoreV1Api") as mock_api:
23+
mock_api.return_value = mock_client
24+
yield mock_client
25+
26+
27+
@pytest.mark.local
28+
def test_submit_happy_path(mock_kube_client: mock.MagicMock):
29+
image = "test-image"
30+
namespace = "test-namespace"
31+
cmd_string = "test-command"
32+
volumes = [("test-volume", "test-mount-path")]
33+
service_account_name = "test-service-account"
34+
annotations = {"test-annotation": "test-value"}
35+
max_cpu = 2
36+
max_mem = "2Gi"
37+
init_cpu = 1
38+
init_mem = "1Gi"
39+
provider = KubernetesProvider(
40+
image=image,
41+
persistent_volumes=volumes,
42+
namespace=namespace,
43+
service_account_name=service_account_name,
44+
annotations=annotations,
45+
max_cpu=max_cpu,
46+
max_mem=max_mem,
47+
init_cpu=init_cpu,
48+
init_mem=init_mem,
49+
)
50+
51+
job_name = "test.job.name"
52+
job_id = provider.submit(cmd_string=cmd_string, tasks_per_node=1, job_name=job_name)
53+
54+
assert job_id in provider.resources
55+
assert mock_kube_client.create_namespaced_pod.call_count == 1
56+
57+
call_args = mock_kube_client.create_namespaced_pod.call_args[1]
58+
pod = call_args["body"]
59+
container = pod.spec.containers[0]
60+
volume = container.volume_mounts[0]
61+
62+
assert image == container.image
63+
assert namespace == call_args["namespace"]
64+
assert any(cmd_string in arg for arg in container.args)
65+
assert volumes[0] == (volume.name, volume.mount_path)
66+
assert service_account_name == pod.spec.service_account_name
67+
assert annotations == pod.metadata.annotations
68+
assert str(max_cpu) == container.resources.limits["cpu"]
69+
assert max_mem == container.resources.limits["memory"]
70+
assert str(init_cpu) == container.resources.requests["cpu"]
71+
assert init_mem == container.resources.requests["memory"]
72+
assert job_id == pod.metadata.labels["parsl-job-id"]
73+
assert job_id == container.name
74+
assert f"{job_name}.{job_id}" == pod.metadata.name
75+
76+
77+
@pytest.mark.local
78+
@mock.patch(f"{_MOCK_BASE}.KubernetesProvider._create_pod")
79+
@pytest.mark.parametrize("char", (".", "-"))
80+
def test_submit_pod_name_includes_job_id(mock_create_pod: mock.MagicMock, char: str):
81+
provider = KubernetesProvider(image="test-image")
82+
83+
job_name = "a." * 121 + f"a{char}" + "a" * 9
84+
assert len(job_name) == 253 # Max length for pod name
85+
job_id = provider.submit(cmd_string="test-command", tasks_per_node=1, job_name=job_name)
86+
87+
expected_pod_name = job_name[:253 - len(job_id) - 2] + f".{job_id}"
88+
actual_pod_name = mock_create_pod.call_args[1]["pod_name"]
89+
assert re.match(DNS_SUBDOMAIN_REGEX, actual_pod_name)
90+
assert expected_pod_name == actual_pod_name
91+
92+
93+
@pytest.mark.local
94+
@mock.patch(f"{_MOCK_BASE}.KubernetesProvider._create_pod")
95+
@mock.patch(f"{_MOCK_BASE}.logger")
96+
@pytest.mark.parametrize("job_name", ("", ".", "-", "a.-.a", "$$$"))
97+
def test_submit_invalid_job_name(mock_logger: mock.MagicMock, mock_create_pod: mock.MagicMock, job_name: str):
98+
provider = KubernetesProvider(image="test-image")
99+
job_id = provider.submit(cmd_string="test-command", tasks_per_node=1, job_name=job_name)
100+
assert mock_logger.warning.call_count == 1
101+
assert f"Invalid pod name '{job_name}' for job '{job_id}'" in mock_logger.warning.call_args[0][0]
102+
assert f"parsl.kube.{job_id}" == mock_create_pod.call_args[1]["pod_name"]
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import random
2+
import re
3+
4+
import pytest
5+
6+
from parsl.utils import sanitize_dns_label_rfc1123, sanitize_dns_subdomain_rfc1123
7+
8+
# Ref: https://datatracker.ietf.org/doc/html/rfc1123
9+
DNS_LABEL_REGEX = r'^[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?$'
10+
DNS_SUBDOMAIN_REGEX = r'^[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?(\.[a-z0-9]([-a-z0-9]{0,61}[a-z0-9])?)*$'
11+
12+
test_labels = [
13+
"example-label-123", # Valid label
14+
"EXAMPLE", # Case sensitivity
15+
"!@#example*", # Remove invalid characters
16+
"--leading-and-trailing--", # Leading and trailing hyphens
17+
"..leading.and.trailing..", # Leading and tailing dots
18+
"multiple..dots", # Consecutive dots
19+
"valid--label", # Consecutive hyphens
20+
"a" * random.randint(64, 70), # Longer than 63 characters
21+
f"{'a' * 62}-a", # Trailing hyphen at max length
22+
]
23+
24+
25+
def _generate_test_subdomains(num_subdomains: int):
26+
subdomains = []
27+
for _ in range(num_subdomains):
28+
num_labels = random.randint(1, 5)
29+
labels = [test_labels[random.randint(0, num_labels - 1)] for _ in range(num_labels)]
30+
subdomain = ".".join(labels)
31+
subdomains.append(subdomain)
32+
return subdomains
33+
34+
35+
@pytest.mark.local
36+
@pytest.mark.parametrize("raw_string", test_labels)
37+
def test_sanitize_dns_label_rfc1123(raw_string: str):
38+
print(sanitize_dns_label_rfc1123(raw_string))
39+
assert re.match(DNS_LABEL_REGEX, sanitize_dns_label_rfc1123(raw_string))
40+
41+
42+
@pytest.mark.local
43+
@pytest.mark.parametrize("raw_string", ("", "-", "@", "$$$"))
44+
def test_sanitize_dns_label_rfc1123_empty(raw_string: str):
45+
with pytest.raises(ValueError) as e_info:
46+
sanitize_dns_label_rfc1123(raw_string)
47+
assert str(e_info.value) == f"Sanitized DNS label is empty for input '{raw_string}'"
48+
49+
50+
@pytest.mark.local
51+
@pytest.mark.parametrize("raw_string", _generate_test_subdomains(10))
52+
def test_sanitize_dns_subdomain_rfc1123(raw_string: str):
53+
assert re.match(DNS_SUBDOMAIN_REGEX, sanitize_dns_subdomain_rfc1123(raw_string))
54+
55+
56+
@pytest.mark.local
57+
@pytest.mark.parametrize("char", ("-", "."))
58+
def test_sanitize_dns_subdomain_rfc1123_trailing_non_alphanumeric_at_max_length(char: str):
59+
raw_string = (f"{'a' * 61}." * 4) + f".aaaa{char}a"
60+
assert re.match(DNS_SUBDOMAIN_REGEX, sanitize_dns_subdomain_rfc1123(raw_string))
61+
62+
63+
@pytest.mark.local
64+
@pytest.mark.parametrize("raw_string", ("", ".", "..."))
65+
def test_sanitize_dns_subdomain_rfc1123_empty(raw_string: str):
66+
with pytest.raises(ValueError) as e_info:
67+
sanitize_dns_subdomain_rfc1123(raw_string)
68+
assert str(e_info.value) == f"Sanitized DNS subdomain is empty for input '{raw_string}'"
69+
70+
71+
@pytest.mark.local
72+
@pytest.mark.parametrize(
73+
"raw_string", ("a" * 253, "a" * random.randint(254, 300)), ids=("254 chars", ">253 chars")
74+
)
75+
def test_sanitize_dns_subdomain_rfc1123_max_length(raw_string: str):
76+
assert len(sanitize_dns_subdomain_rfc1123(raw_string)) <= 253

0 commit comments

Comments
 (0)