Skip to content

Commit 0b1b17b

Browse files
GitHKAndrei Neagu
andauthored
♻️ refactor dy-sidecar volume searching (ITISFoundation#3110)
* removal of code which no effect in swarm mode * dy-sidecar now uses run_id to searach for volumes Co-authored-by: Andrei Neagu <[email protected]>
1 parent 6686072 commit 0b1b17b

File tree

20 files changed

+180
-179
lines changed

20 files changed

+180
-179
lines changed

services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from asyncio import Lock
44
from enum import Enum
55
from typing import Any, Dict, List, Mapping, Optional
6-
from uuid import UUID
6+
from uuid import UUID, uuid4
77

88
from models_library.projects_nodes_io import NodeID
99
from models_library.service_settings_labels import (
@@ -128,6 +128,16 @@ def mark_removed(self) -> None:
128128

129129

130130
class DynamicSidecar(BaseModel):
131+
run_id: UUID = Field(
132+
default_factory=uuid4,
133+
description=(
134+
"Used to discriminate between dynamic-sidecar docker resources "
135+
"generated during different runs. Sometimes artifacts remain in the"
136+
"system after an error. This helps avoiding collisions."
137+
"For now used by anonymous volumes involved in data sharing"
138+
),
139+
)
140+
131141
status: Status = Field(
132142
Status.create_as_initially_ok(),
133143
description="status of the service sidecar also with additional information",

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api.py

Lines changed: 18 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import time
88
import traceback
99
from contextlib import asynccontextmanager
10-
from typing import Any, AsyncIterator, Dict, List, Mapping, Optional, Set, Tuple
10+
from typing import Any, AsyncIterator, Mapping, Optional
1111

1212
import aiodocker
1313
from aiodocker.utils import clean_filters, clean_map
@@ -17,7 +17,6 @@
1717
from models_library.projects import ProjectID
1818
from models_library.projects_nodes_io import NodeID
1919
from models_library.users import UserID
20-
from packaging import version
2120
from servicelib.utils import logged_gather
2221
from tenacity._asyncio import AsyncRetrying
2322
from tenacity.retry import retry_if_exception_type
@@ -43,49 +42,6 @@
4342
log = logging.getLogger(__name__)
4443

4544

46-
def _monkey_patch_aiodocker() -> None:
47-
"""Raises an error once the library is up to date."""
48-
from aiodocker import volumes
49-
from aiodocker.volumes import DockerVolume
50-
51-
if version.parse(aiodocker.__version__) > version.parse("0.21.0"):
52-
raise RuntimeError(
53-
"Please check that PR https://github.com/aio-libs/aiodocker/pull/623 "
54-
"is not part of the current bump version. "
55-
"Otherwise, if the current PR is part of this new release "
56-
"remove monkey_patch."
57-
)
58-
59-
# pylint: disable=protected-access
60-
async def _custom_volumes_list(self, *, filters=None):
61-
"""
62-
Return a list of volumes
63-
64-
Args:
65-
filters: a dict with a list of filters
66-
67-
Available filters:
68-
dangling=<boolean>
69-
driver=<volume-driver-name>
70-
label=<key> or label=<key>:<value>
71-
name=<volume-name>
72-
"""
73-
params = {} if filters is None else {"filters": clean_filters(filters)}
74-
75-
data = await self.docker._query_json("volumes", params=params)
76-
return data
77-
78-
async def _custom_volumes_get(self, id): # pylint: disable=redefined-builtin
79-
data = await self.docker._query_json("volumes/{id}".format(id=id), method="GET")
80-
return DockerVolume(self.docker, data["Name"])
81-
82-
setattr(volumes.DockerVolumes, "list", _custom_volumes_list)
83-
setattr(volumes.DockerVolumes, "get", _custom_volumes_get)
84-
85-
86-
_monkey_patch_aiodocker()
87-
88-
8945
class _RetryError(Exception):
9046
pass
9147

@@ -106,7 +62,7 @@ async def docker_client() -> AsyncIterator[aiodocker.docker.Docker]:
10662
await client.close()
10763

10864

109-
async def get_swarm_network(dynamic_sidecar_settings: DynamicSidecarSettings) -> Dict:
65+
async def get_swarm_network(dynamic_sidecar_settings: DynamicSidecarSettings) -> dict:
11066
async with docker_client() as client:
11167
all_networks = await client.networks.list()
11268

@@ -127,7 +83,7 @@ async def get_swarm_network(dynamic_sidecar_settings: DynamicSidecarSettings) ->
12783
return networks[0]
12884

12985

130-
async def create_network(network_config: Dict[str, Any]) -> str:
86+
async def create_network(network_config: dict[str, Any]) -> str:
13187
async with docker_client() as client:
13288
try:
13389
docker_network = await client.networks.create(network_config)
@@ -168,17 +124,17 @@ async def create_service_and_get_id(create_service_data: AioDockerServiceSpec) -
168124
return service_start_result["ID"]
169125

170126

171-
async def inspect_service(service_id: str) -> Dict[str, Any]:
127+
async def inspect_service(service_id: str) -> dict[str, Any]:
172128
async with docker_client() as client:
173129
return await client.services.inspect(service_id)
174130

175131

176132
async def get_dynamic_sidecars_to_observe(
177133
dynamic_sidecar_settings: DynamicSidecarSettings,
178-
) -> List[SchedulerData]:
134+
) -> list[SchedulerData]:
179135
"""called when scheduler is started to discover new services to observe"""
180136
async with docker_client() as client:
181-
running_dynamic_sidecar_services: List[
137+
running_dynamic_sidecar_services: list[
182138
Mapping[str, Any]
183139
] = await client.services.list(
184140
filters={
@@ -196,12 +152,12 @@ async def get_dynamic_sidecars_to_observe(
196152
async def _extract_task_data_from_service_for_state(
197153
service_id: str,
198154
dynamic_sidecar_settings: DynamicSidecarSettings,
199-
target_statuses: Set[str],
200-
) -> Dict[str, Any]:
155+
target_statuses: set[str],
156+
) -> dict[str, Any]:
201157
"""Waits until the dynamic-sidecar task is in one of the target_statuses
202158
and then returns the task"""
203159

204-
async def _sleep_or_error(started: float, task: Dict):
160+
async def _sleep_or_error(started: float, task: dict):
205161
await asyncio.sleep(1.0)
206162
elapsed = time.time() - started
207163
if (
@@ -217,7 +173,7 @@ async def _sleep_or_error(started: float, task: Dict):
217173

218174
async with docker_client() as client:
219175
service_state: Optional[str] = None
220-
task: Dict[str, Any] = {}
176+
task: dict[str, Any] = {}
221177

222178
started = time.time()
223179

@@ -275,8 +231,8 @@ async def get_service_placement(
275231
return task["NodeID"]
276232

277233

278-
async def get_dynamic_sidecar_state(service_id: str) -> Tuple[ServiceState, str]:
279-
def _make_pending() -> Tuple[ServiceState, str]:
234+
async def get_dynamic_sidecar_state(service_id: str) -> tuple[ServiceState, str]:
235+
def _make_pending() -> tuple[ServiceState, str]:
280236
pending_task_state = {"State": ServiceState.PENDING.value}
281237
return extract_task_state(task_status=pending_task_state)
282238

@@ -386,38 +342,11 @@ async def remove_dynamic_sidecar_network(network_name: str) -> bool:
386342
return False
387343

388344

389-
async def remove_dynamic_sidecar_volumes(
390-
node_uuid: NodeID, dynamic_sidecar_settings: DynamicSidecarSettings
391-
) -> Set[str]:
392-
async with docker_client() as client:
393-
volumes_response = await client.volumes.list(
394-
filters={
395-
"label": [
396-
f"swarm_stack_name={dynamic_sidecar_settings.SWARM_STACK_NAME}",
397-
f"uuid={node_uuid}",
398-
]
399-
}
400-
)
401-
volumes = volumes_response["Volumes"]
402-
log.debug("Removing volumes: %s", [v["Name"] for v in volumes])
403-
if len(volumes) == 0:
404-
log.warning("Expected to find at least 1 volume to remove, 0 were found")
405-
406-
removed_volumes: Set[str] = set()
407-
408-
for volume_data in volumes:
409-
volume = await client.volumes.get(volume_data["Name"])
410-
await volume.delete()
411-
removed_volumes.add(volume_data["Name"])
412-
413-
return removed_volumes
414-
415-
416345
async def list_dynamic_sidecar_services(
417346
dynamic_sidecar_settings: DynamicSidecarSettings,
418347
user_id: Optional[UserID] = None,
419348
project_id: Optional[ProjectID] = None,
420-
) -> List[Dict[str, Any]]:
349+
) -> list[dict[str, Any]]:
421350
service_filters = {
422351
"label": [
423352
f"swarm_stack_name={dynamic_sidecar_settings.SWARM_STACK_NAME}",
@@ -451,8 +380,8 @@ async def is_dynamic_service_running(
451380

452381

453382
async def get_or_create_networks_ids(
454-
networks: List[str], project_id: ProjectID
455-
) -> Dict[str, str]:
383+
networks: list[str], project_id: ProjectID
384+
) -> dict[str, str]:
456385
async def _get_id_from_name(client, network_name: str) -> str:
457386
network = await client.networks.get(network_name)
458387
network_inspect = await network.show()
@@ -496,7 +425,7 @@ async def _get_id_from_name(client, network_name: str) -> str:
496425

497426
async def get_projects_networks_containers(
498427
project_id: ProjectID,
499-
) -> Dict[str, int]:
428+
) -> dict[str, int]:
500429
"""
501430
Returns all current projects_networks for the project with
502431
the amount of containers attached to them.
@@ -511,8 +440,8 @@ async def get_projects_networks_containers(
511440
if not filtered_networks:
512441
return {}
513442

514-
def _count_containers(item: Dict[str, Any]) -> int:
515-
containers: Optional[List] = item.get("Containers")
443+
def _count_containers(item: dict[str, Any]) -> int:
444+
containers: Optional[list] = item.get("Containers")
516445
return 0 if containers is None else len(containers)
517446

518447
return {x["Name"]: _count_containers(x) for x in filtered_networks}

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def _get_environment_variables(
4949
"DY_SIDECAR_USER_ID": f"{scheduler_data.user_id}",
5050
"DY_SIDECAR_PROJECT_ID": f"{scheduler_data.project_id}",
5151
"DY_SIDECAR_NODE_ID": f"{scheduler_data.node_uuid}",
52+
"DY_SIDECAR_RUN_ID": f"{scheduler_data.dynamic_sidecar.run_id}",
5253
"POSTGRES_HOST": f"{app_settings.POSTGRES.POSTGRES_HOST}",
5354
"POSTGRES_ENDPOINT": f"{app_settings.POSTGRES.POSTGRES_HOST}:{app_settings.POSTGRES.POSTGRES_PORT}",
5455
"POSTGRES_PASSWORD": f"{app_settings.POSTGRES.POSTGRES_PASSWORD.get_secret_value()}",
@@ -121,6 +122,7 @@ def get_dynamic_sidecar_spec(
121122
compose_namespace=compose_namespace,
122123
path=path_to_mount,
123124
node_uuid=scheduler_data.node_uuid,
125+
run_id=scheduler_data.dynamic_sidecar.run_id,
124126
)
125127
)
126128
# state paths now get mounted via different driver and are synced to s3 automatically
@@ -134,6 +136,7 @@ def get_dynamic_sidecar_spec(
134136
path=path_to_mount,
135137
project_id=scheduler_data.project_id,
136138
node_uuid=scheduler_data.node_uuid,
139+
run_id=scheduler_data.dynamic_sidecar.run_id,
137140
r_clone_settings=dynamic_sidecar_settings.DYNAMIC_SIDECAR_R_CLONE_SETTINGS,
138141
)
139142
)
@@ -144,6 +147,7 @@ def get_dynamic_sidecar_spec(
144147
compose_namespace=compose_namespace,
145148
path=path_to_mount,
146149
node_uuid=scheduler_data.node_uuid,
150+
run_id=scheduler_data.dynamic_sidecar.run_id,
147151
)
148152
)
149153

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py

Lines changed: 8 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import json
22
import logging
3-
from typing import Any, Coroutine, Dict, Final, List, Optional, Set, Type, cast
3+
from typing import Any, Coroutine, Final, Optional, Type, cast
44

55
import httpx
66
from fastapi import FastAPI
@@ -20,15 +20,14 @@
2020
from tenacity._asyncio import AsyncRetrying
2121
from tenacity.before_sleep import before_sleep_log
2222
from tenacity.stop import stop_after_delay
23-
from tenacity.wait import wait_exponential, wait_fixed
23+
from tenacity.wait import wait_fixed
2424

2525
from ....core.settings import AppSettings, DynamicSidecarSettings
2626
from ....models.schemas.dynamic_services import DynamicSidecarStatus, SchedulerData
2727
from ....modules.director_v0 import DirectorV0Client
2828
from ...catalog import CatalogClient
2929
from ...db.repositories.projects import ProjectsRepository
3030
from ...db.repositories.projects_networks import ProjectsNetworksRepository
31-
from .._namepsace import get_compose_namespace
3231
from ..client_api import DynamicSidecarClient, get_dynamic_sidecar_client
3332
from ..docker_api import (
3433
are_all_services_present,
@@ -41,7 +40,6 @@
4140
is_dynamic_sidecar_missing,
4241
remove_dynamic_sidecar_network,
4342
remove_dynamic_sidecar_stack,
44-
remove_dynamic_sidecar_volumes,
4543
try_to_remove_network,
4644
)
4745
from ..docker_compose_specs import assemble_spec
@@ -54,10 +52,8 @@
5452
from ..errors import (
5553
DynamicSidecarUnexpectedResponseStatus,
5654
EntrypointContainerNotFoundError,
57-
GenericDockerError,
5855
NodeportsDidNotFindNodeError,
5956
)
60-
from ..volumes_resolver import DynamicSidecarVolumesPathsResolver
6157
from .abc import DynamicSchedulerEvent
6258
from .events_utils import (
6359
all_containers_running,
@@ -145,7 +141,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
145141
dynamic_sidecar_network_id = await create_network(network_config)
146142

147143
# attach the service to the swarm network dedicated to services
148-
swarm_network: Dict[str, Any] = await get_swarm_network(
144+
swarm_network: dict[str, Any] = await get_swarm_network(
149145
dynamic_sidecar_settings
150146
)
151147
swarm_network_id: str = swarm_network["Id"]
@@ -231,7 +227,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
231227
dynamic_sidecar_endpoint = scheduler_data.dynamic_sidecar.endpoint
232228

233229
try:
234-
containers_inspect: Dict[
230+
containers_inspect: dict[
235231
str, Any
236232
] = await dynamic_sidecar_client.containers_inspect(
237233
dynamic_sidecar_endpoint
@@ -586,49 +582,9 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
586582
scheduler_data.dynamic_sidecar_network_name
587583
)
588584

589-
# remove created inputs and outputs volumes
590-
591-
# compute which volumes we expected to be removed
592-
# in case the expected volumes differ from the removed ones
593-
# show an error
594-
compose_namespace = get_compose_namespace(scheduler_data.node_uuid)
595-
expected_volumes_to_remove: Set[str] = {
596-
DynamicSidecarVolumesPathsResolver.source(
597-
compose_namespace=compose_namespace, path=path
598-
)
599-
for path in [
600-
scheduler_data.paths_mapping.inputs_path,
601-
scheduler_data.paths_mapping.outputs_path,
602-
]
603-
+ scheduler_data.paths_mapping.state_paths
604-
}
605-
606-
async for attempt in AsyncRetrying(
607-
wait=wait_exponential(min=1),
608-
stop=stop_after_delay(
609-
dynamic_sidecar_settings.DYNAMIC_SIDECAR_VOLUMES_REMOVAL_TIMEOUT_S
610-
),
611-
retry_error_cls=GenericDockerError,
612-
):
613-
with attempt:
614-
logger.info(
615-
"Trying to remove volumes for %s", scheduler_data.service_name
616-
)
617-
618-
removed_volumes = await remove_dynamic_sidecar_volumes(
619-
scheduler_data.node_uuid, dynamic_sidecar_settings
620-
)
621-
622-
if expected_volumes_to_remove != removed_volumes:
623-
logger.warning(
624-
(
625-
"Attention expected to remove %s, instead only removed %s. "
626-
"Please check with check that all expected to remove volumes "
627-
"are now gone."
628-
),
629-
expected_volumes_to_remove,
630-
removed_volumes,
631-
)
585+
# NOTE: for future attempts, volumes cannot be cleaned up
586+
# since they are local to the node.
587+
# That's why anonymous volumes are used!
632588

633589
logger.debug(
634590
"Removed dynamic-sidecar created services for '%s'",
@@ -657,7 +613,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
657613

658614
# register all handlers defined in this module here
659615
# A list is essential to guarantee execution order
660-
REGISTERED_EVENTS: List[Type[DynamicSchedulerEvent]] = [
616+
REGISTERED_EVENTS: list[Type[DynamicSchedulerEvent]] = [
661617
CreateSidecars,
662618
GetStatus,
663619
PrepareServicesEnvironment,

0 commit comments

Comments
 (0)