Skip to content

Commit 159422d

Browse files
committed
removed clusters endpoint from dv-2
1 parent cbfbbf5 commit 159422d

File tree

7 files changed

+11
-1191
lines changed

7 files changed

+11
-1191
lines changed

services/director-v2/src/simcore_service_director_v2/api/entrypoints.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from .._meta import API_VTAG
44
from .routes import (
5-
clusters,
65
computations,
76
computations_tasks,
87
dynamic_scheduler,
@@ -27,7 +26,6 @@
2726
v2_router.include_router(
2827
dynamic_services.router, tags=["dynamic services"], prefix="/dynamic_services"
2928
)
30-
v2_router.include_router(clusters.router, tags=["clusters"], prefix="/clusters")
3129

3230
v2_router.include_router(
3331
dynamic_scheduler.router, tags=["dynamic scheduler"], prefix="/dynamic_scheduler"

services/director-v2/src/simcore_service_director_v2/api/routes/clusters.py

Lines changed: 0 additions & 236 deletions
This file was deleted.

services/director-v2/src/simcore_service_director_v2/modules/dask_client.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
from ..utils.dask_client_utils import (
7575
DaskSubSystem,
7676
TaskHandlers,
77-
create_internal_client_based_on_auth,
77+
connect_to_dask_scheduler,
7878
)
7979

8080
_logger = logging.getLogger(__name__)
@@ -133,7 +133,7 @@ async def create(
133133
) -> "DaskClient":
134134
_logger.info(
135135
"Initiating connection to %s with auth: %s, type: %s",
136-
f"dask-scheduler/gateway at {endpoint}",
136+
f"dask-scheduler at {endpoint}",
137137
authentication,
138138
cluster_type,
139139
)
@@ -149,9 +149,7 @@ async def create(
149149
endpoint,
150150
attempt.retry_state.attempt_number,
151151
)
152-
backend = await create_internal_client_based_on_auth(
153-
endpoint, authentication
154-
)
152+
backend = await connect_to_dask_scheduler(endpoint, authentication)
155153
dask_utils.check_scheduler_status(backend.client)
156154
instance = cls(
157155
app=app,
@@ -162,7 +160,7 @@ async def create(
162160
)
163161
_logger.info(
164162
"Connection to %s succeeded [%s]",
165-
f"dask-scheduler/gateway at {endpoint}",
163+
f"dask-scheduler at {endpoint}",
166164
json.dumps(attempt.retry_state.retry_object.statistics),
167165
)
168166
_logger.info(
@@ -331,14 +329,12 @@ async def send_computation_tasks(
331329
)
332330
dask_utils.check_communication_with_scheduler_is_open(self.backend.client)
333331
dask_utils.check_scheduler_status(self.backend.client)
334-
# NOTE: in case it's a gateway or it is an on-demand cluster
332+
# NOTE: in case it is an on-demand cluster
335333
# we do not check a priori if the task
336334
# is runnable because we CAN'T. A cluster might auto-scale, the worker(s)
337-
# might also auto-scale and the gateway does not know that a priori.
335+
# might also auto-scale we do not know that a priori.
338336
# So, we'll just send the tasks over and see what happens after a while.
339-
if (self.cluster_type != ClusterTypeInModel.ON_DEMAND) and (
340-
self.backend.gateway is None
341-
):
337+
if self.cluster_type != ClusterTypeInModel.ON_DEMAND:
342338
dask_utils.check_if_cluster_is_able_to_run_pipeline(
343339
project_id=project_id,
344340
node_id=node_id,

services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,16 @@
33
import socket
44
from collections.abc import Awaitable, Callable
55
from dataclasses import dataclass, field
6-
from typing import Final
76

87
import distributed
9-
import httpx
10-
from aiohttp import ClientConnectionError, ClientResponseError
118
from dask_task_models_library.container_tasks.events import (
129
TaskLogEvent,
1310
TaskProgressEvent,
1411
)
15-
from models_library.clusters import (
16-
ClusterAuthentication,
17-
InternalClusterAuthentication,
18-
TLSAuthentication,
19-
)
12+
from models_library.clusters import ClusterAuthentication, TLSAuthentication
2013
from pydantic import AnyUrl
2114

22-
from ..core.errors import ComputationalSchedulerError, ConfigurationError
15+
from ..core.errors import ConfigurationError
2316
from .dask import wrap_client_async_routine
2417

2518

@@ -53,8 +46,8 @@ async def close(self) -> None:
5346
await wrap_client_async_routine(self.client.close())
5447

5548

56-
async def _connect_to_dask_scheduler(
57-
endpoint: AnyUrl, authentication: InternalClusterAuthentication
49+
async def connect_to_dask_scheduler(
50+
endpoint: AnyUrl, authentication: ClusterAuthentication
5851
) -> DaskSubSystem:
5952
try:
6053
security = distributed.Security()
@@ -75,37 +68,3 @@ async def _connect_to_dask_scheduler(
7568
except TypeError as exc:
7669
msg = f"Scheduler has invalid configuration: {endpoint=}"
7770
raise ConfigurationError(msg=msg) from exc
78-
79-
80-
async def create_internal_client_based_on_auth(
81-
endpoint: AnyUrl, authentication: ClusterAuthentication
82-
) -> DaskSubSystem:
83-
return await _connect_to_dask_scheduler(endpoint, authentication) # type: ignore[arg-type] # _is_dask_scheduler checks already that it is a valid type
84-
85-
86-
_PING_TIMEOUT_S: Final[int] = 5
87-
_DASK_SCHEDULER_RUNNING_STATE: Final[str] = "running"
88-
89-
90-
async def test_scheduler_endpoint(endpoint: AnyUrl) -> None:
91-
"""This method will try to connect to a scheduler endpoint and raise a ConfigurationError in case of problem
92-
93-
:raises ConfigurationError: contians some information as to why the connection failed
94-
"""
95-
try:
96-
async with distributed.Client(
97-
address=f"{endpoint}", timeout=f"{_PING_TIMEOUT_S}", asynchronous=True
98-
) as dask_client:
99-
if dask_client.status != _DASK_SCHEDULER_RUNNING_STATE:
100-
msg = "internal scheduler is not running!"
101-
raise ComputationalSchedulerError(msg=msg) # noqa: TRY301
102-
103-
except (
104-
ClientConnectionError,
105-
ClientResponseError,
106-
httpx.HTTPError,
107-
ComputationalSchedulerError,
108-
) as exc:
109-
logger.debug("Pinging %s, failed: %s", f"{endpoint=}", f"{exc=!r}")
110-
msg = f"Could not connect to cluster in {endpoint}: error: {exc}"
111-
raise ConfigurationError(msg=msg) from exc

services/director-v2/tests/unit/test_modules_dask_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -986,7 +986,6 @@ async def test_changed_scheduler_raises_exception(
986986
mocked_user_completed_cb.assert_not_called()
987987

988988

989-
@pytest.mark.flaky(max_runs=3)
990989
@pytest.mark.parametrize("fail_remote_fct", [False, True])
991990
async def test_get_tasks_status(
992991
dask_client: DaskClient,

0 commit comments

Comments
 (0)