
Commit 46c78d3

🐛 Fix file-picker downstream service notification issues (ITISFoundation#3058)

- file-picker no longer propagates updates to downstream services as a side effect (FIXED)
- file-picker properly sends notifications when the file source changes
- dynamic-sidecar properly purges the directory when it downloads new data
- dynamic-sidecar input downloads are now queued and can no longer run in parallel

1 parent 4bd0ac7 commit 46c78d3

File tree

15 files changed: +762 additions, -340 deletions

scripts/pydeps.bash

Lines changed: 2 additions & 2 deletions

@@ -39,8 +39,8 @@ Run()
 # Examples:
 # - SEE https://pydeps.readthedocs.io/en/latest/#usage
 #
-#  pydeps.bash services/web/server/src/simcore_service_webserver --cluster
-#  pydeps.bash services/web/server/src/simcore_service_webserver --only "simcore_service_webserver.projects" --cluster
+#  ./scripts/pydeps.bash services/web/server/src/simcore_service_webserver --cluster
+#  ./scripts/pydeps.bash services/web/server/src/simcore_service_webserver --only "simcore_service_webserver.projects" --cluster
 #
 #

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_extension.py

Lines changed: 2 additions & 2 deletions

@@ -129,7 +129,7 @@ async def pull_input_ports(
         PortTypeName.INPUTS, mounted_volumes.disk_inputs_path, port_keys=port_keys
     )
     await send_message(rabbitmq, "Finished pulling inputs")
-    return transferred_bytes
+    return int(transferred_bytes)


 @containers_router.patch(
@@ -187,7 +187,7 @@ async def pull_output_ports(
         PortTypeName.OUTPUTS, mounted_volumes.disk_outputs_path, port_keys=port_keys
    )
     await send_message(rabbitmq, "Finished pulling output")
-    return transferred_bytes
+    return int(transferred_bytes)


 @containers_router.post(
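A note on the two int(...) casts above: download_target_ports now returns a pydantic ByteSize, which subclasses int, while these handlers are presumably declared to return a plain int, so the cast keeps the JSON response a bare integer. A minimal sketch of that relationship, assuming pydantic v1 as used in this repo (printed values are illustrative):

# minimal sketch, assuming pydantic v1: ByteSize subclasses int, so a
# route can cast it down to a plain integer for the JSON response
from pydantic import ByteSize, parse_obj_as

transferred = parse_obj_as(ByteSize, "1.5GiB")
assert isinstance(transferred, int)              # ByteSize IS-A int
print(transferred.human_readable(decimal=True))  # e.g. "1.6GB", handy for logs
print(int(transferred))                          # 1610612736, what the route returns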

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py

Lines changed: 85 additions & 69 deletions

@@ -7,9 +7,10 @@
 import time
 from collections import deque
 from pathlib import Path
-from typing import Coroutine, Deque, Dict, List, Optional, Set, Tuple, cast
+from typing import Any, Coroutine, Optional, cast

 import magic
+from models_library.projects_nodes import OutputsDict
 from pydantic import ByteSize
 from servicelib.archiving_utils import PrunableFolder, archive_dir, unarchive_dir
 from servicelib.async_utils import run_sequentially_in_context
@@ -49,7 +50,7 @@ def _get_size_of_value(value: ItemConcreteValue) -> int:


 @run_sequentially_in_context()
-async def upload_outputs(outputs_path: Path, port_keys: List[str]) -> None:
+async def upload_outputs(outputs_path: Path, port_keys: list[str]) -> None:
     """calls to this function will get queued and invoked in sequence"""
     # pylint: disable=too-many-branches
     logger.info("uploading data to simcore...")
@@ -64,9 +65,9 @@ async def upload_outputs(outputs_path: Path, port_keys: List[str]) -> None:
     )

     # let's gather the tasks
-    temp_paths: Deque[Path] = deque()
-    ports_values: Dict[str, ItemConcreteValue] = {}
-    archiving_tasks: Deque[Coroutine[None, None, None]] = deque()
+    temp_paths: deque[Path] = deque()
+    ports_values: dict[str, ItemConcreteValue] = {}
+    archiving_tasks: deque[Coroutine[None, None, None]] = deque()

     for port in (await PORTS.outputs).values():
         logger.info("Checking port %s", port.key)
@@ -151,7 +152,7 @@ def _is_zip_file(file_path: Path) -> bool:
     return f"{mime_type}" == "application/zip"


-async def _get_data_from_port(port: Port) -> Tuple[Port, ItemConcreteValue]:
+async def _get_data_from_port(port: Port) -> tuple[Port, Optional[ItemConcreteValue]]:
     tag = f"[{port.key}] "
     logger.info("%s transfer started for %s", tag, port.key)
     start_time = time.perf_counter()
@@ -167,11 +168,83 @@
         size_mb,
         size_mb / elapsed_time,
     )
-    return (port, ret)
+    return port, ret


+async def _download_files(
+    target_path: Path, download_tasks: deque[Coroutine[Any, int, Any]]
+) -> tuple[OutputsDict, ByteSize]:
+    transferred_bytes = 0
+    data: OutputsDict = {}
+
+    if not download_tasks:
+        return data, ByteSize(transferred_bytes)
+
+    # TODO: limit concurrency to avoid saturating storage+db??
+    results: list[tuple[Port, ItemConcreteValue]] = cast(
+        list[tuple[Port, ItemConcreteValue]], await logged_gather(*download_tasks)
+    )
+    logger.info("completed download %s", results)
+    for port, value in results:
+
+        data[port.key] = {"key": port.key, "value": value}
+
+        if _FILE_TYPE_PREFIX in port.property_type:
+
+            # if there are files, move them to the final destination
+            downloaded_file: Optional[Path] = cast(Optional[Path], value)
+            dest_path: Path = target_path / port.key
+
+            if not downloaded_file or not downloaded_file.exists():
+                # the link may be empty
+                # remove files all files from disk when disconnecting port
+                logger.info("removing contents of dir %s", dest_path)
+                await remove_directory(
+                    dest_path, only_children=True, ignore_errors=True
+                )
+                continue
+
+            transferred_bytes = transferred_bytes + downloaded_file.stat().st_size
+
+            # in case of valid file, it is either uncompressed and/or moved to the final directory
+            logger.info("creating directory %s", dest_path)
+            dest_path.mkdir(exist_ok=True, parents=True)
+            data[port.key] = {"key": port.key, "value": str(dest_path)}
+
+            dest_folder = PrunableFolder(dest_path)
+
+            if _is_zip_file(downloaded_file):
+                # unzip updated data to dest_path
+                logger.info("unzipping %s", downloaded_file)
+                unarchived: set[Path] = await unarchive_dir(
+                    archive_to_extract=downloaded_file, destination_folder=dest_path
+                )
+
+                dest_folder.prune(exclude=unarchived)
+
+                logger.info("all unzipped in %s", dest_path)
+            else:
+                logger.info("moving %s", downloaded_file)
+                dest_path = dest_path / Path(downloaded_file).name
+                await async_on_threadpool(
+                    # pylint: disable=cell-var-from-loop
+                    lambda: shutil.move(str(downloaded_file), dest_path)
+                )
+
+                # NOTE: after the download the current value of the port
+                # makes sure previously downloaded files are removed
+                dest_folder.prune(exclude={dest_path})
+
+                logger.info("all moved to %s", dest_path)
+        else:
+            transferred_bytes = transferred_bytes + sys.getsizeof(value)
+
+    return data, ByteSize(transferred_bytes)
+
+
+@run_sequentially_in_context()
 async def download_target_ports(
-    port_type_name: PortTypeName, target_path: Path, port_keys: List[str]
+    port_type_name: PortTypeName, target_path: Path, port_keys: list[str]
 ) -> ByteSize:
     logger.info("retrieving data from simcore...")
     start_time = time.perf_counter()
@@ -183,10 +256,9 @@ async def download_target_ports(
         node_uuid=str(settings.DY_SIDECAR_NODE_ID),
         r_clone_settings=settings.DY_SIDECAR_R_CLONE_SETTINGS,
     )
-    data = {}

     # let's gather all the data
-    download_tasks = []
+    download_tasks: deque[Coroutine[Any, int, Any]] = deque()
     for port_value in (await getattr(PORTS, port_type_name.value)).values():
         # if port_keys contains some keys only download them
         logger.info("Checking node %s", port_value.key)
@@ -196,61 +268,7 @@
             download_tasks.append(_get_data_from_port(port_value))
     logger.info("retrieving %s data", len(download_tasks))

-    transfer_bytes = 0
-    if download_tasks:
-        # TODO: limit concurrency to avoid saturating storage+db??
-        results: List[Tuple[Port, ItemConcreteValue]] = cast(
-            List[Tuple[Port, ItemConcreteValue]], await logged_gather(*download_tasks)
-        )
-        logger.info("completed download %s", results)
-        for port, value in results:
-
-            data[port.key] = {"key": port.key, "value": value}
-
-            if _FILE_TYPE_PREFIX in port.property_type:
-
-                # if there are files, move them to the final destination
-                downloaded_file: Optional[Path] = cast(Optional[Path], value)
-                dest_path: Path = target_path / port.key
-
-                if not downloaded_file or not downloaded_file.exists():
-                    # the link may be empty
-                    # remove files all files from disk when disconnecting port
-                    await remove_directory(
-                        dest_path, only_children=True, ignore_errors=True
-                    )
-                    continue
-
-                transfer_bytes = transfer_bytes + downloaded_file.stat().st_size
-
-                # in case of valid file, it is either uncompressed and/or moved to the final directory
-                logger.info("creating directory %s", dest_path)
-                dest_path.mkdir(exist_ok=True, parents=True)
-                data[port.key] = {"key": port.key, "value": str(dest_path)}
-
-                if _is_zip_file(downloaded_file):
-
-                    dest_folder = PrunableFolder(dest_path)
-
-                    # unzip updated data to dest_path
-                    logger.info("unzipping %s", downloaded_file)
-                    unarchived: Set[Path] = await unarchive_dir(
-                        archive_to_extract=downloaded_file, destination_folder=dest_path
-                    )
-
-                    dest_folder.prune(exclude=unarchived)
-
-                    logger.info("all unzipped in %s", dest_path)
-                else:
-                    logger.info("moving %s", downloaded_file)
-                    dest_path = dest_path / Path(downloaded_file).name
-                    await async_on_threadpool(
-                        # pylint: disable=cell-var-from-loop
-                        lambda: shutil.move(str(downloaded_file), dest_path)
-                    )
-                    logger.info("all moved to %s", dest_path)
-            else:
-                transfer_bytes = transfer_bytes + sys.getsizeof(value)
+    data, transferred_bytes = await _download_files(target_path, download_tasks)

     # create/update the json file with the new values
     if data:
@@ -261,15 +279,13 @@
         data = {**current_data, **data}
         data_file.write_text(json.dumps(data))

-    transferred = ByteSize(transfer_bytes)
     elapsed_time = time.perf_counter() - start_time
     logger.info(
         "Downloaded %s in %s seconds",
-        transferred.human_readable(decimal=True),
+        transferred_bytes.human_readable(decimal=True),
         elapsed_time,
     )
-
-    return transferred
+    return transferred_bytes


 __all__ = ["dispatch_update_for_directory", "download_target_ports"]
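Two mechanisms in this file carry the dynamic-sidecar side of the fix. First, download_target_ports is now decorated with @run_sequentially_in_context, the same decorator upload_outputs already uses, so concurrent calls get queued and input downloads can no longer run in parallel. Below is a minimal, self-contained sketch of that queuing idea using only the standard library; the names run_one_at_a_time and download_inputs are illustrative, and servicelib's real decorator is richer (as I read it, it groups calls by a context derived from selected arguments):

# a minimal sketch of the queuing idea, NOT servicelib's implementation
import asyncio
import functools
from typing import Optional


def run_one_at_a_time(func):
    lock: Optional[asyncio.Lock] = None  # created lazily, inside the running loop

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        nonlocal lock
        if lock is None:
            lock = asyncio.Lock()
        async with lock:  # concurrent callers queue up here
            return await func(*args, **kwargs)

    return wrapper


@run_one_at_a_time
async def download_inputs(port_key: str) -> None:
    print(f"start {port_key}")
    await asyncio.sleep(0.1)  # stands in for the actual transfer
    print(f"done {port_key}")


async def main() -> None:
    # fired concurrently, but executed strictly back to back
    await asyncio.gather(*(download_inputs(k) for k in ("a", "b", "c")))


asyncio.run(main())

Second, the directory purge: dest_folder = PrunableFolder(dest_path) now snapshots the destination before unpacking, and prune(exclude=...) runs in both the zip and the plain-file branches. In the old code pruning only happened for zip archives, so a plain file replacing an earlier download left stale content behind. A hypothetical stand-in for that snapshot-then-prune pattern (FolderPruner is not the servicelib class):

# hypothetical stand-in for servicelib.archiving_utils.PrunableFolder
from pathlib import Path


class FolderPruner:
    def __init__(self, folder: Path) -> None:
        # snapshot what was on disk before the new payload arrived
        self._before: set[Path] = {p for p in folder.rglob("*") if p.is_file()}

    def prune(self, exclude: set[Path]) -> None:
        # delete every pre-existing file the fresh download did not bring back
        for stale in self._before - exclude:
            stale.unlink(missing_ok=True)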

services/web/server/src/simcore_service_webserver/catalog_client.py

Lines changed: 1 addition & 1 deletion

@@ -7,13 +7,13 @@
 from typing import Any, Dict, List, Mapping, Optional

 from aiohttp import ClientSession, ClientTimeout, web
-from pydantic import parse_obj_as
 from aiohttp.client_exceptions import (
     ClientConnectionError,
     ClientResponseError,
     InvalidURL,
 )
 from models_library.services_resources import ServiceResourcesDict
+from pydantic import parse_obj_as
 from servicelib.aiohttp.client_session import get_client_session
 from servicelib.aiohttp.rest_responses import wrap_as_envelope
 from servicelib.json_serialization import json_dumps

services/web/server/src/simcore_service_webserver/computation_comp_tasks_listening_task.py

Lines changed: 3 additions & 40 deletions

@@ -17,13 +17,12 @@
 from pydantic.types import PositiveInt
 from servicelib.aiohttp.application_keys import APP_DB_ENGINE_KEY
 from servicelib.logging_utils import log_decorator
-from servicelib.utils import logged_gather
 from simcore_postgres_database.webserver_models import DB_CHANNEL_NAME, projects
 from sqlalchemy.sql import select

 from .computation_utils import convert_state_from_db
 from .projects import projects_api, projects_exceptions
-from .projects.projects_utils import project_get_depending_nodes
+from .projects.projects_nodes_utils import update_node_outputs

 log = logging.getLogger(__name__)

@@ -54,43 +53,6 @@ async def _update_project_state(
     await projects_api.notify_project_state_update(app, project)


-@log_decorator(logger=log)
-async def _update_project_outputs(
-    app: web.Application,
-    user_id: PositiveInt,
-    project_uuid: str,
-    node_uuid: str,
-    outputs: Dict,
-    run_hash: Optional[str],
-    node_errors: Optional[List[ErrorDict]],
-) -> None:
-    # the new outputs might be {}, or {key_name: payload}
-    project, changed_keys = await projects_api.update_project_node_outputs(
-        app,
-        user_id,
-        project_uuid,
-        node_uuid,
-        new_outputs=outputs,
-        new_run_hash=run_hash,
-    )
-
-    await projects_api.notify_project_node_update(
-        app, project, f"{node_uuid}", errors=node_errors
-    )
-    # get depending node and notify for these ones as well
-    depending_node_uuids = await project_get_depending_nodes(project, f"{node_uuid}")
-    await logged_gather(
-        *[
-            projects_api.notify_project_node_update(app, project, nid, errors=None)
-            for nid in depending_node_uuids
-        ]
-    )
-    # notify
-    await projects_api.post_trigger_connected_service_retrieve(
-        app=app, project=project, updated_node_uuid=node_uuid, changed_keys=changed_keys
-    )
-
-
 async def listen(app: web.Application, db_engine: Engine):
     listen_query = f"LISTEN {DB_CHANNEL_NAME};"
     _LISTENING_TASK_BASE_SLEEPING_TIME_S = 1
@@ -138,14 +100,15 @@ async def listen(app: web.Application, db_engine: Engine):
             if any(f in task_changes for f in ["outputs", "run_hash"]):
                 new_outputs = task_data.get("outputs", {})
                 new_run_hash = task_data.get("run_hash", None)
-                await _update_project_outputs(
+                await update_node_outputs(
                     app,
                     the_project_owner,
                     project_uuid,
                     node_uuid,
                     new_outputs,
                     new_run_hash,
                     node_errors=task_data.get("errors", None),
+                    ui_changed_keys=None,
                 )

             if "state" in task_changes:
