
Commit 9a8076d

🐛 workspace.zip disappears while uploading (ITISFoundation#3306)
1 parent a038a9c commit 9a8076d

File tree: 4 files changed (+97, -156 lines)

- packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py
- services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py
- services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py
- services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py

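What changed, as read from the diffs below: upload_outputs() in the dynamic-sidecar created its archives in tempfile.mkdtemp() folders and cleaned them up in a finally block through a late-binding lambda calling shutil.rmtree(file_path.parent, ignore_errors=True), which removes the parent of each temporary folder, i.e. the shared system temp directory. A concurrent push() holding its workspace.zip inside its own TemporaryDirectory could therefore see the file disappear mid-upload. The fix keeps the temporary folders alive on an AsyncExitStack until the upload completes, wraps the push in log_catch/log_context so failures surface in the logs, and rewires the integration test to poll with tenacity for an explicit log marker instead of parsing the "Uploaded ... bytes" log line.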

packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py

Lines changed: 4 additions & 2 deletions
@@ -7,6 +7,7 @@
 from models_library.projects_nodes_io import StorageFileID
 from pydantic import parse_obj_as
 from servicelib.archiving_utils import archive_dir, unarchive_dir
+from servicelib.logging_utils import log_catch, log_context
 from settings_library.r_clone import RCloneSettings
 from simcore_sdk.node_ports_common.constants import SIMCORE_LOCATION
 
@@ -70,8 +71,9 @@ async def push(
         io_log_redirect_cb=io_log_redirect_cb,
     )
     # we have a folder, so we create a compressed file
-    with TemporaryDirectory() as tmp_dir_name:
-        log.info("compressing %s into %s...", file_or_folder.name, tmp_dir_name)
+    with log_catch(log), log_context(
+        log, logging.INFO, "pushing %s", file_or_folder
+    ), TemporaryDirectory() as tmp_dir_name:
         # compress the files
         archive_file_path = (
             Path(tmp_dir_name) / f"{rename_to or file_or_folder.stem}.zip"
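The log_catch/log_context pair replaces the single "compressing ..." log line with a bracketed, failure-aware scope. A minimal, self-contained sketch of what these helpers provide (assumed semantics modeled on servicelib.logging_utils; these stand-ins are not the real implementation):

import logging
from contextlib import contextmanager
from typing import Iterator

log = logging.getLogger(__name__)

@contextmanager
def log_context(logger: logging.Logger, level: int, msg: str, *args) -> Iterator[None]:
    # bracket a block with start/finish messages, so an operation that dies
    # half-way (e.g. a push that never completes) is visible in the logs
    logger.log(level, "Starting " + msg + " ...", *args)
    yield
    logger.log(level, "Finished " + msg, *args)

@contextmanager
def log_catch(logger: logging.Logger, reraise: bool = True) -> Iterator[None]:
    # log any exception escaping the block instead of losing it silently
    try:
        yield
    except Exception:  # pylint: disable=broad-except
        logger.exception("Unhandled exception:")
        if reraise:
            raise

# usage, mirroring push():
# with log_catch(log), log_context(log, logging.INFO, "pushing %s", "workspace"):
#     ...  # work that should be bracketed and error-logged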

services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py

Lines changed: 30 additions & 92 deletions
@@ -9,7 +9,6 @@
 from collections import namedtuple
 from itertools import tee
 from pathlib import Path
-from pprint import pformat
 from typing import Any, AsyncIterable, Callable, Iterable, Iterator, cast
 from uuid import uuid4
 
@@ -60,8 +59,11 @@
 )
 from sqlalchemy.dialects.postgresql import insert as pg_insert
 from starlette import status
+from tenacity._asyncio import AsyncRetrying
+from tenacity.retry import retry_if_exception_type
+from tenacity.stop import stop_after_delay
+from tenacity.wait import wait_fixed
 from utils import (
-    SEPARATOR,
     assert_all_services_running,
     assert_retrieve_service,
     assert_services_reply_200,
@@ -680,7 +682,7 @@ async def _wait_for_dy_services_to_fully_stop(
     to_observe = (
         director_v2_client._transport.app.state.dynamic_sidecar_scheduler._to_observe
     )
-
+    # TODO: ANE please use tenacity
     for i in range(TIMEOUT_DETECT_DYNAMIC_SERVICES_STOPPED):
         print(
             f"Sleeping for {i+1}/{TIMEOUT_DETECT_DYNAMIC_SERVICES_STOPPED} "
@@ -727,27 +729,6 @@ def _relative_path(root_path: Path, full_path: Path) -> Path:
 }
 
 
-LINE_PARTS_TO_MATCH = [
-    (0, "INFO:simcore_service_dynamic_sidecar.modules.nodeports:Uploaded"),
-    (2, "bytes"),
-    (3, "in"),
-    (5, "seconds"),
-]
-
-
-def _is_matching_line_in_logs(logs: list[str]) -> bool:
-    for line in logs:
-        if LINE_PARTS_TO_MATCH[0][1] in line:
-            print("".join(logs))
-
-            line_parts = line.strip().split(" ")
-            for position, value in LINE_PARTS_TO_MATCH:
-                assert line_parts[position] == value
-
-            return True
-    return False
-
-
 async def _print_dynamic_sidecars_containers_logs_and_get_containers(
     dynamic_services_urls: dict[str, str]
 ) -> list[str]:
@@ -778,17 +759,9 @@ async def _print_dynamic_sidecars_containers_logs_and_get_containers(
     return containers_names
 
 
-async def _print_container_inspect(container_id: str) -> None:
-    async with aiodocker.Docker() as docker_client:
-        container = await docker_client.containers.get(container_id)
-        container_inspect = await container.show()
-        print(f"Container {container_id} inspect:\n{pformat(container_inspect)}")
-
-
-async def _print_all_docker_volumes() -> None:
-    async with aiodocker.Docker() as docker_client:
-        docker_volumes = await docker_client.volumes.list()
-        print(f"Detected volumes:\n{pformat(docker_volumes)}")
+_CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE = (
+    "TEST: test_nodeports_integration DO NOT REMOVE"
+)
 
 
 async def _assert_retrieve_completed(
@@ -806,50 +779,24 @@ async def _assert_retrieve_completed(
     # look at dynamic-sidecar's logs to be sure when nodeports
     # have been uploaded
     async with aiodocker.Docker() as docker_client:
-        container: DockerContainer = await docker_client.containers.get(container_id)
-
-        for i in range(TIMEOUT_OUTPUTS_UPLOAD_FINISH_DETECTED):
-            logs = await container.log(stdout=True, stderr=True)
-
-            if _is_matching_line_in_logs(logs):
-                break
-
-            if i == TIMEOUT_OUTPUTS_UPLOAD_FINISH_DETECTED - 1:
-                print(SEPARATOR)
-                print(f"Dumping information for service_uuid={service_uuid}")
-                print(SEPARATOR)
-
-                print("".join(logs))
-                print(SEPARATOR)
-
-                containers_names = (
-                    await _print_dynamic_sidecars_containers_logs_and_get_containers(
-                        dynamic_services_urls
-                    )
-                )
-                print(SEPARATOR)
-
-                # inspect dynamic-sidecar container
-                await _print_container_inspect(container_id=container_id)
-                print(SEPARATOR)
-
-                # inspect spawned container
-                for container_name in containers_names:
-                    await _print_container_inspect(container_id=container_name)
-                    print(SEPARATOR)
-
-                await _print_all_docker_volumes()
-                print(SEPARATOR)
-
-                assert False, "Timeout reached"
-
-            print(
-                f"Sleeping {i+1}/{TIMEOUT_OUTPUTS_UPLOAD_FINISH_DETECTED} "
-                f"before searching logs from {service_uuid} again"
-            )
-            await asyncio.sleep(1)
-
-        print(f"Nodeports outputs upload finish detected for {service_uuid}")
+        async for attempt in AsyncRetrying(
+            reraise=True,
+            retry=retry_if_exception_type(AssertionError),
+            stop=stop_after_delay(TIMEOUT_OUTPUTS_UPLOAD_FINISH_DETECTED),
+            wait=wait_fixed(0.5),
+        ):
+            with attempt:
+                print(
+                    f"--> checking container logs of {service_uuid=}, [attempt {attempt.retry_state.attempt_number}]..."
+                )
+                container: DockerContainer = await docker_client.containers.get(
+                    container_id
+                )
+
+                logs = " ".join(await container.log(stdout=True, stderr=True))
+                assert (
+                    _CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE in logs
+                ), "TIP: Message missing suggests that the data was never uploaded: look in services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py"
 
 
 # TESTS
@@ -966,21 +913,12 @@ async def test_nodeports_integration(
         dynamic_services_urls
     )
 
-    await _assert_retrieve_completed(
-        director_v2_client=async_client,
-        service_uuid=services_node_uuids.dy,
-        dynamic_services_urls=dynamic_services_urls,
-    )
-
-    # NOTE: Waits a bit for the DB to write the changes in
-    # comp_task for the upstream service.
-    await asyncio.sleep(2)
-
-    await _assert_retrieve_completed(
-        director_v2_client=async_client,
-        service_uuid=services_node_uuids.dy_compose_spec,
-        dynamic_services_urls=dynamic_services_urls,
-    )
+    for service_uuid in (services_node_uuids.dy, services_node_uuids.dy_compose_spec):
+        await _assert_retrieve_completed(
+            director_v2_client=async_client,
+            service_uuid=service_uuid,
+            dynamic_services_urls=dynamic_services_urls,
+        )
 
     # STEP 3
     # pull data via nodeports
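The rewritten test waits with tenacity's AsyncRetrying instead of a hand-rolled sleep loop: the assertion is retried every 0.5 s until it passes or the deadline expires, and reraise=True surfaces the final AssertionError rather than a RetryError. A runnable sketch of the same pattern (get_logs is a hypothetical async callable standing in for the docker log fetch; on current tenacity releases AsyncRetrying is importable from the top-level package instead of the private tenacity._asyncio module):

import asyncio

from tenacity import AsyncRetrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed

MARKER = "TEST: test_nodeports_integration DO NOT REMOVE"

async def wait_for_upload_marker(get_logs, timeout_s: float = 60.0) -> None:
    # retry the block on AssertionError every 0.5s until timeout_s elapses;
    # reraise=True propagates the last AssertionError instead of a RetryError
    async for attempt in AsyncRetrying(
        reraise=True,
        retry=retry_if_exception_type(AssertionError),
        stop=stop_after_delay(timeout_s),
        wait=wait_fixed(0.5),
    ):
        with attempt:
            logs = " ".join(await get_logs())
            assert MARKER in logs, "upload marker not found yet"

# usage (hypothetical log source):
async def _demo() -> None:
    async def get_logs() -> list[str]:
        return ["...", MARKER]

    await wait_for_upload_marker(get_logs)

asyncio.run(_demo())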

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py

Lines changed: 3 additions & 0 deletions
@@ -9,6 +9,7 @@
     get_common_oas_options,
     override_fastapi_openapi_method,
 )
+from servicelib.logging_utils import config_all_loggers
 from simcore_sdk.node_ports_common.exceptions import NodeNotFound
 
 from .._meta import API_VERSION, API_VTAG, PROJECT_NAME, SUMMARY, __version__
@@ -99,6 +100,7 @@ def setup_logger(settings: ApplicationSettings):
     # SEE https://github.com/ITISFoundation/osparc-simcore/issues/3148
     logging.basicConfig(level=settings.log_level)
     logging.root.setLevel(settings.log_level)
+    config_all_loggers()
 
 
 def create_base_app() -> FastAPI:
@@ -122,6 +124,7 @@ def create_base_app() -> FastAPI:
     long_running_tasks.server.setup(app)
 
     app.include_router(main_router)
+
     return app
services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py

Lines changed: 60 additions & 62 deletions
@@ -3,14 +3,15 @@
 import os
 import shutil
 import sys
-import tempfile
 import time
 from collections import deque
+from contextlib import AsyncExitStack
 from enum import Enum
 from pathlib import Path
 from typing import Any, Coroutine, Optional, cast
 
 import magic
+from aiofiles.tempfile import TemporaryDirectory as AioTemporaryDirectory
 from models_library.projects import ProjectIDStr
 from models_library.projects_nodes import OutputsDict
 from models_library.projects_nodes_io import NodeIDStr
@@ -57,6 +58,11 @@ def _get_size_of_value(value: ItemConcreteValue) -> int:
     return sys.getsizeof(value)
 
 
+_CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE = (
+    "TEST: test_nodeports_integration DO NOT REMOVE"
+)
+
+
 @run_sequentially_in_context()
 async def upload_outputs(
     outputs_path: Path,
@@ -78,77 +84,69 @@ async def upload_outputs(
     )
 
     # let's gather the tasks
-    temp_paths: deque[Path] = deque()
-    ports_values: dict[str, ItemConcreteValue] = {}
+    ports_values: dict[str, Optional[ItemConcreteValue]] = {}
     archiving_tasks: deque[Coroutine[None, None, None]] = deque()
 
-    for port in (await PORTS.outputs).values():
-        logger.debug("Checking port %s", port.key)
-        if port_keys and port.key not in port_keys:
-            continue
-        logger.debug(
-            "uploading data to port '%s' with value '%s'...", port.key, port.value
-        )
-        if _FILE_TYPE_PREFIX in port.property_type:
-            src_folder = outputs_path / port.key
-            files_and_folders_list = list(src_folder.rglob("*"))
-            logger.debug("Discovered files to upload %s", files_and_folders_list)
-
-            if not files_and_folders_list:
-                ports_values[port.key] = None
-                continue
-
-            if len(files_and_folders_list) == 1 and (
-                files_and_folders_list[0].is_file()
-                or files_and_folders_list[0].is_symlink()
-            ):
-                # special case, direct upload
-                ports_values[port.key] = files_and_folders_list[0]
-                continue
-
-            # generic case let's create an archive
-            # only the filtered out files will be zipped
-            tmp_folder = Path(tempfile.mkdtemp())
-            tmp_file = tmp_folder / f"{src_folder.stem}.zip"
-            temp_paths.append(tmp_folder)
-
-            # when having multiple directories it is important to
-            # run the compression in parallel to guarantee better performance
-            archiving_tasks.append(
-                archive_dir(
-                    dir_to_compress=src_folder,
-                    destination=tmp_file,
-                    compress=False,
-                    store_relative_path=True,
-                )
-            )
-            ports_values[port.key] = tmp_file
-        else:
-            data_file = outputs_path / _KEY_VALUE_FILE_NAME
-            if data_file.exists():
-                data = json.loads(data_file.read_text())
-                if port.key in data and data[port.key] is not None:
-                    ports_values[port.key] = data[port.key]
-                else:
-                    logger.debug("Port %s not found in %s", port.key, data)
-            else:
-                logger.debug("No file %s to fetch port values from", data_file)
-
-    try:
+    async with AsyncExitStack() as stack:
+        for port in (await PORTS.outputs).values():
+            logger.debug("Checking port %s", port.key)
+            if port_keys and port.key not in port_keys:
+                continue
+            logger.debug(
+                "uploading data to port '%s' with value '%s'...", port.key, port.value
+            )
+            if _FILE_TYPE_PREFIX in port.property_type:
+                src_folder = outputs_path / port.key
+                files_and_folders_list = list(src_folder.rglob("*"))
+                logger.debug("Discovered files to upload %s", files_and_folders_list)
+
+                if not files_and_folders_list:
+                    ports_values[port.key] = None
+                    continue
+
+                if len(files_and_folders_list) == 1 and (
+                    files_and_folders_list[0].is_file()
+                    or files_and_folders_list[0].is_symlink()
+                ):
+                    # special case, direct upload
+                    ports_values[port.key] = files_and_folders_list[0]
+                    continue
+
+                # generic case let's create an archive
+                # only the filtered out files will be zipped
+                tmp_folder = await stack.enter_async_context(AioTemporaryDirectory())
+                tmp_file = tmp_folder / f"{src_folder.stem}.zip"
+
+                # when having multiple directories it is important to
+                # run the compression in parallel to guarantee better performance
+                archiving_tasks.append(
+                    archive_dir(
+                        dir_to_compress=src_folder,
+                        destination=tmp_file,
+                        compress=False,
+                        store_relative_path=True,
+                    )
+                )
+                ports_values[port.key] = tmp_file
+            else:
+                data_file = outputs_path / _KEY_VALUE_FILE_NAME
+                if data_file.exists():
+                    data = json.loads(data_file.read_text())
+                    if port.key in data and data[port.key] is not None:
+                        ports_values[port.key] = data[port.key]
+                    else:
+                        logger.debug("Port %s not found in %s", port.key, data)
+                else:
+                    logger.debug("No file %s to fetch port values from", data_file)
 
         if archiving_tasks:
             await logged_gather(*archiving_tasks)
         await PORTS.set_multiple(ports_values)
 
-        elapsed_time = time.perf_counter() - start_time
-        total_bytes = sum(_get_size_of_value(x) for x in ports_values.values())
-        logger.info("Uploaded %s bytes in %s seconds", total_bytes, elapsed_time)
-    finally:
-        # clean up possible compressed files
-        for file_path in temp_paths:
-            await async_on_threadpool(
-                # pylint: disable=cell-var-from-loop
-                lambda: shutil.rmtree(file_path.parent, ignore_errors=True)
-            )
+        elapsed_time = time.perf_counter() - start_time
+        total_bytes = sum(_get_size_of_value(x) for x in ports_values.values())
+        logger.info("Uploaded %s bytes in %s seconds", total_bytes, elapsed_time)
+        logger.debug(_CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE)
 
 
 async def dispatch_update_for_directory(
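This is the crux of the fix: tempfile.mkdtemp() plus a finally-block rmtree is replaced by temporary directories registered on an AsyncExitStack, so nothing is deleted until the uploads inside the same block have finished, and cleanup still happens on error. A minimal runnable sketch of the pattern (upload_all and the folder layout are hypothetical):

import asyncio
from contextlib import AsyncExitStack
from pathlib import Path

from aiofiles.tempfile import TemporaryDirectory as AioTemporaryDirectory

async def archive_and_upload(folders: list[Path], upload_all) -> None:
    # every temp dir is tied to the stack's lifetime: cleanup runs when the
    # `async with` exits, i.e. strictly after upload_all() has returned
    async with AsyncExitStack() as stack:
        archives: list[Path] = []
        for folder in folders:
            tmp_dir = await stack.enter_async_context(AioTemporaryDirectory())
            archive = Path(tmp_dir) / f"{folder.stem}.zip"
            # (a real implementation would compress `folder` into `archive` here)
            archive.touch()
            archives.append(archive)
        await upload_all(archives)  # the files are guaranteed to still exist here

Compare with the removed finally block, whose late-binding lambda called shutil.rmtree(file_path.parent, ignore_errors=True): that deletes the parent of the temporary folder while uploads may still be reading from it.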
