Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
97fd911
move archive to port instead of failing
Dec 12, 2024
29533e9
Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…
Dec 12, 2024
ca87d0b
using log context and rename
Dec 13, 2024
7f3e41e
Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…
Dec 13, 2024
be58b0c
refactor to use less generic error UnsupportedArchiveFormat
Dec 13, 2024
53e0728
renaming error
Dec 13, 2024
65b5291
replace unarchiver
Dec 16, 2024
e6d118d
remove unused
Dec 16, 2024
2a2f917
remove comment
Dec 16, 2024
56d7fe1
refactor 7zip
Dec 16, 2024
9c82bc5
fixed broken install
Dec 16, 2024
5a4cfb5
Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…
Dec 16, 2024
0f315e0
refactor
Dec 16, 2024
7ede41a
mypy
Dec 16, 2024
9d67dd1
added extra test
Dec 16, 2024
0f1642d
Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…
Dec 17, 2024
71adc5b
move to folder
Dec 17, 2024
01ce053
small enhancements in tests
Dec 17, 2024
79c1648
restructure package
Dec 17, 2024
02c9497
modules renaming
Dec 17, 2024
c01e35a
refactor
Dec 17, 2024
13c1485
fix refactor
Dec 17, 2024
653e168
added decompress progress
Dec 17, 2024
6dfcd95
fixed issues with parsing progress
Dec 17, 2024
c3b0c1c
restructure tests with common utils
Dec 19, 2024
56d1871
added notes to deprecate
Dec 19, 2024
30346af
Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…
Dec 19, 2024
a9145e2
remove unwanted code
Dec 19, 2024
46a0c29
remove unused
Dec 19, 2024
c04999c
removed unused feature
Dec 19, 2024
d48de81
refactor to drop relative_paths since it's always True
Dec 19, 2024
9ff2374
added tqdm progress
Dec 19, 2024
0364e8b
refactor common code
Dec 19, 2024
7205c2f
connected progressbar
Dec 19, 2024
79cc7e8
added dropin interface
Dec 19, 2024
e69bc9a
remove old unused interface
Dec 19, 2024
e282c95
Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…
Dec 19, 2024
7279155
removes unused repro-zipfile
Dec 19, 2024
571a411
refactor imports
Dec 19, 2024
da30d13
restructure model
Dec 19, 2024
9ca1090
simplify outputs parsing
Dec 19, 2024
97bdff1
install path is now fixed to avoid using old system versions
Dec 20, 2024
280b0d3
fixed 7z installation
Dec 20, 2024
6759dc7
enhanced 7zip interface
Dec 20, 2024
4a5a3a6
added arm64 target install
Dec 20, 2024
21c7f0a
fixed lines scanning parsing
Dec 20, 2024
a58849f
added edge case test
Dec 20, 2024
f5ae500
Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…
Dec 20, 2024
abc45ef
refactor
Dec 20, 2024
82413f1
refactor
Dec 20, 2024
cbf7533
rename
Dec 20, 2024
5739963
mypy
Dec 20, 2024
9495ac1
making flatter
Dec 20, 2024
d693eeb
revert change
Dec 20, 2024
7e5b967
adding missing
Dec 20, 2024
05ab2f1
remove unused
Dec 20, 2024
7041eee
remove useless
Dec 20, 2024
622bb66
rename
Dec 20, 2024
2ddc4df
refactor errors and re
Dec 23, 2024
d2de8be
replaced regex search with complied expressions
Dec 23, 2024
010fe01
feedback
Dec 23, 2024
09cc78b
human readable refacto
Jan 6, 2025
cfb5f6d
revert
Jan 6, 2025
e825e07
Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…
Jan 6, 2025
9d028bd
extract percentage via regex
Jan 7, 2025
f791538
only one log handler is supported
Jan 7, 2025
6d8c10a
rename
Jan 7, 2025
3881d18
Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…
Jan 7, 2025
3a9de60
remove meaningless comment
Jan 7, 2025
805bd9e
fixed parsing issue
Jan 7, 2025
fd58947
fixed issue with file listing
Jan 7, 2025
229b9aa
removing archive after download
Jan 7, 2025
8ec0c9f
Merge remote-tracking branch 'upstream/master' into pr-osparc-deflate…
Jan 7, 2025
b14d359
revert upgraded
Jan 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/service-library/src/servicelib/archiving_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class ArchiveError(Exception):
"""


class UnsupportedArchiveFormat(Exception):
pass


def _human_readable_size(size, decimal_places=3):
human_readable_file_size = float(size)
unit = "B"
Expand Down Expand Up @@ -260,6 +264,10 @@ async def unarchive_dir(
f"Failed unarchiving {archive_to_extract} -> {destination_folder} due to {type(err)}."
f"Details: {err}"
)
# maybe write unsupported error format here instead of all this to be able to still raise in case of error
if isinstance(err, NotImplementedError):
raise UnsupportedArchiveFormat(msg) from err

raise ArchiveError(msg) from err

# NOTE: extracted_paths includes all tree leafs, which might include files and empty folders
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
from models_library.projects_nodes_io import NodeIDStr
from models_library.services_types import ServicePortKey
from pydantic import ByteSize
from servicelib.archiving_utils import PrunableFolder, archive_dir, unarchive_dir
from servicelib.archiving_utils import (
PrunableFolder,
UnsupportedArchiveFormat,
archive_dir,
unarchive_dir,
)
from servicelib.async_utils import run_sequentially_in_context
from servicelib.file_utils import remove_directory
from servicelib.logging_utils import log_context
Expand All @@ -46,7 +51,7 @@ class PortTypeName(str, Enum):
_FILE_TYPE_PREFIX = "data:"
_KEY_VALUE_FILE_NAME = "key_values.json"

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)

# OUTPUTS section

Expand Down Expand Up @@ -95,7 +100,7 @@ async def upload_outputs( # pylint:disable=too-many-statements # noqa: PLR0915
port_notifier: PortNotifier,
) -> None:
# pylint: disable=too-many-branches
logger.debug("uploading data to simcore...")
_logger.debug("uploading data to simcore...")
start_time = time.perf_counter()

settings: ApplicationSettings = get_settings()
Expand Down Expand Up @@ -138,7 +143,7 @@ async def upload_outputs( # pylint:disable=too-many-statements # noqa: PLR0915
if is_file_type(port.property_type):
src_folder = outputs_path / port.key
files_and_folders_list = list(src_folder.rglob("*"))
logger.debug("Discovered files to upload %s", files_and_folders_list)
_logger.debug("Discovered files to upload %s", files_and_folders_list)

if not files_and_folders_list:
ports_values[port.key] = (None, None)
Expand Down Expand Up @@ -213,9 +218,9 @@ async def _archive_dir_notified(
if port.key in data and data[port.key] is not None:
ports_values[port.key] = (data[port.key], None)
else:
logger.debug("Port %s not found in %s", port.key, data)
_logger.debug("Port %s not found in %s", port.key, data)
else:
logger.debug("No file %s to fetch port values from", data_file)
_logger.debug("No file %s to fetch port values from", data_file)

if archiving_tasks:
await limited_gather(*archiving_tasks, limit=4)
Expand All @@ -228,8 +233,8 @@ async def _archive_dir_notified(

elapsed_time = time.perf_counter() - start_time
total_bytes = sum(_get_size_of_value(x) for x in ports_values.values())
logger.info("Uploaded %s bytes in %s seconds", total_bytes, elapsed_time)
logger.debug(_CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE)
_logger.info("Uploaded %s bytes in %s seconds", total_bytes, elapsed_time)
_logger.debug(_CONTROL_TESTMARK_DY_SIDECAR_NODEPORT_UPLOADED_MESSAGE)


# INPUTS section
Expand All @@ -243,14 +248,29 @@ def _is_zip_file(file_path: Path) -> bool:
_shutil_move = aiofiles.os.wrap(shutil.move)


async def _move_file_to_input_port(
final_path: Path, downloaded_file: Path, dest_folder: PrunableFolder
) -> None:
with log_context(_logger, logging.DEBUG, f"moving {downloaded_file}"):
final_path = final_path / downloaded_file.name
# ensure parent exists
final_path.parent.mkdir(exist_ok=True, parents=True)

await _shutil_move(downloaded_file, final_path)

# NOTE: after the port content changes, make sure old files
# which are no longer part of the port, are removed
dest_folder.prune(exclude={final_path})


async def _get_data_from_port(
port: Port, *, target_dir: Path, progress_bar: ProgressBarData
) -> tuple[Port, ItemConcreteValue | None, ByteSize]:
async with progress_bar.sub_progress(
steps=2 if is_file_type(port.property_type) else 1,
description=IDStr("getting data"),
) as sub_progress:
with log_context(logger, logging.DEBUG, f"getting {port.key=}"):
with log_context(_logger, logging.DEBUG, f"getting {port.key=}"):
port_data = await port.get(sub_progress)

if is_file_type(port.property_type):
Expand All @@ -261,7 +281,7 @@ async def _get_data_from_port(
if not downloaded_file or not downloaded_file.exists():
# the link may be empty
# remove files all files from disk when disconnecting port
logger.debug("removing contents of dir %s", final_path)
_logger.debug("removing contents of dir %s", final_path)
await remove_directory(
final_path, only_children=True, ignore_errors=True
)
Expand All @@ -270,33 +290,38 @@ async def _get_data_from_port(
transferred_bytes = downloaded_file.stat().st_size

# in case of valid file, it is either uncompressed and/or moved to the final directory
with log_context(logger, logging.DEBUG, "creating directory"):
with log_context(_logger, logging.DEBUG, "creating directory"):
final_path.mkdir(exist_ok=True, parents=True)
port_data = f"{final_path}"
dest_folder = PrunableFolder(final_path)

if _is_zip_file(downloaded_file):
# unzip updated data to dest_path
logger.debug("unzipping %s", downloaded_file)
unarchived: set[Path] = await unarchive_dir(
archive_to_extract=downloaded_file,
destination_folder=final_path,
progress_bar=sub_progress,
)

dest_folder.prune(exclude=unarchived)
_logger.debug("unzipping %s", downloaded_file)
try:
unarchived: set[Path] = await unarchive_dir(
archive_to_extract=downloaded_file,
destination_folder=final_path,
progress_bar=sub_progress,
)
dest_folder.prune(exclude=unarchived)

_logger.debug("all unzipped in %s", final_path)
except UnsupportedArchiveFormat:
_logger.warning(
"Could not extract archive '%s' to '%s' moving it to: '%s'",
downloaded_file,
final_path,
final_path / downloaded_file.name,
)
await _move_file_to_input_port(
final_path, downloaded_file, dest_folder
)

logger.debug("all unzipped in %s", final_path)
else:
logger.debug("moving %s", downloaded_file)
final_path = final_path / Path(downloaded_file).name
await _shutil_move(str(downloaded_file), final_path)

# NOTE: after the download the current value of the port
# makes sure previously downloaded files are removed
dest_folder.prune(exclude={final_path})
await _move_file_to_input_port(final_path, downloaded_file, dest_folder)

logger.debug("all moved to %s", final_path)
_logger.debug("all moved to %s", final_path)
else:
transferred_bytes = sys.getsizeof(port_data)

Expand All @@ -312,7 +337,7 @@ async def download_target_ports(
progress_bar: ProgressBarData,
port_notifier: PortNotifier | None,
) -> ByteSize:
logger.debug("retrieving data from simcore...")
_logger.debug("retrieving data from simcore...")
start_time = time.perf_counter()

settings: ApplicationSettings = get_settings()
Expand Down Expand Up @@ -386,7 +411,7 @@ async def _get_date_from_port_notified(
data_file.write_text(json.dumps(data))

elapsed_time = time.perf_counter() - start_time
logger.info(
_logger.info(
"Downloaded %s in %s seconds",
total_transfered_bytes.human_readable(decimal=True),
elapsed_time,
Expand Down
Loading