Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ def ensure_backwards_compatible_setting_type(cls, v):
SimcoreServiceSettingsLabel = ListModel[SimcoreServiceSettingLabelEntry]


class LegacyState(BaseModel):
    """Describes where a service state archive was stored before an upgrade.

    Used during state restore/cleanup: the archive is looked up at
    ``old_state_path`` and, when found, restored into ``new_state_path``
    (which must be one of the service's declared state paths).
    """

    # location of the state archive prior to the service upgrade
    old_state_path: Path
    # location the state is restored to after the upgrade
    new_state_path: Path

class PathMappingsLabel(BaseModel):
"""Content of "simcore.service.paths-mapping" label"""

Expand Down Expand Up @@ -195,6 +200,29 @@ class PathMappingsLabel(BaseModel):
),
)

legacy_state: LegacyState | None = Field(
None,
description=(
"if present, the service needs to first try to download the legacy state"
"coming from a different path."
),
)

@field_validator("legacy_state")
@classmethod
def validate_legacy_state(
cls, v: LegacyState | None, info: ValidationInfo
) -> LegacyState | None:
if v is None:
return v

state_paths: list[Path] = info.data.get("state_paths", [])
if v.new_state_path not in state_paths:
msg = f"legacy_state={v} not found in state_paths={state_paths}"
raise ValueError(msg)

return v

@field_validator("volume_size_limits")
@classmethod
def validate_volume_limits(cls, v, info: ValidationInfo) -> str | None:
Expand Down Expand Up @@ -252,6 +280,16 @@ def validate_volume_limits(cls, v, info: ValidationInfo) -> str | None:
"/t_inp": "1EIB",
},
},
{
"outputs_path": "/tmp/outputs", # noqa: S108 nosec
"inputs_path": "/tmp/inputs", # noqa: S108 nosec
"state_paths": ["/tmp/save_1", "/tmp_save_2"], # noqa: S108 nosec
"state_exclude": ["/tmp/strip_me/*"], # noqa: S108 nosec
"legacy_state": {
"old_state_path": "/tmp/save_1_legacy", # noqa: S108 nosec
"new_state_path": "/tmp/save_1", # noqa: S108 nosec
},
},
]
},
)
Expand Down
72 changes: 62 additions & 10 deletions packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID, StorageFileID
from models_library.service_settings_labels import LegacyState
from models_library.users import UserID
from pydantic import TypeAdapter
from servicelib.archiving_utils import unarchive_dir
Expand Down Expand Up @@ -101,14 +102,16 @@ async def _pull_legacy_archive(
*,
io_log_redirect_cb: LogRedirectCB,
progress_bar: ProgressBarData,
legacy_destination_path: Path | None = None,
) -> None:
# NOTE: the legacy way of storing states was as zip archives
archive_path = legacy_destination_path or destination_path
async with progress_bar.sub_progress(
steps=2, description=f"pulling {destination_path.name}"
steps=2, description=f"pulling {archive_path.name}"
) as sub_prog:
with TemporaryDirectory() as tmp_dir_name:
archive_file = Path(tmp_dir_name) / __get_s3_name(
destination_path, is_archive=True
archive_path, is_archive=True
)

s3_object = __create_s3_object_key(project_id, node_uuid, archive_file)
Expand All @@ -124,7 +127,7 @@ async def _pull_legacy_archive(
progress_bar=sub_prog,
aws_s3_cli_settings=None,
)
_logger.info("completed pull of %s.", destination_path)
_logger.info("completed pull of %s.", archive_path)

