Skip to content

Commit 97fd911

Browse files
author
Andrei Neagu
committed
move archive to port instead of failing
1 parent 065bd8f commit 97fd911

File tree

1 file changed

+58
-30
lines changed
  • services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules

1 file changed

+58
-30
lines changed

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py

Lines changed: 58 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@
2020
from models_library.projects_nodes_io import NodeIDStr
2121
from models_library.services_types import ServicePortKey
2222
from pydantic import ByteSize
23-
from servicelib.archiving_utils import PrunableFolder, archive_dir, unarchive_dir
23+
from servicelib.archiving_utils import (
24+
ArchiveError,
25+
PrunableFolder,
26+
archive_dir,
27+
unarchive_dir,
28+
)
2429
from servicelib.async_utils import run_sequentially_in_context
2530
from servicelib.file_utils import remove_directory
2631
from servicelib.logging_utils import log_context
@@ -46,7 +51,7 @@ class PortTypeName(str, Enum):
4651
_FILE_TYPE_PREFIX = "data:"
4752
_KEY_VALUE_FILE_NAME = "key_values.json"
4853

49-
logger = logging.getLogger(__name__)
54+
_logger = logging.getLogger(__name__)
5055

5156
# OUTPUTS section
5257

@@ -95,7 +100,7 @@ async def upload_outputs( # pylint:disable=too-many-statements # noqa: PLR0915
95100
port_notifier: PortNotifier,
96101
) -> None:
97102
# pylint: disable=too-many-branches
98-
logger.debug("uploading data to simcore...")
103+
_logger.debug("uploading data to simcore...")
99104
start_time = time.perf_counter()
100105

