diff --git a/dask_kubernetes/classic/tests/test_sync.py b/dask_kubernetes/classic/tests/test_sync.py
index 5356cb102..49feb5b95 100644
--- a/dask_kubernetes/classic/tests/test_sync.py
+++ b/dask_kubernetes/classic/tests/test_sync.py
@@ -5,7 +5,7 @@ import dask
 import pytest
 from dask.distributed import Client, wait
-from distributed.utils_test import loop, captured_logger  # noqa: F401
+from distributed.utils_test import captured_logger
 from dask.utils import tmpfile
 
 from dask_kubernetes import KubeCluster, make_pod_spec
@@ -75,17 +75,17 @@ def test_ipython_display(cluster):
         sleep(0.5)
 
 
-def test_env(pod_spec, loop):
-    with KubeCluster(pod_spec, env={"ABC": "DEF"}, loop=loop) as cluster:
+def test_env(pod_spec):
+    with KubeCluster(pod_spec, env={"ABC": "DEF"}) as cluster:
         cluster.scale(1)
-        with Client(cluster, loop=loop) as client:
+        with Client(cluster) as client:
             while not cluster.scheduler_info["workers"]:
                 sleep(0.1)
             env = client.run(lambda: dict(os.environ))
             assert all(v["ABC"] == "DEF" for v in env.values())
 
 
-def dont_test_pod_template_yaml(docker_image, loop):
+def dont_test_pod_template_yaml(docker_image):
     test_yaml = {
         "kind": "Pod",
         "metadata": {"labels": {"app": "dask", "component": "dask-worker"}},
@@ -109,9 +109,9 @@ def dont_test_pod_template_yaml(docker_image, loop):
     with tmpfile(extension="yaml") as fn:
         with open(fn, mode="w") as f:
             yaml.dump(test_yaml, f)
-        with KubeCluster(f.name, loop=loop) as cluster:
+        with KubeCluster(f.name) as cluster:
             cluster.scale(2)
-            with Client(cluster, loop=loop) as client:
+            with Client(cluster) as client:
                 future = client.submit(lambda x: x + 1, 10)
                 result = future.result(timeout=10)
                 assert result == 11
@@ -128,7 +128,7 @@ def dont_test_pod_template_yaml(docker_image, loop):
             assert all(client.has_what().values())
 
 
-def test_pod_template_yaml_expand_env_vars(docker_image, loop):
+def test_pod_template_yaml_expand_env_vars(docker_image):
     try:
         os.environ["FOO_IMAGE"] = docker_image
 
@@ -155,13 +155,13 @@ def test_pod_template_yaml_expand_env_vars(docker_image, loop):
     with tmpfile(extension="yaml") as fn:
         with open(fn, mode="w") as f:
             yaml.dump(test_yaml, f)
-        with KubeCluster(f.name, loop=loop) as cluster:
+        with KubeCluster(f.name) as cluster:
             assert cluster.pod_template.spec.containers[0].image == docker_image
     finally:
         del os.environ["FOO_IMAGE"]
 
 
-def test_pod_template_dict(docker_image, loop):
+def test_pod_template_dict(docker_image):
     spec = {
         "metadata": {},
         "restartPolicy": "Never",
@@ -185,9 +185,9 @@ def test_pod_template_dict(docker_image, loop):
         },
     }
 
-    with KubeCluster(spec, loop=loop) as cluster:
+    with KubeCluster(spec) as cluster:
         cluster.scale(2)
-        with Client(cluster, loop=loop) as client:
+        with Client(cluster) as client:
             future = client.submit(lambda x: x + 1, 10)
             result = future.result()
             assert result == 11
@@ -202,7 +202,7 @@ def test_pod_template_dict(docker_image, loop):
             assert all(client.has_what().values())
 
 
-def test_pod_template_minimal_dict(docker_image, loop):
+def test_pod_template_minimal_dict(docker_image):
     spec = {
         "spec": {
             "containers": [
@@ -224,9 +224,9 @@ def test_pod_template_minimal_dict(docker_image, loop):
         }
     }
 
-    with KubeCluster(spec, loop=loop) as cluster:
+    with KubeCluster(spec) as cluster:
         cluster.adapt()
-        with Client(cluster, loop=loop) as client:
+        with Client(cluster) as client:
             future = client.submit(lambda x: x + 1, 10)
             result = future.result()
             assert result == 11
@@ -264,9 +264,9 @@ def test_bad_args():
         KubeCluster({"kind": "Pod"})
 
 
-def test_constructor_parameters(pod_spec, loop):
+def test_constructor_parameters(pod_spec):
     env = {"FOO": "BAR", "A": 1}
-    with KubeCluster(pod_spec, name="myname", loop=loop, env=env) as cluster:
+    with KubeCluster(pod_spec, name="myname", env=env) as cluster:
         pod = cluster.pod_template
 
         var = [v for v in pod.spec.containers[0].env if v.name == "FOO"]
@@ -380,7 +380,7 @@ def test_maximum(cluster):
     assert "scale beyond maximum number of workers" in result.lower()
 
 
-def test_extra_pod_config(docker_image, loop):
+def test_extra_pod_config(docker_image):
     """
     Test that our pod config merging process works fine
     """
@@ -388,7 +388,6 @@ def test_extra_pod_config(docker_image, loop):
         make_pod_spec(
             docker_image, extra_pod_config={"automountServiceAccountToken": False}
         ),
-        loop=loop,
         n_workers=0,
     ) as cluster:
 
@@ -397,7 +396,7 @@ def test_extra_pod_config(docker_image, loop):
     assert pod.spec.automount_service_account_token is False
 
 
-def test_extra_container_config(docker_image, loop):
+def test_extra_container_config(docker_image):
     """
     Test that our container config merging process works fine
     """
@@ -409,7 +408,6 @@ def test_extra_container_config(docker_image, loop):
                 "securityContext": {"runAsUser": 0},
             },
         ),
-        loop=loop,
         n_workers=0,
     ) as cluster:
 
@@ -419,7 +417,7 @@ def test_extra_container_config(docker_image, loop):
     assert pod.spec.containers[0].security_context == {"runAsUser": 0}
 
 
-def test_container_resources_config(docker_image, loop):
+def test_container_resources_config(docker_image):
     """
    Test container resource requests / limits being set properly
     """
@@ -427,7 +425,6 @@ def test_container_resources_config(docker_image, loop):
         make_pod_spec(
             docker_image, memory_request="0.5G", memory_limit="1G", cpu_limit="1"
         ),
-        loop=loop,
         n_workers=0,
     ) as cluster:
 
@@ -439,7 +436,7 @@ def test_container_resources_config(docker_image, loop):
     assert "cpu" not in pod.spec.containers[0].resources.requests
 
 
-def test_extra_container_config_merge(docker_image, loop):
+def test_extra_container_config_merge(docker_image):
     """
     Test that our container config merging process works recursively fine
     """
@@ -452,7 +449,6 @@ def test_extra_container_config_merge(docker_image, loop):
                 "args": ["last-item"],
             },
         ),
-        loop=loop,
         n_workers=0,
     ) as cluster:
 
@@ -464,7 +460,7 @@ def test_extra_container_config_merge(docker_image, loop):
     assert pod.spec.containers[0].args[-1] == "last-item"
 
 
-def test_worker_args(docker_image, loop):
+def test_worker_args(docker_image):
     """
     Test that dask-worker arguments are added to the container args
     """
@@ -474,7 +470,6 @@ def test_worker_args(docker_image, loop):
             memory_limit="5000M",
             resources="FOO=1 BAR=2",
         ),
-        loop=loop,
         n_workers=0,
     ) as cluster:
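The tests above drop distributed's `loop` fixture entirely; `KubeCluster` and `Client` now manage their own event loop. A minimal sketch of the resulting test shape, assuming the suite's existing `pod_spec` fixture:

```python
from dask.distributed import Client

from dask_kubernetes import KubeCluster


def test_cluster_smoke(pod_spec):
    # No `loop` fixture needed: KubeCluster and Client drive their own event loop.
    with KubeCluster(pod_spec) as cluster:
        cluster.scale(1)
        with Client(cluster) as client:
            assert client.submit(lambda x: x + 1, 10).result(timeout=60) == 11
```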
diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py
index 9008856c4..a38738725 100644
--- a/dask_kubernetes/common/networking.py
+++ b/dask_kubernetes/common/networking.py
@@ -6,6 +6,9 @@ from weakref import finalize
 
 import kubernetes_asyncio as kubernetes
+from tornado.iostream import StreamClosedError
+
+from distributed.core import rpc
 
 from .utils import check_dependency
@@ -15,7 +18,7 @@ async def get_external_address_for_scheduler_service(
     service,
     port_forward_cluster_ip=None,
     service_name_resolution_retries=20,
-    port_name="comm",
+    port_name="tcp-comm",
 ):
     """Take a service object and return the scheduler address."""
     [port] = [
@@ -108,7 +111,7 @@ async def port_forward_dashboard(service_name, namespace):
     return port
 
 
-async def get_scheduler_address(service_name, namespace, port_name="comm"):
+async def get_scheduler_address(service_name, namespace, port_name="tcp-comm"):
     async with kubernetes.client.api_client.ApiClient() as api_client:
         api = kubernetes.client.CoreV1Api(api_client)
         service = await api.read_namespaced_service(service_name, namespace)
@@ -132,6 +135,21 @@ async def wait_for_scheduler(cluster_name, namespace):
         label_selector=f"dask.org/cluster-name={cluster_name},dask.org/component=scheduler",
         timeout_seconds=60,
     ):
-        if event["object"].status.phase == "Running":
-            watch.stop()
+        if event["object"].status.conditions:
+            conditions = {
+                c.type: c.status for c in event["object"].status.conditions
+            }
+            if "Ready" in conditions and conditions["Ready"] == "True":
+                watch.stop()
+        await asyncio.sleep(0.1)
+
+
+async def wait_for_scheduler_comm(address):
+    while True:
+        try:
+            async with rpc(address) as scheduler_comm:
+                await scheduler_comm.versions()
+        except (StreamClosedError, OSError):
             await asyncio.sleep(0.1)
+            continue
+        break
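The new `wait_for_scheduler_comm` helper polls the scheduler with a lightweight `versions()` call until it answers, retrying on `StreamClosedError`/`OSError`. A minimal sketch of how a caller might use it before opening a persistent RPC handle; the address here is a placeholder:

```python
from distributed.core import rpc

from dask_kubernetes.common.networking import wait_for_scheduler_comm


async def connect(scheduler_address="tcp://example-cluster-service:8786"):
    # Block until the scheduler responds to an RPC round trip...
    await wait_for_scheduler_comm(scheduler_address)
    # ...then it is safe to hand the address to a long-lived rpc object.
    return rpc(scheduler_address)
```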
port_name="tcp-comm"): async with kubernetes.client.api_client.ApiClient() as api_client: api = kubernetes.client.CoreV1Api(api_client) service = await api.read_namespaced_service(service_name, namespace) @@ -132,6 +135,21 @@ async def wait_for_scheduler(cluster_name, namespace): label_selector=f"dask.org/cluster-name={cluster_name},dask.org/component=scheduler", timeout_seconds=60, ): - if event["object"].status.phase == "Running": - watch.stop() + if event["object"].status.conditions: + conditions = { + c.type: c.status for c in event["object"].status.conditions + } + if "Ready" in conditions and conditions["Ready"] == "True": + watch.stop() + await asyncio.sleep(0.1) + + +async def wait_for_scheduler_comm(address): + while True: + try: + async with rpc(address) as scheduler_comm: + await scheduler_comm.versions() + except (StreamClosedError, OSError): await asyncio.sleep(0.1) + continue + break diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index 9bc463cd1..fa28c2700 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -20,6 +20,7 @@ ) from dask_kubernetes.common.auth import ClusterAuth +from dask_kubernetes.common.utils import namespace_default from dask_kubernetes.operator import ( build_cluster_spec, wait_for_service, @@ -27,8 +28,8 @@ from dask_kubernetes.common.networking import ( get_scheduler_address, - port_forward_dashboard, wait_for_scheduler, + wait_for_scheduler_comm, ) @@ -121,7 +122,7 @@ class KubeCluster(Cluster): def __init__( self, name, - namespace="default", + namespace=None, image="ghcr.io/dask/dask:latest", n_workers=3, resources={}, @@ -133,8 +134,7 @@ def __init__( **kwargs, ): self.name = name - # TODO: Set namespace to None and get default namespace from user's context - self.namespace = namespace + self.namespace = namespace or namespace_default() self.image = image self.n_workers = n_workers self.resources = resources @@ -208,10 +208,15 @@ async def _create_cluster(self): ) from e await wait_for_scheduler(cluster_name, self.namespace) await wait_for_service(core_api, f"{cluster_name}-service", self.namespace) - self.scheduler_comm = rpc(await self._get_scheduler_address()) - self.forwarded_dashboard_port = await port_forward_dashboard( - f"{self.name}-cluster-service", self.namespace + scheduler_address = await self._get_scheduler_address() + await wait_for_scheduler_comm(scheduler_address) + self.scheduler_comm = rpc(scheduler_address) + dashboard_address = await get_scheduler_address( + f"{self.name}-cluster-service", + self.namespace, + port_name="http-dashboard", ) + self.forwarded_dashboard_port = dashboard_address.split(":")[-1] async def _connect_cluster(self): if self.shutdown_on_close is None: @@ -230,10 +235,15 @@ async def _connect_cluster(self): service_name = f'{cluster_spec["metadata"]["name"]}-service' await wait_for_scheduler(self.cluster_name, self.namespace) await wait_for_service(core_api, service_name, self.namespace) - self.scheduler_comm = rpc(await self._get_scheduler_address()) - self.forwarded_dashboard_port = await port_forward_dashboard( - f"{self.name}-cluster-service", self.namespace + scheduler_address = await self._get_scheduler_address() + await wait_for_scheduler_comm(scheduler_address) + self.scheduler_comm = rpc(scheduler_address) + dashboard_address = await get_scheduler_address( + service_name, + self.namespace, + port_name="http-dashboard", ) + self.forwarded_dashboard_port = dashboard_address.split(":")[-1] 
diff --git a/dask_kubernetes/kubernetes.yaml b/dask_kubernetes/kubernetes.yaml
index 4ab27f9ec..b0bc374ac 100644
--- a/dask_kubernetes/kubernetes.yaml
+++ b/dask_kubernetes/kubernetes.yaml
@@ -30,11 +30,11 @@ kubernetes:
         dask.org/cluster-name: ""  # Cluster name will be added automatically
         dask.org/component: scheduler
       ports:
-        - name: comm
+        - name: tcp-comm
           protocol: TCP
           port: 8786
           targetPort: 8786
-        - name: dashboard
+        - name: http-dashboard
           protocol: TCP
           port: 8787
           targetPort: 8787
diff --git a/dask_kubernetes/operator/deployment/manifests/operator.yaml b/dask_kubernetes/operator/deployment/manifests/operator.yaml
index 1c54a0ede..73cf5c92b 100644
--- a/dask_kubernetes/operator/deployment/manifests/operator.yaml
+++ b/dask_kubernetes/operator/deployment/manifests/operator.yaml
@@ -1,7 +1,15 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+  labels:
+    kubernetes.io/metadata.name: dask-operator
+    name: dask-operator
+  name: dask-operator
+---
 apiVersion: apps/v1
 kind: Deployment
 metadata:
-  namespace: "kube-system"
+  namespace: dask-operator
   name: dask-kubernetes-operator
 spec:
   replicas: 1
@@ -24,7 +32,7 @@ spec:
 apiVersion: v1
 kind: ServiceAccount
 metadata:
-  namespace: "kube-system"
+  namespace: dask-operator
   name: dask-kubernetes-operator
 ---
 apiVersion: rbac.authorization.k8s.io/v1
@@ -61,7 +69,7 @@ rules:
   - apiGroups: [""]
     resources: [pods]
     verbs: [create, delete, get, watch, list]
- 
+
   - apiGroups: [""]
     resources: [services]
     verbs: [create, delete, get, watch, list]
@@ -77,4 +85,4 @@ roleRef:
 subjects:
   - kind: ServiceAccount
     name: dask-kubernetes-operator
-    namespace: "kube-system"
\ No newline at end of file
+    namespace: dask-operator
diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py
index e53cb18a7..826b67829 100644
--- a/dask_kubernetes/operator/operator.py
+++ b/dask_kubernetes/operator/operator.py
@@ -234,7 +234,7 @@ async def retire_workers(
     dashboard_address = await get_scheduler_address(
         scheduler_service_name,
         namespace,
-        port_name="dashboard",
+        port_name="http-dashboard",
     )
     async with aiohttp.ClientSession() as session:
         url = f"{dashboard_address}/api/v1/retire_workers"
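With the port rename, the operator resolves the dashboard via `port_name="http-dashboard"` and then calls the scheduler's HTTP API, as the `retire_workers` hunk shows. A simplified sketch of that call path; only the URL appears in the diff, so the JSON payload below is an assumption for illustration:

```python
import aiohttp


async def retire_workers_via_http(dashboard_address, workers):
    # POST to the scheduler HTTP API route used by the operator.
    async with aiohttp.ClientSession() as session:
        url = f"{dashboard_address}/api/v1/retire_workers"
        # NOTE: the payload shape is assumed, not taken from the diff.
        async with session.post(url, json={"workers": workers}) as resp:
            return await resp.json()
```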
diff --git a/dask_kubernetes/operator/tests/resources/simplecluster.yaml b/dask_kubernetes/operator/tests/resources/simplecluster.yaml
index 674f51174..ee52c0eab 100644
--- a/dask_kubernetes/operator/tests/resources/simplecluster.yaml
+++ b/dask_kubernetes/operator/tests/resources/simplecluster.yaml
@@ -27,20 +27,22 @@ spec:
           args:
             - dask-scheduler
           ports:
-            - name: comm
+            - name: tcp-comm
               containerPort: 8786
               protocol: TCP
-            - name: dashboard
+            - name: http-dashboard
               containerPort: 8787
               protocol: TCP
           readinessProbe:
-            tcpSocket:
-              port: comm
+            httpGet:
+              port: http-dashboard
+              path: /health
             initialDelaySeconds: 5
             periodSeconds: 10
           livenessProbe:
-            tcpSocket:
-              port: comm
+            httpGet:
+              port: http-dashboard
+              path: /health
             initialDelaySeconds: 15
             periodSeconds: 20
           env:
@@ -52,11 +54,11 @@ spec:
         dask.org/cluster-name: simple-cluster
         dask.org/component: scheduler
       ports:
-        - name: comm
+        - name: tcp-comm
           protocol: TCP
           port: 8786
-          targetPort: "comm"
-        - name: dashboard
+          targetPort: "tcp-comm"
+        - name: http-dashboard
           protocol: TCP
           port: 8787
-          targetPort: "dashboard"
+          targetPort: "http-dashboard"
diff --git a/dask_kubernetes/operator/tests/resources/simplejob.yaml b/dask_kubernetes/operator/tests/resources/simplejob.yaml
index ee0636ec2..e931df292 100644
--- a/dask_kubernetes/operator/tests/resources/simplejob.yaml
+++ b/dask_kubernetes/operator/tests/resources/simplejob.yaml
@@ -40,20 +40,22 @@ spec:
           args:
             - dask-scheduler
           ports:
-            - name: comm
+            - name: tcp-comm
               containerPort: 8786
               protocol: TCP
-            - name: dashboard
+            - name: http-dashboard
               containerPort: 8787
               protocol: TCP
           readinessProbe:
-            tcpSocket:
-              port: comm
+            httpGet:
+              port: http-dashboard
+              path: /health
             initialDelaySeconds: 5
             periodSeconds: 10
           livenessProbe:
-            tcpSocket:
-              port: comm
+            httpGet:
+              port: http-dashboard
+              path: /health
             initialDelaySeconds: 15
             periodSeconds: 20
           env:
@@ -65,11 +67,11 @@ spec:
         dask.org/cluster-name: simple-job-cluster
         dask.org/component: scheduler
       ports:
-        - name: comm
+        - name: tcp-comm
           protocol: TCP
           port: 8786
-          targetPort: "comm"
-        - name: dashboard
+          targetPort: "tcp-comm"
+        - name: http-dashboard
           protocol: TCP
           port: 8787
-          targetPort: "dashboard"
+          targetPort: "http-dashboard"
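All of these manifests now use protocol-prefixed port names (`tcp-comm`, `http-dashboard`), the common `<protocol>[-<suffix>]` naming convention that tools such as Istio rely on for protocol detection. A small sketch of how a named port can be resolved from a `V1Service` object, mirroring the single-match unpacking visible in the `networking.py` hunk above:

```python
def resolve_named_port(service, port_name="tcp-comm"):
    """Return the numeric port for a named port on a V1Service.

    Raises ValueError if the name is missing or ambiguous, in the same
    spirit as the ``[port] = [...]`` unpacking used in networking.py.
    """
    [port] = [p.port for p in service.spec.ports if p.name == port_name]
    return port
```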
diff --git a/doc/source/operator_installation.rst b/doc/source/operator_installation.rst
index d07df9296..a9b5fcadd 100644
--- a/doc/source/operator_installation.rst
+++ b/doc/source/operator_installation.rst
@@ -34,8 +34,8 @@ This will create the appropriate roles, service accounts and a deployment for the operator.
 .. code-block:: console
 
    $ kubectl get pods -A -l application=dask-kubernetes-operator
-   NAMESPACE     NAME                                        READY   STATUS    RESTARTS   AGE
-   kube-system   dask-kubernetes-operator-775b8bbbd5-zdrf7   1/1     Running   0          74s
+   NAMESPACE       NAME                                        READY   STATUS    RESTARTS   AGE
+   dask-operator   dask-kubernetes-operator-775b8bbbd5-zdrf7   1/1     Running   0          74s
 
 
 Installing with Helm
@@ -55,3 +55,18 @@ This will install the custom resource definitions, service account, roles, and the operator.
 
 .. warning::
     Please note that `Helm does not support updating or deleting CRDs. `_ If updates are made to the CRD templates in future releases (to support future k8s releases, for example) you may have to manually update the CRDs.
+
+Kubeflow
+--------
+
+In order to use the Dask Operator with `Kubeflow `_ you need to perform some extra installation steps.
+
+User permissions
+^^^^^^^^^^^^^^^^
+
+Kubeflow doesn't know anything about our Dask custom resource definitions, so we need to update the ``kubeflow-kubernetes-edit`` cluster role. This role
+allows users with cluster edit permissions to create pods, jobs and other resources and we need to add the Dask custom resources to that list.
+
+.. code-block:: console
+
+   $ kubectl patch clusterrole kubeflow-kubernetes-edit --patch '{"rules": [{"apiGroups": ["kubernetes.dask.org"],"resources": ["*"],"verbs": ["*"]}]}'
diff --git a/doc/source/operator_resources.rst b/doc/source/operator_resources.rst
index b8785c9ad..adf3ab064 100644
--- a/doc/source/operator_resources.rst
+++ b/doc/source/operator_resources.rst
@@ -76,36 +76,38 @@ Let's create an example called ``cluster.yaml`` with the following configuration:
           args:
             - dask-scheduler
           ports:
-            - name: comm
+            - name: tcp-comm
               containerPort: 8786
               protocol: TCP
-            - name: dashboard
+            - name: http-dashboard
               containerPort: 8787
               protocol: TCP
           readinessProbe:
-            tcpSocket:
-              port: comm
-            initialDelaySeconds: 5
-            periodSeconds: 10
+            httpGet:
+              port: http-dashboard
+              path: /health
+            initialDelaySeconds: 5
+            periodSeconds: 10
           livenessProbe:
-            tcpSocket:
-              port: comm
-            initialDelaySeconds: 15
-            periodSeconds: 20
+            httpGet:
+              port: http-dashboard
+              path: /health
+            initialDelaySeconds: 15
+            periodSeconds: 20
     service:
       type: NodePort
       selector:
         dask.org/cluster-name: simple-cluster
         dask.org/component: scheduler
       ports:
-        - name: comm
+        - name: tcp-comm
           protocol: TCP
           port: 8786
-          targetPort: "comm"
-        - name: dashboard
+          targetPort: "tcp-comm"
+        - name: http-dashboard
           protocol: TCP
           port: 8787
-          targetPort: "dashboard"
+          targetPort: "http-dashboard"
 
 Editing this file will change the default configuration of you Dask cluster. See the Configuration Reference :ref:`config`. Now apply ``cluster.yaml``
@@ -256,18 +258,20 @@ Let's create an example called ``highmemworkers.yaml`` with the following configuration:
           imagePullPolicy: "IfNotPresent"
           resources:
             requests:
-              memory: "2Gi"
+              memory: "32Gi"
             limits:
               memory: "32Gi"
           args:
             - dask-worker
             - --name
             - $(DASK_WORKER_NAME)
+            - --resources
+            - MEMORY=32e9
 
 The main thing we need to ensure is that the ``cluster`` option matches the name of the cluster we created earlier. This will cause the workers to join that cluster.
 
-See the Configuration Reference :ref:`config`. Now apply ``highmemworkers.yaml``
+See the :ref:`config`. Now apply ``highmemworkers.yaml``
 
 .. code-block:: console
@@ -379,20 +383,22 @@ Let's create an example called ``job.yaml`` with the following configuration:
           args:
             - dask-scheduler
           ports:
-            - name: comm
+            - name: tcp-comm
               containerPort: 8786
               protocol: TCP
-            - name: dashboard
+            - name: http-dashboard
               containerPort: 8787
               protocol: TCP
           readinessProbe:
-            tcpSocket:
-              port: comm
+            httpGet:
+              port: http-dashboard
+              path: /health
             initialDelaySeconds: 5
             periodSeconds: 10
           livenessProbe:
-            tcpSocket:
-              port: comm
+            httpGet:
+              port: http-dashboard
+              path: /health
             initialDelaySeconds: 15
             periodSeconds: 20
           env:
@@ -404,17 +410,17 @@ Let's create an example called ``job.yaml`` with the following configuration:
         dask.org/cluster-name: simple-job-cluster
         dask.org/component: scheduler
       ports:
-        - name: comm
+        - name: tcp-comm
           protocol: TCP
           port: 8786
-          targetPort: "comm"
-        - name: dashboard
+          targetPort: "tcp-comm"
+        - name: http-dashboard
           protocol: TCP
           port: 8787
-          targetPort: "dashboard"
+          targetPort: "http-dashboard"
 
-Editing this file will change the default configuration of you Dask job. See the Configuration Reference :ref:`config`. Now apply ``job.yaml``
+Editing this file will change the default configuration of your Dask job. See the :ref:`config`. Now apply ``job.yaml``
 
 .. code-block:: console