|
6 | 6 | # pylint: disable=reimported |
7 | 7 | import asyncio |
8 | 8 | import functools |
| 9 | +import logging |
9 | 10 | import traceback |
10 | 11 | from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine |
11 | 12 | from dataclasses import dataclass |
|
49 | 50 | from models_library.users import UserID |
50 | 51 | from pydantic import AnyUrl, ByteSize, TypeAdapter |
51 | 52 | from pytest_mock.plugin import MockerFixture |
| 53 | +from pytest_simcore.helpers.logging_tools import log_context |
52 | 54 | from pytest_simcore.helpers.typing_env import EnvVarsDict |
53 | 55 | from settings_library.s3 import S3Settings |
54 | 56 | from simcore_sdk.node_ports_v2 import FileLinkType |
@@ -140,41 +142,48 @@ def _minimal_dask_config( |
140 | 142 | @pytest.fixture |
141 | 143 | async def create_dask_client_from_scheduler( |
142 | 144 | _minimal_dask_config: None, |
143 | | - dask_spec_local_cluster: SpecCluster, |
| 145 | + dask_spec_local_cluster: distributed.SpecCluster, |
144 | 146 | minimal_app: FastAPI, |
145 | 147 | tasks_file_link_type: FileLinkType, |
146 | 148 | ) -> AsyncIterator[Callable[[], Awaitable[DaskClient]]]: |
147 | 149 | created_clients = [] |
148 | 150 |
|
149 | 151 | async def factory() -> DaskClient: |
150 | | - client = await DaskClient.create( |
151 | | - app=minimal_app, |
152 | | - settings=minimal_app.state.settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND, |
153 | | - endpoint=TypeAdapter(AnyUrl).validate_python( |
154 | | - dask_spec_local_cluster.scheduler_address |
155 | | - ), |
156 | | - authentication=NoAuthentication(), |
157 | | - tasks_file_link_type=tasks_file_link_type, |
158 | | - cluster_type=ClusterTypeInModel.ON_PREMISE, |
159 | | - ) |
160 | | - assert client |
161 | | - assert client.app == minimal_app |
162 | | - assert ( |
163 | | - client.settings |
164 | | - == minimal_app.state.settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND |
165 | | - ) |
| 152 | + with log_context( |
| 153 | + logging.INFO, |
| 154 | + f"Create director-v2 DaskClient to {dask_spec_local_cluster.scheduler_address}", |
| 155 | + ) as ctx: |
| 156 | + client = await DaskClient.create( |
| 157 | + app=minimal_app, |
| 158 | + settings=minimal_app.state.settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND, |
| 159 | + endpoint=TypeAdapter(AnyUrl).validate_python( |
| 160 | + dask_spec_local_cluster.scheduler_address |
| 161 | + ), |
| 162 | + authentication=NoAuthentication(), |
| 163 | + tasks_file_link_type=tasks_file_link_type, |
| 164 | + cluster_type=ClusterTypeInModel.ON_PREMISE, |
| 165 | + ) |
| 166 | + assert client |
| 167 | + assert client.app == minimal_app |
| 168 | + assert ( |
| 169 | + client.settings |
| 170 | + == minimal_app.state.settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND |
| 171 | + ) |
| 172 | + |
| 173 | + assert client.backend.client |
| 174 | + scheduler_infos = client.backend.client.scheduler_info() # type: ignore |
| 175 | + ctx.logger.info( |
| 176 | + "%s", |
| 177 | + f"--> Connected to scheduler via client {client=} to scheduler {scheduler_infos=}", |
| 178 | + ) |
166 | 179 |
|
167 | | - assert client.backend.client |
168 | | - scheduler_infos = client.backend.client.scheduler_info() # type: ignore |
169 | | - print( |
170 | | - f"--> Connected to scheduler via client {client=} to scheduler {scheduler_infos=}" |
171 | | - ) |
172 | 180 | created_clients.append(client) |
173 | 181 | return client |
174 | 182 |
|
175 | 183 | yield factory |
176 | | - await asyncio.gather(*[client.delete() for client in created_clients]) |
177 | | - print(f"<-- Disconnected scheduler clients {created_clients=}") |
| 184 | + |
| 185 | + with log_context(logging.INFO, "Disconnect scheduler clients"): |
| 186 | + await asyncio.gather(*[client.delete() for client in created_clients]) |
178 | 187 |
|
179 | 188 |
|
180 | 189 | @pytest.fixture(params=["create_dask_client_from_scheduler"]) |
@@ -728,15 +737,17 @@ def fake_remote_fct( |
728 | 737 | with pytest.raises(TaskCancelledError): |
729 | 738 | await dask_client.get_task_result(published_computation_task[0].job_id) |
730 | 739 |
|
731 | | - # after releasing the results, the task shall be UNKNOWN |
| 740 | + await asyncio.sleep(5) |
732 | 741 | await dask_client.release_task_result(published_computation_task[0].job_id) |
| 742 | + # after releasing the results, the task shall be UNKNOWN |
| 743 | + |
733 | 744 | # NOTE: this change of status takes a very long time to happen and is not relied upon so we skip it since it |
734 | 745 | # makes the test fail a lot for no gain (it's kept here in case it ever becomes an issue) |
735 | 746 | await _assert_wait_for_task_status( |
736 | 747 | published_computation_task[0].job_id, |
737 | 748 | dask_client, |
738 | 749 | RunningState.UNKNOWN, |
739 | | - timeout=120, |
| 750 | + timeout=10, |
740 | 751 | ) |
741 | 752 |
|
742 | 753 |
|
|
0 commit comments