if io_log_redirect_cb:
await io_log_redirect_cb(
Expand Down Expand Up @@ -194,6 +197,7 @@ async def push(
exclude_patterns: set[str] | None = None,
progress_bar: ProgressBarData,
aws_s3_cli_settings: AwsS3CliSettings | None,
legacy_state: LegacyState | None,
) -> None:
"""pushes and removes the legacy archive if present"""

Expand All @@ -208,23 +212,39 @@ async def push(
progress_bar=progress_bar,
aws_s3_cli_settings=aws_s3_cli_settings,
)

archive_exists = await _state_metadata_entry_exists(
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
path=source_path,
is_archive=True,
)
if archive_exists:
with log_context(_logger, logging.INFO, "removing legacy archive"):
await _delete_legacy_archive(
project_id=project_id,
node_uuid=node_uuid,
path=source_path,
)

if not archive_exists:
return

with log_context(_logger, logging.INFO, "removing legacy data archive"):
await _delete_legacy_archive(
if legacy_state:
legacy_archive_exists = await _state_metadata_entry_exists(
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
path=source_path,
path=legacy_state.old_state_path,
is_archive=True,
)
if legacy_archive_exists:
with log_context(
_logger, logging.INFO, f"removing legacy archive in {legacy_state}"
):
await _delete_legacy_archive(
project_id=project_id,
node_uuid=node_uuid,
path=legacy_state.old_state_path,
)


async def pull(
Expand All @@ -237,9 +257,41 @@ async def pull(
r_clone_settings: RCloneSettings,
progress_bar: ProgressBarData,
aws_s3_cli_settings: AwsS3CliSettings | None,
legacy_state: LegacyState | None,
) -> None:
"""restores the state folder"""

if legacy_state and legacy_state.new_state_path == destination_path:
_logger.info(
"trying to restore from legacy_state=%s, destination_path=%s",
legacy_state,
destination_path,
)
legacy_state_exists = await _state_metadata_entry_exists(
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
path=legacy_state.old_state_path,
is_archive=True,
)
_logger.info("legacy_state_exists=%s", legacy_state_exists)
if legacy_state_exists:
with log_context(
_logger,
logging.INFO,
f"restoring data from legacy archive in {legacy_state}",
):
await _pull_legacy_archive(
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
destination_path=legacy_state.new_state_path,
io_log_redirect_cb=io_log_redirect_cb,
progress_bar=progress_bar,
legacy_destination_path=legacy_state.old_state_path,
)
return

state_archive_exists = await _state_metadata_entry_exists(
user_id=user_id,
project_id=project_id,
Expand All @@ -248,7 +300,7 @@ async def pull(
is_archive=True,
)
if state_archive_exists:
with log_context(_logger, logging.INFO, "restoring legacy data archive"):
with log_context(_logger, logging.INFO, "restoring data from legacy archive"):
await _pull_legacy_archive(
user_id=user_id,
project_id=project_id,
Expand Down
15 changes: 13 additions & 2 deletions packages/simcore-sdk/tests/unit/test_node_data_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ async def test_push_file(
mock_filemanager.reset_mock()


@pytest.mark.parametrize("create_legacy_archive", [False, True])
async def test_pull_legacy_archive(
user_id: int,
project_id: ProjectID,
Expand All @@ -181,6 +182,7 @@ async def test_pull_legacy_archive(
create_files: Callable[..., list[Path]],
mock_io_log_redirect_cb: LogRedirectCB,
faker: Faker,
create_legacy_archive: bool,
):
assert tmpdir.exists()
# create a folder to compress from
Expand All @@ -200,7 +202,13 @@ async def test_pull_legacy_archive(
create_files(files_number, test_control_folder)
compressed_file_name = test_compression_folder / test_folder.stem
archive_file = make_archive(
f"{compressed_file_name}", "zip", root_dir=test_control_folder
(
f"{compressed_file_name}_legacy"
if create_legacy_archive
else f"{compressed_file_name}"
),
"zip",
root_dir=test_control_folder,
)
assert Path(archive_file).exists()
# create mock downloaded folder
Expand Down Expand Up @@ -229,13 +237,16 @@ async def test_pull_legacy_archive(
test_folder,
io_log_redirect_cb=mock_io_log_redirect_cb,
progress_bar=progress_bar,
legacy_destination_path=(
Path(f"{test_folder}_legacy") if create_legacy_archive else None
),
)
assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001
mock_temporary_directory.assert_called_once()
mock_filemanager.download_path_from_s3.assert_called_once_with(
user_id=user_id,
local_path=test_compression_folder,
s3_object=f"{project_id}/{node_uuid}/{test_folder.stem}.zip",
s3_object=f"{project_id}/{node_uuid}/{f'{test_folder.stem}_legacy' if create_legacy_archive else test_folder.stem}.zip",
store_id=SIMCORE_LOCATION,
store_name=None,
io_log_redirect_cb=mock_io_log_redirect_cb,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ def _get_environment_variables(
"DY_SIDECAR_USER_SERVICES_HAVE_INTERNET_ACCESS": f"{allow_internet_access}",
"DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE": f"{telemetry_enabled}",
"DY_SIDECAR_STATE_EXCLUDE": json_dumps(f"{x}" for x in state_exclude),
"DY_SIDECAR_LEGACY_STATE": (
"null"
if scheduler_data.paths_mapping.legacy_state is None
else scheduler_data.paths_mapping.legacy_state.model_dump_json()
),
"DY_SIDECAR_CALLBACKS_MAPPING": callbacks_mapping.model_dump_json(),
"DY_SIDECAR_STATE_PATHS": json_dumps(
f"{x}" for x in scheduler_data.paths_mapping.state_paths
Expand Down Expand Up @@ -451,18 +456,18 @@ async def get_dynamic_sidecar_spec( # pylint:disable=too-many-arguments# noqa:
scheduler_data.product_name is not None
), "ONLY for legacy. This function should not be called with product_name==None" # nosec

standard_simcore_docker_labels: dict[
DockerLabelKey, str
] = StandardSimcoreDockerLabels(
user_id=scheduler_data.user_id,
project_id=scheduler_data.project_id,
node_id=scheduler_data.node_uuid,
product_name=scheduler_data.product_name,
simcore_user_agent=scheduler_data.request_simcore_user_agent,
swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME,
memory_limit=ByteSize(0), # this should get overwritten
cpu_limit=0, # this should get overwritten
).to_simcore_runtime_docker_labels()
standard_simcore_docker_labels: dict[DockerLabelKey, str] = (
StandardSimcoreDockerLabels(
user_id=scheduler_data.user_id,
project_id=scheduler_data.project_id,
node_id=scheduler_data.node_uuid,
product_name=scheduler_data.product_name,
simcore_user_agent=scheduler_data.request_simcore_user_agent,
swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME,
memory_limit=ByteSize(0), # this should get overwritten
cpu_limit=0, # this should get overwritten
).to_simcore_runtime_docker_labels()
)

service_labels: dict[str, str] = (
{
Expand Down Expand Up @@ -494,9 +499,7 @@ async def get_dynamic_sidecar_spec( # pylint:disable=too-many-arguments# noqa:
)
)

placement_substitutions: dict[
str, DockerPlacementConstraint
] = (
placement_substitutions: dict[str, DockerPlacementConstraint] = (
placement_settings.DIRECTOR_V2_GENERIC_RESOURCE_PLACEMENT_CONSTRAINTS_SUBSTITUTIONS
)
for image_resources in scheduler_data.service_resources.values():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"DY_DOCKER_HUB_REGISTRY_SETTINGS",
"DY_SIDECAR_AWS_S3_CLI_SETTINGS",
"DY_SIDECAR_CALLBACKS_MAPPING",
"DY_SIDECAR_LEGACY_STATE",
"DY_SIDECAR_LOG_FORMAT_LOCAL_DEV_ENABLED",
"DY_SIDECAR_NODE_ID",
"DY_SIDECAR_PATH_INPUTS",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ def expected_dynamic_sidecar_spec(
"DY_SIDECAR_SERVICE_VERSION": "2.4.5",
"DY_SIDECAR_PRODUCT_NAME": osparc_product_name,
"DY_SIDECAR_USER_PREFERENCES_PATH": "None",
"DY_SIDECAR_LEGACY_STATE": "null",
"DY_SIDECAR_LOG_FORMAT_LOCAL_DEV_ENABLED": "True",
"POSTGRES_DB": "test",
"POSTGRES_HOST": "localhost",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from models_library.products import ProductName
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.service_settings_labels import LegacyState
from models_library.services import DynamicServiceKey, ServiceRunID, ServiceVersion
from models_library.users import UserID
from pydantic import (
Expand Down Expand Up @@ -141,6 +142,10 @@ class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
DY_SIDECAR_STATE_EXCLUDE: set[str] = Field(
..., description="list of patterns to exclude files when saving states"
)
DY_SIDECAR_LEGACY_STATE: LegacyState | None = Field(
default=None, description="used to recover state when upgrading service"
)

DY_SIDECAR_LOG_FORMAT_LOCAL_DEV_ENABLED: bool = Field(
default=False,
validation_alias=AliasChoices(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
)
from models_library.generated_models.docker_rest_api import ContainerState
from models_library.rabbitmq_messages import ProgressType, SimcorePlatformStatus
from models_library.service_settings_labels import LegacyState
from pydantic import PositiveInt
from servicelib.file_utils import log_directory_changes
from servicelib.logging_utils import log_context
Expand Down Expand Up @@ -337,6 +338,19 @@ def _get_satate_folders_size(paths: list[Path]) -> int:
return total_size


def _get_legacy_state_with_dy_volumes_path(
    settings: ApplicationSettings,
) -> LegacyState | None:
    """Return the configured legacy state with both paths rebased under the
    dy-volumes mount dir, or ``None`` when no legacy state is configured."""
    configured = settings.DY_SIDECAR_LEGACY_STATE
    if configured is None:
        return None

    def _rebase(path: Path) -> Path:
        # drop the leading "/" so the absolute path nests inside the mount dir
        return settings.DYNAMIC_SIDECAR_DY_VOLUMES_MOUNT_DIR / path.relative_to("/")

    return LegacyState(
        old_state_path=_rebase(configured.old_state_path),
        new_state_path=_rebase(configured.new_state_path),
    )


async def _restore_state_folder(
app: FastAPI,
*,
Expand All @@ -348,13 +362,14 @@ async def _restore_state_folder(
user_id=settings.DY_SIDECAR_USER_ID,
project_id=settings.DY_SIDECAR_PROJECT_ID,
node_uuid=settings.DY_SIDECAR_NODE_ID,
destination_path=state_path,
destination_path=Path(state_path),
io_log_redirect_cb=functools.partial(
post_sidecar_log_message, app, log_level=logging.INFO
),
r_clone_settings=settings.DY_SIDECAR_R_CLONE_SETTINGS,
progress_bar=progress_bar,
aws_s3_cli_settings=settings.DY_SIDECAR_AWS_S3_CLI_SETTINGS,
legacy_state=_get_legacy_state_with_dy_volumes_path(settings),
)


Expand Down Expand Up @@ -429,6 +444,7 @@ async def _save_state_folder(
),
progress_bar=progress_bar,
aws_s3_cli_settings=settings.DY_SIDECAR_AWS_S3_CLI_SETTINGS,
legacy_state=_get_legacy_state_with_dy_volumes_path(settings),
)


Expand Down
Loading