101106
settings: ApplicationSettings = get_settings()
@@ -138,7 +143,7 @@ async def upload_outputs( # pylint:disable=too-many-statements # noqa: PLR0915
138143
if is_file_type(port.property_type):
139144
src_folder = outputs_path / port.key
140145
files_and_folders_list = list(src_folder.rglob("*"))
141-
logger.debug("Discovered files to upload %s", files_and_folders_list)
146+
_logger.debug("Discovered files to upload %s", files_and_folders_list)
142147

143148
if not files_and_folders_list:
144149
ports_values[port.key] = (None, None)
@@ -213,9 +218,9 @@ async def _archive_dir_notified(
213218
if port.key in data and data[port.key] is not None:
214219
ports_values[port.key] = (data[port.key], None)
215220
else:
216-
logger.debug("Port %s not found in %s", port.key, data)
221+
_logger.debug("Port %s not found in %s", port.key, data)
217222
else:
218-
logger.debug("No file %s to fetch port values from", data_file)
223+
_logger.debug("No file %s to fetch port values from", data_file)
219224

220225
if archiving_tasks:
221226
await limited_gather(*archiving_tasks, limit=4)
@@ -228,8 +233,8 @@ async def _archive_dir_notified(
228233

229234
elapsed_time = time.perf_counter() - start_time
230235
total_bytes = sum(_get_size_of_value(x) for x in ports_values.values())
231-
logger.info("Uploaded %s bytes in %s seconds", total_bytes, elapsed_time)
232-
logger.debug(_CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE)
236+
_logger.info("Uploaded %s bytes in %s seconds", total_bytes, elapsed_time)
237+
_logger.debug(_CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE)
233238

234239

235240
# INPUTS section
@@ -243,14 +248,32 @@ def _is_zip_file(file_path: Path) -> bool:
243248
_shutil_move = aiofiles.os.wrap(shutil.move)
244249

245250

251+
async def _move_file_to_input_port(
252+
final_path: Path, downloaded_file: Path, dest_folder: PrunableFolder
253+
) -> None:
254+
_logger.debug("moving %s", downloaded_file)
255+
final_path = final_path / downloaded_file.name
256+
257+
# ensure parent exists
258+
final_path.parent.mkdir(exist_ok=True, parents=True)
259+
260+
await _shutil_move(downloaded_file, final_path)
261+
262+
# NOTE: after the download the current value of the port
263+
# makes sure previously downloaded files are removed
264+
dest_folder.prune(exclude={final_path})
265+
266+
_logger.debug("file moved to %s", final_path)
267+
268+
246269
async def _get_data_from_port(
247270
port: Port, *, target_dir: Path, progress_bar: ProgressBarData
248271
) -> tuple[Port, ItemConcreteValue | None, ByteSize]:
249272
async with progress_bar.sub_progress(
250273
steps=2 if is_file_type(port.property_type) else 1,
251274
description=IDStr("getting data"),
252275
) as sub_progress:
253-
with log_context(logger, logging.DEBUG, f"getting {port.key=}"):
276+
with log_context(_logger, logging.DEBUG, f"getting {port.key=}"):
254277
port_data = await port.get(sub_progress)
255278

256279
if is_file_type(port.property_type):
@@ -261,7 +284,7 @@ async def _get_data_from_port(
261284
if not downloaded_file or not downloaded_file.exists():
262285
# the link may be empty
263286
# remove files all files from disk when disconnecting port
264-
logger.debug("removing contents of dir %s", final_path)
287+
_logger.debug("removing contents of dir %s", final_path)
265288
await remove_directory(
266289
final_path, only_children=True, ignore_errors=True
267290
)
@@ -270,33 +293,38 @@ async def _get_data_from_port(
270293
transferred_bytes = downloaded_file.stat().st_size
271294

272295
# in case of valid file, it is either uncompressed and/or moved to the final directory
273-
with log_context(logger, logging.DEBUG, "creating directory"):
296+
with log_context(_logger, logging.DEBUG, "creating directory"):
274297
final_path.mkdir(exist_ok=True, parents=True)
275298
port_data = f"{final_path}"
276299
dest_folder = PrunableFolder(final_path)
277300

278301
if _is_zip_file(downloaded_file):
279302
# unzip updated data to dest_path
280-
logger.debug("unzipping %s", downloaded_file)
281-
unarchived: set[Path] = await unarchive_dir(
282-
archive_to_extract=downloaded_file,
283-
destination_folder=final_path,
284-
progress_bar=sub_progress,
285-
)
286-
287-
dest_folder.prune(exclude=unarchived)
303+
_logger.debug("unzipping %s", downloaded_file)
304+
try:
305+
unarchived: set[Path] = await unarchive_dir(
306+
archive_to_extract=downloaded_file,
307+
destination_folder=final_path,
308+
progress_bar=sub_progress,
309+
)
310+
dest_folder.prune(exclude=unarchived)
311+
312+
_logger.debug("all unzipped in %s", final_path)
313+
except ArchiveError:
314+
_logger.warning(
315+
"Could not extract archive '%s' to '%s' moving it to: '%s'",
316+
downloaded_file,
317+
final_path,
318+
final_path / downloaded_file.name,
319+
)
320+
await _move_file_to_input_port(
321+
final_path, downloaded_file, dest_folder
322+
)
288323

289-
logger.debug("all unzipped in %s", final_path)
290324
else:
291-
logger.debug("moving %s", downloaded_file)
292-
final_path = final_path / Path(downloaded_file).name
293-
await _shutil_move(str(downloaded_file), final_path)
294-
295-
# NOTE: after the download the current value of the port
296-
# makes sure previously downloaded files are removed
297-
dest_folder.prune(exclude={final_path})
325+
await _move_file_to_input_port(final_path, downloaded_file, dest_folder)
298326

299-
logger.debug("all moved to %s", final_path)
327+
_logger.debug("all moved to %s", final_path)
300328
else:
301329
transferred_bytes = sys.getsizeof(port_data)
302330

@@ -312,7 +340,7 @@ async def download_target_ports(
312340
progress_bar: ProgressBarData,
313341
port_notifier: PortNotifier | None,
314342
) -> ByteSize:
315-
logger.debug("retrieving data from simcore...")
343+
_logger.debug("retrieving data from simcore...")
316344
start_time = time.perf_counter()
317345

318346
settings: ApplicationSettings = get_settings()
@@ -386,7 +414,7 @@ async def _get_date_from_port_notified(
386414
data_file.write_text(json.dumps(data))
387415

388416
elapsed_time = time.perf_counter() - start_time
389-
logger.info(
417+
_logger.info(
390418
"Downloaded %s in %s seconds",
391419
total_transfered_bytes.human_readable(decimal=True),
392420
elapsed_time,

0 commit comments

Comments
 (0)