Skip to content

Commit 3f8a317

Browse files
authored
♻️ Refactor dy-sidecar output directories event detection (ITISFoundation#3476)
1 parent 51cc7cb commit 3f8a317

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2640
-721
lines changed

packages/service-library/src/servicelib/background_task.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import contextlib
33
import datetime
44
import logging
5-
from typing import Awaitable, Callable
5+
from typing import Awaitable, Callable, Optional
66

77
from servicelib.logging_utils import log_catch, log_context
88
from tenacity import TryAgain
@@ -29,7 +29,7 @@ async def _periodic_scheduled_task(
2929
), log_catch(logger):
3030
await task(**task_kwargs)
3131

32-
raise TryAgain
32+
raise TryAgain()
3333

3434

3535
async def start_periodic_task(
@@ -53,11 +53,14 @@ async def start_periodic_task(
5353
)
5454

5555

56-
async def stop_periodic_task(asyncio_task: asyncio.Task) -> None:
56+
async def stop_periodic_task(
57+
asyncio_task: asyncio.Task, *, timeout: Optional[float] = None
58+
) -> None:
5759
with log_context(
5860
logger,
5961
logging.INFO,
6062
msg=f"cancel periodic background task '{asyncio_task.get_name()}'",
6163
), contextlib.suppress(asyncio.CancelledError):
6264
asyncio_task.cancel()
63-
await asyncio_task
65+
with log_catch(logger, reraise=False):
66+
await asyncio.wait((asyncio_task,), timeout=timeout)

packages/service-library/src/servicelib/docker_utils.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
_DOCKER_TIMESTAMP_LENGTH = len("2020-10-09T12:28:14.771034")
44

55

6-
76
def to_datetime(docker_timestamp: str) -> datetime:
87
# datetime_str is typically '2020-10-09T12:28:14.771034099Z'
98
# - The T separates the date portion from the time-of-day portion

packages/service-library/tests/test_background_task.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66

77
import asyncio
88
import datetime
9-
from typing import AsyncIterator, Awaitable, Callable
9+
from typing import AsyncIterator, Awaitable, Callable, Optional
1010
from unittest import mock
1111

1212
import pytest
1313
from faker import Faker
14+
from pytest import FixtureRequest
1415
from pytest_mock.plugin import MockerFixture
1516
from servicelib.background_task import start_periodic_task, stop_periodic_task
1617

@@ -28,9 +29,14 @@ def task_interval() -> datetime.timedelta:
2829
return datetime.timedelta(seconds=_FAST_POLL_INTERVAL)
2930

3031

32+
@pytest.fixture(params=[None, 1])
33+
def stop_task_timeout(request: FixtureRequest) -> Optional[float]:
34+
return request.param
35+
36+
3137
@pytest.fixture
3238
async def create_background_task(
33-
faker: Faker,
39+
faker: Faker, stop_task_timeout: Optional[float]
3440
) -> AsyncIterator[Callable[[datetime.timedelta, Callable], Awaitable[asyncio.Task]]]:
3541
created_tasks = []
3642

@@ -48,7 +54,9 @@ async def _creator(
4854

4955
yield _creator
5056
# cleanup
51-
await asyncio.gather(*(stop_periodic_task(t) for t in created_tasks))
57+
await asyncio.gather(
58+
*(stop_periodic_task(t, timeout=stop_task_timeout) for t in created_tasks)
59+
)
5260

5361

5462
async def test_background_task_created_and_deleted(

packages/simcore-sdk/src/simcore_sdk/node_ports_common/filemanager.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from asyncio import CancelledError
23
from pathlib import Path
34
from typing import Optional, Union
45

@@ -262,13 +263,15 @@ async def download_file_from_link(
262263
async def _abort_upload(
263264
session: ClientSession, upload_links: FileUploadSchema, *, reraise_exceptions: bool
264265
) -> None:
266+
# abort the upload correctly, so it can revert back to last version
265267
try:
266268
async with session.post(upload_links.links.abort_upload) as resp:
267269
resp.raise_for_status()
268270
except ClientError:
269271
log.warning("Error while aborting upload", exc_info=True)
270272
if reraise_exceptions:
271273
raise
274+
log.warning("Upload aborted")
272275

273276

274277
async def upload_file(
@@ -352,10 +355,12 @@ async def upload_file(
352355
except (r_clone.RCloneFailedError, exceptions.S3TransferError) as exc:
353356
log.error("The upload failed with an unexpected error:", exc_info=True)
354357
if upload_links:
355-
# abort the upload correctly, so it can revert back to last version
356358
await _abort_upload(session, upload_links, reraise_exceptions=False)
357-
log.warning("Upload aborted")
358359
raise exceptions.S3TransferError from exc
360+
except CancelledError:
361+
if upload_links:
362+
await _abort_upload(session, upload_links, reraise_exceptions=False)
363+
raise
359364
if io_log_redirect_cb:
360365
await io_log_redirect_cb(f"upload of {file_to_upload} complete.")
361366
return store_id, e_tag

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,18 @@ async def containers_docker_status(
9191
return {}
9292

9393
@log_decorator(logger=logger)
94-
async def service_disable_dir_watcher(
94+
async def disable_service_outputs_watcher(
9595
self, dynamic_sidecar_endpoint: AnyHttpUrl
9696
) -> None:
97-
await self._thin_client.patch_containers_directory_watcher(
97+
await self._thin_client.patch_containers_outputs_watcher(
9898
dynamic_sidecar_endpoint, is_enabled=False
9999
)
100100

101101
@log_decorator(logger=logger)
102-
async def service_enable_dir_watcher(
102+
async def enable_service_outputs_watcher(
103103
self, dynamic_sidecar_endpoint: AnyHttpUrl
104104
) -> None:
105-
await self._thin_client.patch_containers_directory_watcher(
105+
await self._thin_client.patch_containers_outputs_watcher(
106106
dynamic_sidecar_endpoint, is_enabled=True
107107
)
108108

@@ -367,11 +367,10 @@ async def pull_service_output_ports(
367367
async def push_service_output_ports(
368368
self,
369369
dynamic_sidecar_endpoint: AnyHttpUrl,
370-
port_keys: Optional[list[str]] = None,
371370
progress_callback: Optional[ProgressCallback] = None,
372371
) -> None:
373372
response = await self._thin_client.post_containers_tasks_ports_outputs_push(
374-
dynamic_sidecar_endpoint, port_keys
373+
dynamic_sidecar_endpoint
375374
)
376375
task_id: TaskId = response.json()
377376

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ async def get_containers(
9090

9191
@retry_on_errors
9292
@expect_status(status.HTTP_204_NO_CONTENT)
93-
async def patch_containers_directory_watcher(
93+
async def patch_containers_outputs_watcher(
9494
self, dynamic_sidecar_endpoint: AnyHttpUrl, *, is_enabled: bool
9595
) -> Response:
9696
url = self._get_url(dynamic_sidecar_endpoint, "/containers/directory-watcher")
@@ -210,13 +210,10 @@ async def post_containers_tasks_ports_outputs_pull(
210210
@retry_on_errors
211211
@expect_status(status.HTTP_202_ACCEPTED)
212212
async def post_containers_tasks_ports_outputs_push(
213-
self,
214-
dynamic_sidecar_endpoint: AnyHttpUrl,
215-
port_keys: Optional[list[str]] = None,
213+
self, dynamic_sidecar_endpoint: AnyHttpUrl
216214
) -> Response:
217-
port_keys = [] if port_keys is None else port_keys
218215
url = self._get_url(dynamic_sidecar_endpoint, "/containers/ports/outputs:push")
219-
return await self.client.post(url, json=port_keys)
216+
return await self.client.post(url)
220217

221218
@retry_on_errors
222219
@expect_status(status.HTTP_202_ACCEPTED)

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_utils.py

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import logging
22
from collections import deque
3-
from contextlib import asynccontextmanager
4-
from typing import Any, AsyncIterator, Deque, Dict, Final, List, Optional, Type
3+
from typing import Any, Deque, Final, Optional
54

65
from fastapi import FastAPI
76
from pydantic import AnyHttpUrl
@@ -47,30 +46,7 @@
4746
RESOURCE_STATE_AND_INPUTS: Final[ResourceName] = "state_and_inputs"
4847

4948

50-
@asynccontextmanager
51-
async def disabled_directory_watcher(
52-
dynamic_sidecar_client: DynamicSidecarClient, dynamic_sidecar_endpoint: AnyHttpUrl
53-
) -> AsyncIterator[None]:
54-
"""
55-
The following will happen when using this context manager:
56-
- Disables file system event watcher while writing
57-
to the outputs directory to avoid data being pushed
58-
via nodeports upon change.
59-
- Enables file system event watcher so data from outputs
60-
can be again synced via nodeports upon change.
61-
"""
62-
try:
63-
await dynamic_sidecar_client.service_disable_dir_watcher(
64-
dynamic_sidecar_endpoint
65-
)
66-
yield
67-
finally:
68-
await dynamic_sidecar_client.service_enable_dir_watcher(
69-
dynamic_sidecar_endpoint
70-
)
71-
72-
73-
def get_repository(app: FastAPI, repo_type: Type[BaseRepository]) -> BaseRepository:
49+
def get_repository(app: FastAPI, repo_type: type[BaseRepository]) -> BaseRepository:
7450
return get_base_repository(engine=app.state.engine, repo_type=repo_type)
7551

7652

@@ -80,8 +56,8 @@ def get_director_v0_client(app: FastAPI) -> DirectorV0Client:
8056

8157

8258
def parse_containers_inspect(
83-
containers_inspect: Optional[Dict[str, Any]]
84-
) -> List[DockerContainerInspect]:
59+
containers_inspect: Optional[dict[str, Any]]
60+
) -> list[DockerContainerInspect]:
8561
results: Deque[DockerContainerInspect] = deque()
8662

8763
if containers_inspect is None:
@@ -94,10 +70,10 @@ def parse_containers_inspect(
9470

9571

9672
def are_all_user_services_containers_running(
97-
containers_inspect: List[DockerContainerInspect],
73+
containers_inspect: list[DockerContainerInspect],
9874
) -> bool:
9975
return len(containers_inspect) > 0 and all(
100-
(x.status == DockerStatus.RUNNING for x in containers_inspect)
76+
x.status == DockerStatus.RUNNING for x in containers_inspect
10177
)
10278

10379

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
RESOURCE_STATE_AND_INPUTS,
6060
are_all_user_services_containers_running,
6161
attempt_pod_removal_and_data_saving,
62-
disabled_directory_watcher,
6362
get_director_v0_client,
6463
get_repository,
6564
parse_containers_inspect,
@@ -362,46 +361,43 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
362361
)
363362

364363
async def _pull_outputs_and_state():
365-
async with disabled_directory_watcher(
366-
dynamic_sidecar_client, dynamic_sidecar_endpoint
367-
):
368-
tasks = [
369-
dynamic_sidecar_client.pull_service_output_ports(
364+
tasks = [
365+
dynamic_sidecar_client.pull_service_output_ports(
366+
dynamic_sidecar_endpoint
367+
)
368+
]
369+
# When R-clone mounts are enabled, state is no longer downloaded via nodeports
370+
# S3 is used to store state paths
371+
if not app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED:
372+
tasks.append(
373+
dynamic_sidecar_client.restore_service_state(
370374
dynamic_sidecar_endpoint
371375
)
372-
]
373-
# When enabled no longer downloads state via nodeports
374-
# S3 is used to store state paths
375-
if not app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED:
376-
tasks.append(
377-
dynamic_sidecar_client.restore_service_state(
378-
dynamic_sidecar_endpoint
379-
)
380-
)
376+
)
381377

382-
await logged_gather(*tasks, max_concurrency=2)
378+
await logged_gather(*tasks, max_concurrency=2)
383379

384-
# inside this directory create the missing dirs, fetch those form the labels
385-
director_v0_client: DirectorV0Client = get_director_v0_client(app)
386-
simcore_service_labels: SimcoreServiceLabels = (
387-
await director_v0_client.get_service_labels(
388-
service=ServiceKeyVersion(
389-
key=scheduler_data.key, version=scheduler_data.version
390-
)
380+
# inside this directory create the missing dirs, fetch those from the labels
381+
director_v0_client: DirectorV0Client = get_director_v0_client(app)
382+
simcore_service_labels: SimcoreServiceLabels = (
383+
await director_v0_client.get_service_labels(
384+
service=ServiceKeyVersion(
385+
key=scheduler_data.key, version=scheduler_data.version
391386
)
392387
)
393-
service_outputs_labels = json.loads(
394-
simcore_service_labels.dict().get("io.simcore.outputs", "{}")
395-
).get("outputs", {})
396-
logger.debug(
397-
"Creating dirs from service outputs labels: %s",
398-
service_outputs_labels,
399-
)
400-
await dynamic_sidecar_client.service_outputs_create_dirs(
401-
dynamic_sidecar_endpoint, service_outputs_labels
402-
)
388+
)
389+
service_outputs_labels = json.loads(
390+
simcore_service_labels.dict().get("io.simcore.outputs", "{}")
391+
).get("outputs", {})
392+
logger.debug(
393+
"Creating dirs from service outputs labels: %s",
394+
service_outputs_labels,
395+
)
396+
await dynamic_sidecar_client.service_outputs_create_dirs(
397+
dynamic_sidecar_endpoint, service_outputs_labels
398+
)
403399

404-
scheduler_data.dynamic_sidecar.is_service_environment_ready = True
400+
scheduler_data.dynamic_sidecar.is_service_environment_ready = True
405401

406402
if dynamic_sidecar_settings.DYNAMIC_SIDECAR_DOCKER_NODE_RESOURCE_LIMITS_ENABLED:
407403
node_rights_manager = NodeRightsManager.instance(app)
@@ -481,6 +477,10 @@ async def progress_create_containers(
481477
dynamic_sidecar_endpoint, compose_spec, progress_create_containers
482478
)
483479

480+
await dynamic_sidecar_client.enable_service_outputs_watcher(
481+
dynamic_sidecar_endpoint
482+
)
483+
484484
# Starts PROXY -----------------------------------------------
485485
# The entrypoint container name was now computed
486486
# continue starting the proxy

0 commit comments

Comments
 (0)