Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels):
"dask.org/component": "scheduler",
}
)

# Check if NodePort is out of range
for port in spec.get("ports", []):
if port.get("nodePort", None) and (
port["nodePort"] < 30000 or port["nodePort"] > 32767
):
raise ValueError("NodePort out of range")
Copy link
Member

@jacobtomlinson jacobtomlinson May 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this more deeply I don't think we want to check this at this point. The resource has already been created and we are in an unschedulable state because the resource cannot be created.

I think we want to add a validation hook to check this at resource creation time.

https://kopf.readthedocs.io/en/stable/admission/#validation-handlers

# Something like
@kopf.on. validate("daskcluster.kubernetes.dask.org")
async def daskcluster_validate_nodeport(spec, warnings, **kwargs):
    for port in spec.get("ports", []):
        if port.get("nodePort", None) and (
            port["nodePort"] < 30000 or port["nodePort"] > 32767
        ):
            raise kopf.AdmissionError(f"nodePort must be between 30000 and 32767.")


return {
"apiVersion": "v1",
"kind": "Service",
Expand Down
27 changes: 27 additions & 0 deletions dask_kubernetes/operator/controller/tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import yaml
from dask.distributed import Client

from dask_kubernetes.operator import KubeCluster, make_cluster_spec
from dask_kubernetes.operator.controller import (
KUBERNETES_DATETIME_FORMAT,
get_job_runner_pod_name,
Expand Down Expand Up @@ -433,3 +434,29 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job):

assert "A DaskJob has been created" in runner.stdout
assert "Job failed, deleting Dask cluster." in runner.stdout


def custom_nodeport_spec(port, name="foo", scheduler_service_type="NodePort"):
try:
port = int(port)
except ValueError:
raise ValueError(f"{port} is not a valid integer")

spec = make_cluster_spec(name, scheduler_service_type)
spec["spec"]["scheduler"]["service"]["ports"][0]["nodePort"] = port
return spec


def test_nodeport_valid(kopf_runner):
with kopf_runner:
spec = custom_nodeport_spec("30007")
with KubeCluster(custom_cluster_spec=spec, n_workers=1) as cluster:
with Client(cluster) as client:
assert client.submit(lambda x: x + 1, 10).result() == 11


def test_nodeport_out_of_range(kopf_runner):
with kopf_runner:
spec = custom_nodeport_spec("38967")
with pytest.raises(ValueError, match="NodePort out of range"):
KubeCluster(custom_cluster_spec=spec, n_workers=1)