@@ -167,6 +167,11 @@ def ensure_backwards_compatible_setting_type(cls, v):
SimcoreServiceSettingsLabel = ListModel[SimcoreServiceSettingLabelEntry]


class LegacyState(BaseModel):
old_state_path: Path
new_state_path: Path


class PathMappingsLabel(BaseModel):
"""Content of "simcore.service.paths-mapping" label"""

@@ -195,6 +200,29 @@ class PathMappingsLabel(BaseModel):
),
)

legacy_state: LegacyState | None = Field(
None,
description=(
"if present, the service needs to first try to download the legacy state"
"coming from a different path."
),
)

@field_validator("legacy_state")
@classmethod
def validate_legacy_state(
cls, v: LegacyState | None, info: ValidationInfo
) -> LegacyState | None:
if v is None:
return v

state_paths: list[Path] = info.data.get("state_paths", [])
if v.new_state_path not in state_paths:
msg = f"legacy_state={v} not found in state_paths={state_paths}"
raise ValueError(msg)

return v

@field_validator("volume_size_limits")
@classmethod
def validate_volume_limits(cls, v, info: ValidationInfo) -> str | None:
@@ -252,6 +280,16 @@ def validate_volume_limits(cls, v, info: ValidationInfo) -> str | None:
"/t_inp": "1EIB",
},
},
{
"outputs_path": "/tmp/outputs", # noqa: S108 nosec
"inputs_path": "/tmp/inputs", # noqa: S108 nosec
"state_paths": ["/tmp/save_1", "/tmp_save_2"], # noqa: S108 nosec
"state_exclude": ["/tmp/strip_me/*"], # noqa: S108 nosec
"legacy_state": {
"old_state_path": "/tmp/save_1_legacy", # noqa: S108 nosec
"new_state_path": "/tmp/save_1", # noqa: S108 nosec
},
},
]
},
)
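The new `legacy_state` field is checked against `state_paths` at parse time, so a mislabeled service fails fast instead of silently skipping the legacy download. A minimal sketch of both outcomes, assuming the model is importable as `models_library.service_settings_labels.PathMappingsLabel` (the import path used later in this diff) and reusing the illustrative `/tmp` paths from the example above:

```python
from models_library.service_settings_labels import PathMappingsLabel
from pydantic import ValidationError

# Accepted: new_state_path is one of the declared state_paths
label = PathMappingsLabel.model_validate(
    {
        "outputs_path": "/tmp/outputs",
        "inputs_path": "/tmp/inputs",
        "state_paths": ["/tmp/save_1"],
        "legacy_state": {
            "old_state_path": "/tmp/save_1_legacy",
            "new_state_path": "/tmp/save_1",
        },
    }
)
assert label.legacy_state is not None

# Rejected: new_state_path not listed in state_paths
try:
    PathMappingsLabel.model_validate(
        {
            "outputs_path": "/tmp/outputs",
            "inputs_path": "/tmp/inputs",
            "state_paths": [],
            "legacy_state": {
                "old_state_path": "/tmp/save_1_legacy",
                "new_state_path": "/tmp/save_1",
            },
        }
    )
except ValidationError:
    pass  # "legacy_state=... not found in state_paths=[]"
```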
72 changes: 62 additions & 10 deletions packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py
@@ -4,6 +4,7 @@

from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID, StorageFileID
from models_library.service_settings_labels import LegacyState
from models_library.users import UserID
from pydantic import TypeAdapter
from servicelib.archiving_utils import unarchive_dir
@@ -101,14 +102,16 @@ async def _pull_legacy_archive(
*,
io_log_redirect_cb: LogRedirectCB,
progress_bar: ProgressBarData,
legacy_destination_path: Path | None = None,
) -> None:
# NOTE: the legacy way of storing states was as zip archives
archive_path = legacy_destination_path or destination_path
async with progress_bar.sub_progress(
steps=2, description=f"pulling {destination_path.name}"
steps=2, description=f"pulling {archive_path.name}"
) as sub_prog:
with TemporaryDirectory() as tmp_dir_name:
archive_file = Path(tmp_dir_name) / __get_s3_name(
destination_path, is_archive=True
archive_path, is_archive=True
)

s3_object = __create_s3_object_key(project_id, node_uuid, archive_file)
@@ -124,7 +127,7 @@ async def _pull_legacy_archive(
progress_bar=sub_prog,
aws_s3_cli_settings=None,
)
_logger.info("completed pull of %s.", destination_path)
_logger.info("completed pull of %s.", archive_path)

if io_log_redirect_cb:
await io_log_redirect_cb(
@@ -194,6 +197,7 @@ async def push(
exclude_patterns: set[str] | None = None,
progress_bar: ProgressBarData,
aws_s3_cli_settings: AwsS3CliSettings | None,
legacy_state: LegacyState | None,
) -> None:
"""pushes and removes the legacy archive if present"""

@@ -208,23 +212,39 @@
progress_bar=progress_bar,
aws_s3_cli_settings=aws_s3_cli_settings,
)

archive_exists = await _state_metadata_entry_exists(
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
path=source_path,
is_archive=True,
)
if archive_exists:
with log_context(_logger, logging.INFO, "removing legacy archive"):
await _delete_legacy_archive(
project_id=project_id,
node_uuid=node_uuid,
path=source_path,
)

if not archive_exists:
return

with log_context(_logger, logging.INFO, "removing legacy data archive"):
await _delete_legacy_archive(
if legacy_state:
legacy_archive_exists = await _state_metadata_entry_exists(
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
path=source_path,
path=legacy_state.old_state_path,
is_archive=True,
)
if legacy_archive_exists:
with log_context(
_logger, logging.INFO, f"removing legacy archive in {legacy_state}"
):
await _delete_legacy_archive(
project_id=project_id,
node_uuid=node_uuid,
path=legacy_state.old_state_path,
)
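
In short, `push` now performs two cleanup passes: the legacy zip stored under the current state path, and, when `legacy_state` is set, the zip left under the pre-upgrade path. A condensed sketch of that ordering, with hypothetical callables standing in for `_state_metadata_entry_exists` and `_delete_legacy_archive` (the real calls also take user/project/node identifiers, as above):

```python
from pathlib import Path
from typing import Awaitable, Callable

async def cleanup_archives_after_push(
    source_path: Path,
    legacy_state,  # LegacyState | None
    entry_exists: Callable[..., Awaitable[bool]],
    delete_archive: Callable[..., Awaitable[None]],
) -> None:
    # 1) drop the legacy zip that shadows the current state path, if any
    if await entry_exists(path=source_path, is_archive=True):
        await delete_archive(path=source_path)
    # 2) for an upgraded service, also drop the zip under the old location
    if legacy_state and await entry_exists(
        path=legacy_state.old_state_path, is_archive=True
    ):
        await delete_archive(path=legacy_state.old_state_path)
```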


async def pull(
@@ -237,9 +257,41 @@
r_clone_settings: RCloneSettings,
progress_bar: ProgressBarData,
aws_s3_cli_settings: AwsS3CliSettings | None,
legacy_state: LegacyState | None,
) -> None:
"""restores the state folder"""

if legacy_state and legacy_state.new_state_path == destination_path:
_logger.info(
"trying to restore from legacy_state=%s, destination_path=%s",
legacy_state,
destination_path,
)
legacy_state_exists = await _state_metadata_entry_exists(
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
path=legacy_state.old_state_path,
is_archive=True,
)
_logger.info("legacy_state_exists=%s", legacy_state_exists)
if legacy_state_exists:
with log_context(
_logger,
logging.INFO,
f"restoring data from legacy archive in {legacy_state}",
):
await _pull_legacy_archive(
user_id=user_id,
project_id=project_id,
node_uuid=node_uuid,
destination_path=legacy_state.new_state_path,
io_log_redirect_cb=io_log_redirect_cb,
progress_bar=progress_bar,
legacy_destination_path=legacy_state.old_state_path,
)
return

state_archive_exists = await _state_metadata_entry_exists(
user_id=user_id,
project_id=project_id,
@@ -248,7 +300,7 @@
is_archive=True,
)
if state_archive_exists:
with log_context(_logger, logging.INFO, "restoring legacy data archive"):
with log_context(_logger, logging.INFO, "restoring data from legacy archive"):
await _pull_legacy_archive(
user_id=user_id,
project_id=project_id,
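Taken together, `pull` now restores from the first available of three sources: the legacy archive under `old_state_path` (only when `legacy_state.new_state_path` matches the requested destination), then the zip stored under the destination path itself, and otherwise the directory-based state handled by the code past this excerpt. A small sketch of that precedence, with booleans standing in for the `_state_metadata_entry_exists` lookups:

```python
from pathlib import Path

def pick_restore_source(
    destination_path: Path,
    legacy_state,  # LegacyState | None
    legacy_archive_exists: bool,
    state_archive_exists: bool,
) -> str:
    if (
        legacy_state is not None
        and legacy_state.new_state_path == destination_path
        and legacy_archive_exists
    ):
        # the zip under old_state_path is unpacked into new_state_path
        return "legacy_state archive"
    if state_archive_exists:
        return "zip archive at destination_path"
    return "directory state"

assert (
    pick_restore_source(Path("/tmp/save_1"), None, False, True)
    == "zip archive at destination_path"
)
```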
15 changes: 13 additions & 2 deletions packages/simcore-sdk/tests/unit/test_node_data_data_manager.py
@@ -172,6 +172,7 @@ async def test_push_file(
mock_filemanager.reset_mock()


@pytest.mark.parametrize("create_legacy_archive", [False, True])
async def test_pull_legacy_archive(
user_id: int,
project_id: ProjectID,
@@ -181,6 +182,7 @@ async def test_pull_legacy_archive(
create_files: Callable[..., list[Path]],
mock_io_log_redirect_cb: LogRedirectCB,
faker: Faker,
create_legacy_archive: bool,
):
assert tmpdir.exists()
# create a folder to compress from
@@ -200,7 +202,13 @@
create_files(files_number, test_control_folder)
compressed_file_name = test_compression_folder / test_folder.stem
archive_file = make_archive(
f"{compressed_file_name}", "zip", root_dir=test_control_folder
(
f"{compressed_file_name}_legacy"
if create_legacy_archive
else f"{compressed_file_name}"
),
"zip",
root_dir=test_control_folder,
)
assert Path(archive_file).exists()
# create mock downloaded folder
@@ -229,13 +237,16 @@
test_folder,
io_log_redirect_cb=mock_io_log_redirect_cb,
progress_bar=progress_bar,
legacy_destination_path=(
Path(f"{test_folder}_legacy") if create_legacy_archive else None
),
)
assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001
mock_temporary_directory.assert_called_once()
mock_filemanager.download_path_from_s3.assert_called_once_with(
user_id=user_id,
local_path=test_compression_folder,
s3_object=f"{project_id}/{node_uuid}/{test_folder.stem}.zip",
s3_object=f"{project_id}/{node_uuid}/{f'{test_folder.stem}_legacy' if create_legacy_archive else test_folder.stem}.zip",
store_id=SIMCORE_LOCATION,
store_name=None,
io_log_redirect_cb=mock_io_log_redirect_cb,
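The `s3_object` assertion above pins down the key layout: archives are stored under the archive path's stem, so passing `legacy_destination_path` changes the key from `save_1.zip` to `save_1_legacy.zip`. A tiny reconstruction of the rule (the real `__get_s3_name` helper is private; this only mirrors the asserted f-string):

```python
from pathlib import Path

def s3_archive_key(project_id: str, node_uuid: str, archive_path: Path) -> str:
    # "{project_id}/{node_uuid}/{stem}.zip", as asserted in the test above
    return f"{project_id}/{node_uuid}/{archive_path.stem}.zip"

assert s3_archive_key("proj", "node", Path("/tmp/save_1")) == "proj/node/save_1.zip"
assert (
    s3_archive_key("proj", "node", Path("/tmp/save_1_legacy"))
    == "proj/node/save_1_legacy.zip"
)
```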
31 changes: 31 additions & 0 deletions services/director-v2/openapi.json
@@ -2291,6 +2291,26 @@
}
}
},
"LegacyState": {
"properties": {
"old_state_path": {
"type": "string",
"format": "path",
"title": "Old State Path"
},
"new_state_path": {
"type": "string",
"format": "path",
"title": "New State Path"
}
},
"type": "object",
"required": [
"old_state_path",
"new_state_path"
],
"title": "LegacyState"
},
"NATRule": {
"properties": {
"hostname": {
@@ -2449,6 +2469,17 @@
],
"title": "Volume Size Limits",
"description": "Apply volume size limits to entries in: `inputs_path`, `outputs_path` and `state_paths`. Limits must be parsable by Pydantic's ByteSize."
},
"legacy_state": {
"anyOf": [
{
"$ref": "#/components/schemas/LegacyState"
},
{
"type": "null"
}
],
"description": "if present, the service needs to first try to download the legacy statecoming from a different path."
}
},
"additionalProperties": false,
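The `LegacyState` entry above matches what pydantic v2 emits for a model with two required `pathlib.Path` fields (rendered as `{"type": "string", "format": "path"}`). A quick way to reproduce the schema, assuming the same import path as elsewhere in this PR:

```python
import json

from models_library.service_settings_labels import LegacyState

# prints the schema shown in openapi.json above
print(json.dumps(LegacyState.model_json_schema(), indent=2))
```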
@@ -134,6 +134,11 @@ def _get_environment_variables(
"DY_SIDECAR_USER_SERVICES_HAVE_INTERNET_ACCESS": f"{allow_internet_access}",
"DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE": f"{telemetry_enabled}",
"DY_SIDECAR_STATE_EXCLUDE": json_dumps(f"{x}" for x in state_exclude),
"DY_SIDECAR_LEGACY_STATE": (
"null"
if scheduler_data.paths_mapping.legacy_state is None
else scheduler_data.paths_mapping.legacy_state.model_dump_json()
),
"DY_SIDECAR_CALLBACKS_MAPPING": callbacks_mapping.model_dump_json(),
"DY_SIDECAR_STATE_PATHS": json_dumps(
f"{x}" for x in scheduler_data.paths_mapping.state_paths
@@ -451,18 +456,18 @@ async def get_dynamic_sidecar_spec( # pylint:disable=too-many-arguments# noqa:
scheduler_data.product_name is not None
), "ONLY for legacy. This function should not be called with product_name==None" # nosec

standard_simcore_docker_labels: dict[
DockerLabelKey, str
] = StandardSimcoreDockerLabels(
user_id=scheduler_data.user_id,
project_id=scheduler_data.project_id,
node_id=scheduler_data.node_uuid,
product_name=scheduler_data.product_name,
simcore_user_agent=scheduler_data.request_simcore_user_agent,
swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME,
memory_limit=ByteSize(0), # this should get overwritten
cpu_limit=0, # this should get overwritten
).to_simcore_runtime_docker_labels()
standard_simcore_docker_labels: dict[DockerLabelKey, str] = (
StandardSimcoreDockerLabels(
user_id=scheduler_data.user_id,
project_id=scheduler_data.project_id,
node_id=scheduler_data.node_uuid,
product_name=scheduler_data.product_name,
simcore_user_agent=scheduler_data.request_simcore_user_agent,
swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME,
memory_limit=ByteSize(0), # this should get overwritten
cpu_limit=0, # this should get overwritten
).to_simcore_runtime_docker_labels()
)

service_labels: dict[str, str] = (
{
@@ -494,9 +499,7 @@ async def get_dynamic_sidecar_spec( # pylint:disable=too-many-arguments# noqa:
)
)

placement_substitutions: dict[
str, DockerPlacementConstraint
] = (
placement_substitutions: dict[str, DockerPlacementConstraint] = (
placement_settings.DIRECTOR_V2_GENERIC_RESOURCE_PLACEMENT_CONSTRAINTS_SUBSTITUTIONS
)
for image_resources in scheduler_data.service_resources.values():
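For the `DY_SIDECAR_LEGACY_STATE` entry added in `_get_environment_variables` above, the injected value is either the literal string `"null"` or the model's JSON dump. A sketch of both shapes, reusing the paths from the label example earlier in this diff:

```python
from pathlib import Path

from models_library.service_settings_labels import LegacyState

legacy_state = LegacyState(
    old_state_path=Path("/tmp/save_1_legacy"),
    new_state_path=Path("/tmp/save_1"),
)
env_value = legacy_state.model_dump_json()
# -> '{"old_state_path":"/tmp/save_1_legacy","new_state_path":"/tmp/save_1"}'

env_value_when_unset = "null"  # what the spec sets when the label has no legacy_state
```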
@@ -20,6 +20,7 @@
"DY_DOCKER_HUB_REGISTRY_SETTINGS",
"DY_SIDECAR_AWS_S3_CLI_SETTINGS",
"DY_SIDECAR_CALLBACKS_MAPPING",
"DY_SIDECAR_LEGACY_STATE",
"DY_SIDECAR_LOG_FORMAT_LOCAL_DEV_ENABLED",
"DY_SIDECAR_NODE_ID",
"DY_SIDECAR_PATH_INPUTS",
@@ -267,6 +267,7 @@ def expected_dynamic_sidecar_spec(
"DY_SIDECAR_SERVICE_VERSION": "2.4.5",
"DY_SIDECAR_PRODUCT_NAME": osparc_product_name,
"DY_SIDECAR_USER_PREFERENCES_PATH": "None",
"DY_SIDECAR_LEGACY_STATE": "null",
"DY_SIDECAR_LOG_FORMAT_LOCAL_DEV_ENABLED": "True",
"POSTGRES_DB": "test",
"POSTGRES_HOST": "localhost",
@@ -10,6 +10,7 @@
from models_library.products import ProductName
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.service_settings_labels import LegacyState
from models_library.services import DynamicServiceKey, ServiceRunID, ServiceVersion
from models_library.users import UserID
from pydantic import (
Expand Down Expand Up @@ -141,6 +142,10 @@ class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
DY_SIDECAR_STATE_EXCLUDE: set[str] = Field(
..., description="list of patterns to exclude files when saving states"
)
DY_SIDECAR_LEGACY_STATE: LegacyState | None = Field(
default=None, description="used to recover state when upgrading service"
)

DY_SIDECAR_LOG_FORMAT_LOCAL_DEV_ENABLED: bool = Field(
default=False,
validation_alias=AliasChoices(
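On the receiving end, `DY_SIDECAR_LEGACY_STATE` is declared as `LegacyState | None` with a `None` default, so the sidecar's settings machinery is expected to decode the JSON (or `"null"`) injected by director-v2. A hand-rolled equivalent of that round-trip, for illustration only:

```python
import os

from models_library.service_settings_labels import LegacyState

raw = os.environ.get("DY_SIDECAR_LEGACY_STATE", "null")
legacy_state: LegacyState | None = (
    None if raw == "null" else LegacyState.model_validate_json(raw)
)
```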