Skip to content

Commit 38b6073

Browse files
authored
🐛 Is3240/fix pull inputs sidecar (ITISFoundation#3280)
1 parent 4626b69 commit 38b6073

File tree

11 files changed

+91
-59
lines changed

11 files changed

+91
-59
lines changed

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -583,8 +583,6 @@ async def _remove_containers_save_state_and_outputs() -> None:
583583
scheduler_data.dynamic_sidecar.service_removal_state.can_save
584584
and scheduler_data.dynamic_sidecar.were_containers_created
585585
):
586-
dynamic_sidecar_client = get_dynamic_sidecar_client(app)
587-
588586
logger.info(
589587
"Calling into dynamic-sidecar to save state and pushing data to nodeports"
590588
)
Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
1-
# module acting as root for all routes
1+
""" Module to collect, tag and prefix all routes under 'main_router'
2+
3+
Setup and register all routes here form different modules
4+
"""
25

36
from fastapi import APIRouter
47

58
from .._meta import API_VTAG
6-
from .containers import containers_router
7-
from .containers_extension import containers_router_extension
8-
from .containers_tasks import containers_router_tasks
9-
from .health import health_router
9+
from . import containers, containers_extension, containers_long_running_tasks, health
1010

11-
# setup and register all routes here form different modules
1211
main_router = APIRouter()
13-
main_router.include_router(health_router)
14-
main_router.include_router(containers_router, prefix=f"/{API_VTAG}")
15-
main_router.include_router(containers_router_extension, prefix=f"/{API_VTAG}")
16-
main_router.include_router(containers_router_tasks, prefix=f"/{API_VTAG}")
12+
main_router.include_router(health.router)
13+
main_router.include_router(
14+
containers.router,
15+
tags=["containers"],
16+
prefix=f"/{API_VTAG}",
17+
)
18+
main_router.include_router(
19+
containers_extension.router,
20+
tags=["containers"],
21+
prefix=f"/{API_VTAG}",
22+
)
23+
main_router.include_router(
24+
containers_long_running_tasks.router,
25+
tags=["containers"],
26+
prefix=f"/{API_VTAG}",
27+
)
1728

