Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion dask_kubernetes/operator/kubecluster/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,12 @@ def from_name(cls, name, **kwargs):
--------
>>> cluster = KubeCluster.from_name(name="simple-cluster")
"""
return cls(name=name, create_mode=CreateMode.CONNECT_ONLY, **kwargs)
defaults = {"create_mode": CreateMode.CONNECT_ONLY, "shutdown_on_close": False}
kwargs = defaults | kwargs
return cls(
name=name,
**kwargs,
)


def make_cluster_spec(
Expand Down
20 changes: 17 additions & 3 deletions dask_kubernetes/operator/kubecluster/tests/test_kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from dask.distributed import Client
from distributed.utils import TimeoutError

from dask_kubernetes.aiopykube.dask import DaskCluster
from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
from dask_kubernetes.exceptions import SchedulerStartupError

Expand Down Expand Up @@ -95,13 +97,25 @@ def test_multiple_clusters_simultaneously_same_loop(kopf_runner, docker_image):
assert client2.submit(lambda x: x + 1, 10).result() == 11


def test_cluster_from_name(kopf_runner, docker_image, ns):
@pytest.mark.asyncio
async def test_cluster_from_name(kopf_runner, docker_image, ns):
with kopf_runner:
with KubeCluster(
name="abc", namespace=ns, image=docker_image, n_workers=1
name="abc",
namespace=ns,
image=docker_image,
n_workers=1,
asynchronous=True,
) as firstcluster:
with KubeCluster.from_name("abc", namespace=ns) as secondcluster:
with KubeCluster.from_name(
"abc", namespace=ns, asynchronous=True
) as secondcluster:
assert firstcluster == secondcluster
k8s_api = HTTPClient(KubeConfig.from_env())
cluster = await DaskCluster.objects(k8s_api, namespace=ns).get_by_name(
"abc"
)
assert cluster.obj["status"]["phase"] == "Running"


def test_cluster_scheduler_info_updated(kopf_runner, docker_image, ns):
Expand Down