Skip to content

Commit 488c273

Browse files
committed
fixed issue with Future not having explicit client
1 parent 841a42a commit 488c273

File tree

1 file changed

+28
-12
lines changed

1 file changed

+28
-12
lines changed

services/director-v2/tests/unit/test_modules_dask_client.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
import traceback
1111
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
1212
from dataclasses import dataclass
13-
from typing import Any, NoReturn
13+
from inspect import isawaitable
14+
from typing import Any, NoReturn, cast
1415
from unittest import mock
1516
from uuid import uuid4
1617

@@ -163,7 +164,9 @@ async def factory() -> DaskClient:
163164
client = await DaskClient.create(
164165
app=minimal_app,
165166
settings=minimal_app.state.settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND,
166-
endpoint=TypeAdapter(AnyUrl).validate_python(dask_spec_local_cluster.scheduler_address),
167+
endpoint=TypeAdapter(AnyUrl).validate_python(
168+
dask_spec_local_cluster.scheduler_address
169+
),
167170
authentication=NoAuthentication(),
168171
tasks_file_link_type=tasks_file_link_type,
169172
cluster_type=ClusterTypeInModel.ON_PREMISE,
@@ -204,7 +207,9 @@ async def factory() -> DaskClient:
204207
client = await DaskClient.create(
205208
app=minimal_app,
206209
settings=minimal_app.state.settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND,
207-
endpoint=TypeAdapter(AnyUrl).validate_python(local_dask_gateway_server.address),
210+
endpoint=TypeAdapter(AnyUrl).validate_python(
211+
local_dask_gateway_server.address
212+
),
208213
authentication=SimpleAuthentication(
209214
username="pytest_user",
210215
password=SecretStr(local_dask_gateway_server.password),
@@ -426,7 +431,9 @@ def fct_that_raise_cancellation_error() -> NoReturn:
426431
async def test_dask_does_not_report_base_exception_in_task(dask_client: DaskClient):
427432
def fct_that_raise_base_exception() -> NoReturn:
428433
err_msg = "task triggers a base exception, but dask does not care..."
429-
raise BaseException(err_msg) # pylint: disable=broad-exception-raised
434+
raise BaseException( # pylint: disable=broad-exception-raised # noqa: TRY002
435+
err_msg
436+
)
430437

431438
future = dask_client.backend.client.submit(fct_that_raise_base_exception)
432439
# NOTE: Since asyncio.CancelledError is derived from BaseException and the worker code checks Exception only
@@ -464,7 +471,7 @@ def comp_run_metadata(faker: Faker) -> RunMetadataDict:
464471
return RunMetadataDict(
465472
product_name=faker.pystr(),
466473
simcore_user_agent=faker.pystr(),
467-
) | faker.pydict(allowed_types=(str,))
474+
) | cast(dict[str, str], faker.pydict(allowed_types=(str,)))
468475

469476

470477
@pytest.fixture
@@ -480,6 +487,9 @@ def task_labels(comp_run_metadata: RunMetadataDict) -> ContainerLabelsDict:
480487

481488
@pytest.fixture
482489
def hardware_info() -> HardwareInfo:
490+
assert "json_schema_extra" in HardwareInfo.model_config
491+
assert isinstance(HardwareInfo.model_config["json_schema_extra"], dict)
492+
assert isinstance(HardwareInfo.model_config["json_schema_extra"]["examples"], list)
483493
return HardwareInfo.model_validate(
484494
HardwareInfo.model_config["json_schema_extra"]["examples"][0]
485495
)
@@ -539,7 +549,9 @@ def fake_sidecar_fct(
539549
assert node_params.node_requirements.ram
540550
assert "product_name" in comp_run_metadata
541551
assert "simcore_user_agent" in comp_run_metadata
542-
assert image_params.fake_tasks[node_id].node_requirements is not None
552+
node_requirements = image_params.fake_tasks[node_id].node_requirements
553+
assert node_requirements
554+
543555
node_id_to_job_ids = await dask_client.send_computation_tasks(
544556
user_id=user_id,
545557
project_id=project_id,
@@ -555,8 +567,8 @@ def fake_sidecar_fct(
555567
f"{to_simcore_runtime_docker_label_key('user-id')}": f"{user_id}",
556568
f"{to_simcore_runtime_docker_label_key('project-id')}": f"{project_id}",
557569
f"{to_simcore_runtime_docker_label_key('node-id')}": f"{node_id}",
558-
f"{to_simcore_runtime_docker_label_key('cpu-limit')}": f"{image_params.fake_tasks[node_id].node_requirements.cpu}",
559-
f"{to_simcore_runtime_docker_label_key('memory-limit')}": f"{image_params.fake_tasks[node_id].node_requirements.ram}",
570+
f"{to_simcore_runtime_docker_label_key('cpu-limit')}": f"{node_requirements.cpu}",
571+
f"{to_simcore_runtime_docker_label_key('memory-limit')}": f"{node_requirements.ram}",
560572
f"{to_simcore_runtime_docker_label_key('product-name')}": f"{comp_run_metadata['product_name']}",
561573
f"{to_simcore_runtime_docker_label_key('simcore-user-agent')}": f"{comp_run_metadata['simcore_user_agent']}",
562574
f"{to_simcore_runtime_docker_label_key('swarm-stack-name')}": "undefined-label",
@@ -986,7 +998,6 @@ async def test_disconnected_backend_raises_exception(
986998
# DISCONNECT THE CLUSTER
987999
await dask_spec_local_cluster.close() # type: ignore
9881000
await local_dask_gateway_server.server.cleanup()
989-
#
9901001
with pytest.raises(ComputationalBackendNotConnectedError):
9911002
await dask_client.send_computation_tasks(
9921003
user_id=user_id,
@@ -1191,9 +1202,13 @@ def fake_remote_fct(
11911202
assert len(published_computation_task) == 1
11921203

11931204
assert published_computation_task[0].node_id in cpu_image.fake_tasks
1194-
computation_future = distributed.Future(published_computation_task[0].job_id)
1205+
computation_future = distributed.Future(
1206+
published_computation_task[0].job_id, client=dask_client.backend.client
1207+
)
11951208
print("--> waiting for job to finish...")
1196-
await distributed.wait(computation_future, timeout=_ALLOW_TIME_FOR_GATEWAY_TO_CREATE_WORKERS) # type: ignore
1209+
await distributed.wait(
1210+
computation_future, timeout=_ALLOW_TIME_FOR_GATEWAY_TO_CREATE_WORKERS
1211+
)
11971212
assert computation_future.done()
11981213
print("job finished, now checking that we received the publications...")
11991214

@@ -1334,7 +1349,8 @@ async def test_get_cluster_details_robust_to_worker_disappearing(
13341349
await dask_client.get_cluster_details()
13351350

13361351
async def _scale_up_and_down() -> None:
1337-
assert dask_client.backend.gateway_cluster
1352+
assert dask_client.backend.gateway_cluster is not None
1353+
assert isawaitable(dask_client.backend.gateway_cluster)
13381354
await dask_client.backend.gateway_cluster.scale(40)
13391355
await asyncio.sleep(1)
13401356
await dask_client.backend.gateway_cluster.scale(1)

0 commit comments

Comments
 (0)