1829
__all__: tuple[str, ...] = ("main_router",)

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ def _raise_if_container_is_missing(
2626
raise HTTPException(status.HTTP_404_NOT_FOUND, detail=message)
2727

2828

29-
containers_router = APIRouter(tags=["containers"])
29+
router = APIRouter()
3030

3131

32-
@containers_router.get(
32+
@router.get(
3333
"/containers",
3434
responses={
3535
status.HTTP_500_INTERNAL_SERVER_ERROR: {"description": "Errors in container"}
@@ -74,7 +74,20 @@ def _format_result(container_inspect: dict[str, Any]) -> dict[str, Any]:
7474
return results
7575

7676

77-
@containers_router.get(
77+
# Some of the operations and sub-resources on containers are implemented as long-running tasks.
78+
# Handlers for these operations can be found in:
79+
#
80+
# POST /containers : SEE containers_long_running_tasks::create_service_containers_task
81+
# POST /containers:down : SEE containers_long_running_tasks::runs_docker_compose_down_task
82+
# POST /containers/state:restore : SEE containers_long_running_tasks::state_restore_task
83+
# POST /containers/state:save : SEE containers_long_running_tasks::state_save_task
84+
# POST /containers/ports/inputs:pull : SEE containers_long_running_tasks::ports_inputs_pull_task
85+
# POST /containers/ports/outputs:pull : SEE containers_long_running_tasks::ports_outputs_pull_task
86+
# POST /containers/ports/outputs:push : SEE containers_long_running_tasks::ports_outputs_push_task
87+
#
88+
89+
90+
@router.get(
7891
"/containers/{id}/logs",
7992
responses={
8093
status.HTTP_404_NOT_FOUND: {
@@ -120,7 +133,7 @@ async def get_container_logs(
120133
return container_logs
121134

122135

123-
@containers_router.get(
136+
@router.get(
124137
"/containers/name",
125138
responses={
126139
status.HTTP_404_NOT_FOUND: {
@@ -186,7 +199,7 @@ async def get_containers_name(
186199
return f"{container_name}"
187200

188201

189-
@containers_router.get(
202+
@router.get(
190203
"/containers/{id}",
191204
responses={
192205
status.HTTP_404_NOT_FOUND: {"description": "Container does not exist"},

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_extension.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,10 @@ async def send_message(rabbitmq: RabbitMQ, message: str) -> None:
4545
#
4646
# HANDLERS ------------------
4747
#
48-
# - ANE: importing the `containers_router` router from .containers
49-
# and generating the openapi spec, will not add the below entrypoints
50-
# we need to create a new one in order for all the APIs to be
51-
# detected as before
52-
#
53-
containers_router_extension = APIRouter(tags=["containers"])
48+
router = APIRouter()
5449

5550

56-
@containers_router_extension.patch(
51+
@router.patch(
5752
"/containers/directory-watcher",
5853
summary="Enable/disable directory-watcher event propagation",
5954
response_class=Response,
@@ -69,7 +64,7 @@ async def toggle_directory_watcher(
6964
directory_watcher.disable_directory_watcher(app)
7065

7166

72-
@containers_router_extension.post(
67+
@router.post(
7368
"/containers/ports/outputs/dirs",
7469
summary=(
7570
"Creates the output directories declared by the docker images's labels. "
@@ -91,7 +86,7 @@ async def create_output_dirs(
9186
dir_to_create.mkdir(parents=True, exist_ok=True)
9287

9388

94-
@containers_router_extension.post(
89+
@router.post(
9590
"/containers/{id}/networks:attach",
9691
summary="attach container to a network, if not already attached",
9792
response_class=Response,
@@ -132,7 +127,7 @@ async def attach_container_to_network(
132127
)
133128

134129

135-
@containers_router_extension.post(
130+
@router.post(
136131
"/containers/{id}/networks:detach",
137132
summary="detach container from a network, if not already detached",
138133
response_class=Response,

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_tasks.py renamed to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,10 @@
3838
)
3939

4040
logger = logging.getLogger(__name__)
41+
router = APIRouter()
4142

42-
containers_router_tasks = APIRouter(tags=["containers"])
4343

44-
45-
# HANDLERS
46-
47-
48-
@containers_router_tasks.post(
44+
@router.post(
4945
"/containers",
5046
summary=dedent(
5147
"""
@@ -98,7 +94,7 @@ async def create_service_containers_task( # pylint: disable=too-many-arguments
9894
raise HTTPException(status.HTTP_409_CONFLICT, detail=f"{e}") from e
9995

10096

101-
@containers_router_tasks.post(
97+
@router.post(
10298
"/containers:down",
10399
summary="Remove the previously started containers",
104100
status_code=status.HTTP_202_ACCEPTED,
@@ -133,7 +129,7 @@ async def runs_docker_compose_down_task(
133129
raise HTTPException(status.HTTP_409_CONFLICT, detail=f"{e}") from e
134130

135131

136-
@containers_router_tasks.post(
132+
@router.post(
137133
"/containers/state:restore",
138134
summary="Restores the state of the dynamic service",
139135
status_code=status.HTTP_202_ACCEPTED,
@@ -168,7 +164,7 @@ async def state_restore_task(
168164
raise HTTPException(status.HTTP_409_CONFLICT, detail=f"{e}") from e
169165

170166

171-
@containers_router_tasks.post(
167+
@router.post(
172168
"/containers/state:save",
173169
summary="Stores the state of the dynamic service",
174170
status_code=status.HTTP_202_ACCEPTED,
@@ -203,7 +199,7 @@ async def state_save_task(
203199
raise HTTPException(status.HTTP_409_CONFLICT, detail=f"{e}") from e
204200

205201

206-
@containers_router_tasks.post(
202+
@router.post(
207203
"/containers/ports/inputs:pull",
208204
summary="Pull input ports data",
209205
status_code=status.HTTP_202_ACCEPTED,
@@ -238,7 +234,7 @@ async def ports_inputs_pull_task(
238234
raise HTTPException(status.HTTP_409_CONFLICT, detail=f"{e}") from e
239235

240236

241-
@containers_router_tasks.post(
237+
@router.post(
242238
"/containers/ports/outputs:pull",
243239
summary="Pull output ports data",
244240
status_code=status.HTTP_202_ACCEPTED,
@@ -273,7 +269,7 @@ async def ports_outputs_pull_task(
273269
raise HTTPException(status.HTTP_409_CONFLICT, detail=f"{e}") from e
274270

275271

276-
@containers_router_tasks.post(
272+
@router.post(
277273
"/containers/ports/outputs:push",
278274
summary="Push output ports data",
279275
status_code=status.HTTP_202_ACCEPTED,
@@ -308,7 +304,7 @@ async def ports_outputs_push_task(
308304
raise HTTPException(status.HTTP_409_CONFLICT, detail=f"{e}") from e
309305

310306

311-
@containers_router_tasks.post(
307+
@router.post(
312308
"/containers:restart",
313309
summary="Restarts previously started containers",
314310
status_code=status.HTTP_202_ACCEPTED,

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/health.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
from ..models.schemas.application_health import ApplicationHealth
44
from ._dependencies import get_application_health
55

6-
health_router = APIRouter()
6+
router = APIRouter()
77

88

9-
@health_router.get(
9+
@router.get(
1010
"/health",
1111
response_model=ApplicationHealth,
1212
responses={
@@ -22,6 +22,3 @@ async def health_endpoint(
2222
)
2323

2424
return application_health
25-
26-
27-
__all__: tuple[str, ...] = ("health_router",)

services/dynamic-sidecar/tests/unit/test_api_containers_tasks.py renamed to services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from servicelib.fastapi.long_running_tasks.client import setup as client_setup
3838
from simcore_sdk.node_ports_common.exceptions import NodeNotFound
3939
from simcore_service_dynamic_sidecar._meta import API_VTAG
40-
from simcore_service_dynamic_sidecar.api import containers_tasks
40+
from simcore_service_dynamic_sidecar.api import containers_long_running_tasks
4141
from simcore_service_dynamic_sidecar.models.shared_store import SharedStore
4242

4343
FAST_STATUS_POLL: Final[float] = 0.1
@@ -71,12 +71,14 @@ async def _just_log_task(*args, **kwargs) -> None:
7171
# searching by name since all start with _task
7272
tasks_names = [
7373
x[0]
74-
for x in getmembers(containers_tasks, isfunction)
74+
for x in getmembers(containers_long_running_tasks, isfunction)
7575
if x[0].startswith("task")
7676
]
7777

7878
for task_name in tasks_names:
79-
mocker.patch.object(containers_tasks, task_name, new=_just_log_task)
79+
mocker.patch.object(
80+
containers_long_running_tasks, task_name, new=_just_log_task
81+
)
8082

8183
yield None
8284

services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import logging
88
from contextlib import suppress
99
from pprint import pformat
10-
from typing import Dict, List, Optional
10+
from typing import Optional
1111

1212
from aiohttp import web
1313
from aiopg.sa import Engine
@@ -44,7 +44,7 @@ async def _update_project_state(
4444
project_uuid: str,
4545
node_uuid: str,
4646
new_state: RunningState,
47-
node_errors: Optional[List[ErrorDict]],
47+
node_errors: Optional[list[ErrorDict]],
4848
) -> None:
4949
project = await projects_api.update_project_node_state(
5050
app, user_id, project_uuid, node_uuid, new_state
@@ -73,7 +73,7 @@ async def listen(app: web.Application, db_engine: Engine):
7373
"received update from database: %s", pformat(notification.payload)
7474
)
7575
# get the data and the info on what changed
76-
payload: Dict = json.loads(notification.payload)
76+
payload: dict = json.loads(notification.payload)
7777

7878
# FIXME: all this should move to rabbitMQ instead of this
7979
task_data = payload.get("data", {})
@@ -100,6 +100,7 @@ async def listen(app: web.Application, db_engine: Engine):
100100
if any(f in task_changes for f in ["outputs", "run_hash"]):
101101
new_outputs = task_data.get("outputs", {})
102102
new_run_hash = task_data.get("run_hash", None)
103+
103104
await update_node_outputs(
104105
app,
105106
the_project_owner,

services/web/server/src/simcore_service_webserver/projects/projects_api.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,12 @@
5858
from ..users_api import UserRole, get_user_name, get_user_role
5959
from ..users_exceptions import UserNotFoundError
6060
from . import _delete
61-
from .project_lock import UserNameDict, get_project_locked_state, lock_project
61+
from .project_lock import (
62+
UserNameDict,
63+
get_project_locked_state,
64+
is_project_locked,
65+
lock_project,
66+
)
6267
from .projects_db import APP_PROJECT_DBAPI, ProjectDBAPI
6368
from .projects_exceptions import NodeNotFoundError, ProjectLockError
6469
from .projects_utils import extract_dns_without_default_port
@@ -387,11 +392,23 @@ async def is_node_id_present_in_any_project_workbench(
387392
return await db.node_id_exists(node_id)
388393

389394

390-
async def trigger_connected_service_retrieve(
395+
async def _trigger_connected_service_retrieve(
391396
app: web.Application, project: dict, updated_node_uuid: str, changed_keys: list[str]
392397
) -> None:
398+
project_id = project["uuid"]
399+
if await is_project_locked(app, project_id):
400+
# NOTE: we log warn since this function is fire&forget and raise an exception would not be anybody to handle it
401+
log.warning(
402+
"Skipping service retrieval because project with %s is currently locked."
403+
"Operation triggered by %s",
404+
f"{project_id=}",
405+
f"{changed_keys=}",
406+
)
407+
return
408+
393409
workbench = project["workbench"]
394410
nodes_keys_to_update: dict[str, list[str]] = defaultdict(list)
411+
395412
# find the nodes that need to retrieve data
396413
for node_uuid, node in workbench.items():
397414
# check this node is dynamic
@@ -425,7 +442,7 @@ async def post_trigger_connected_service_retrieve(
425442
app: web.Application, **kwargs
426443
) -> None:
427444
await fire_and_forget_task(
428-
trigger_connected_service_retrieve(app, **kwargs),
445+
_trigger_connected_service_retrieve(app, **kwargs),
429446
task_suffix_name="trigger_connected_service_retrieve",
430447
fire_and_forget_tasks_collection=app[APP_FIRE_AND_FORGET_TASKS_KEY],
431448
)

services/web/server/src/simcore_service_webserver/projects/projects_nodes_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ async def update_node_outputs(
8484
else list(ui_changed_keys | set(keys_changed))
8585
)
8686

87-
# notify
87+
# fire&forget to notify connected nodes to retrieve its inputs **if necessary**
8888
await projects_api.post_trigger_connected_service_retrieve(
8989
app=app, project=project, updated_node_uuid=node_uuid, changed_keys=keys
9090
)

0 commit comments

Comments
 (0)