Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
220b7b6
extracting notifications as seprate module
Sep 19, 2024
b419e64
refactor internals
Sep 19, 2024
71678d6
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 19, 2024
d1c838e
rename module
Sep 19, 2024
fc88f8d
extend notifications interface
Sep 19, 2024
6bd6d15
added notifications tests
Sep 19, 2024
249dda7
refactor to work as expected
Sep 19, 2024
0c2153c
added missing project_id to notification
Sep 19, 2024
4e22b75
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 19, 2024
4ed8e2a
adding missing port notifier to constructor
Sep 19, 2024
d9950fa
refactor broken tests
Sep 19, 2024
796e1b5
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 20, 2024
9973d68
fixed tests
Sep 20, 2024
9f2854f
using str auto enum
Sep 20, 2024
d0a7698
fixed relative imports
Sep 23, 2024
c4fe846
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 23, 2024
525bce1
refactor using function
Sep 23, 2024
bf7408a
using limited_gather
Sep 23, 2024
0085f4b
outputs_callback is now totally optional
Sep 23, 2024
05ed52e
added tests
Sep 23, 2024
5d7d1a3
refactor tests
Sep 23, 2024
cb2662c
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 23, 2024
4136cde
fix flaky test
Sep 23, 2024
ab3ace9
pylint
Sep 23, 2024
d4d5fd9
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 24, 2024
82b4701
revert changes
Sep 24, 2024
8cb8520
Merge branch 'master' into pr-osparc-port-change-notifications
GitHK Sep 24, 2024
4c32ec0
restore flaky marker
Sep 24, 2024
12e72b7
Merge branch 'pr-osparc-port-change-notifications' of github.com:GitH…
Sep 24, 2024
78771ae
Merge branch 'master' into pr-osparc-port-change-notifications
GitHK Sep 24, 2024
886b151
Merge branch 'master' into pr-osparc-port-change-notifications
GitHK Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from enum import auto

from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.utils.enums import StrAutoEnum
from pydantic import BaseModel


class OutputStatus(StrAutoEnum):
UPLOAD_STARTED = auto()
UPLOAD_WAS_ABORTED = auto()
UPLOAD_FINISHED_SUCCESSFULLY = auto()
UPLOAD_FINISHED_WITH_ERRROR = auto()


class InputStatus(StrAutoEnum):
DOWNLOAD_STARTED = auto()
DOWNLOAD_WAS_ABORTED = auto()
DOWNLOAD_FINISHED_SUCCESSFULLY = auto()
DOWNLOAD_FINISHED_WITH_ERRROR = auto()


class _PortStatusCommon(BaseModel):
project_id: ProjectID
node_id: NodeID
port_key: ServicePortKey


class OutputPortStatus(_PortStatusCommon):
status: OutputStatus


class InputPortSatus(_PortStatusCommon):
status: InputStatus
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Final

SOCKET_IO_SERVICE_DISK_USAGE_EVENT: Final[str] = "serviceDiskUsage"
SOCKET_IO_STATE_OUTPUT_PORTS_EVENT: Final[str] = "stateOutputPorts"
SOCKET_IO_STATE_INPUT_PORTS_EVENT: Final[str] = "stateInputPorts"
63 changes: 48 additions & 15 deletions packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
from abc import ABC, abstractmethod
from asyncio import CancelledError
from collections.abc import Callable, Coroutine
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -27,6 +29,20 @@
log = logging.getLogger(__name__)


class OutputsCallbacks(ABC):
@abstractmethod
async def aborted(self, key: ServicePortKey) -> None:
pass

@abstractmethod
async def finished_succesfully(self, key: ServicePortKey) -> None:
pass

@abstractmethod
async def finished_with_error(self, key: ServicePortKey) -> None:
pass


class Nodeports(BaseModel):
"""
Represents a node in a project and all its input/output ports
Expand Down Expand Up @@ -148,6 +164,7 @@ async def set_multiple(
],
*,
progress_bar: ProgressBarData,
outputs_callbacks: OutputsCallbacks | None,
) -> None:
"""
Sets the provided values to the respective input or output ports
Expand All @@ -156,26 +173,42 @@ async def set_multiple(

raises ValidationError
"""

async def _set_with_notifications(
port_key: ServicePortKey,
value: ItemConcreteValue | None,
set_kwargs: SetKWargs | None,
sub_progress: ProgressBarData,
) -> None:
assert outputs_callbacks is not None # nosec
try:
# pylint: disable=protected-access
await self.internal_outputs[port_key]._set( # noqa: SLF001
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
await outputs_callbacks.finished_succesfully(port_key)
except UnboundPortError:
# not available try inputs
# if this fails it will raise another exception
# pylint: disable=protected-access
await self.internal_inputs[port_key]._set( # noqa: SLF001
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
except CancelledError:
await outputs_callbacks.aborted(port_key)
raise
except Exception:
await outputs_callbacks.finished_with_error(port_key)
raise

tasks = []
async with progress_bar.sub_progress(
steps=len(port_values.items()), description=IDStr("set multiple")
) as sub_progress:
for port_key, (value, set_kwargs) in port_values.items():
# pylint: disable=protected-access
try:
tasks.append(
self.internal_outputs[port_key]._set(
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
)
except UnboundPortError:
# not available try inputs
# if this fails it will raise another exception
tasks.append(
self.internal_inputs[port_key]._set(
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
)
tasks.append(
_set_with_notifications(port_key, value, set_kwargs, sub_progress)
)

results = await logged_gather(*tasks)
await self.save_to_db_cb(self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from collections.abc import Awaitable, Callable, Iterable
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock
from uuid import uuid4

import np_helpers
Expand Down Expand Up @@ -777,6 +778,7 @@ async def test_batch_update_inputs_outputs(
for k, port in enumerate((await PORTS.outputs).values())
},
progress_bar=progress_bar,
outputs_callbacks=AsyncMock(),
)
# pylint: disable=protected-access
assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001
Expand All @@ -786,6 +788,7 @@ async def test_batch_update_inputs_outputs(
for k, port in enumerate((await PORTS.inputs).values(), start=1000)
},
progress_bar=progress_bar,
outputs_callbacks=AsyncMock(),
)
assert progress_bar._current_steps == pytest.approx(2) # noqa: SLF001

Expand All @@ -807,4 +810,5 @@ async def test_batch_update_inputs_outputs(
await PORTS.set_multiple(
{ServicePortKey("missing_key_in_both"): (123132, None)},
progress_bar=progress_bar,
outputs_callbacks=AsyncMock(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from pathlib import Path
from typing import Any, Callable
from unittest.mock import AsyncMock

import pytest
from faker import Faker
Expand Down Expand Up @@ -138,6 +139,7 @@ async def mock_node_port_creator_cb(*args, **kwargs):
+ list(original_outputs.values())
},
progress_bar=progress_bar,
outputs_callbacks=AsyncMock(),
)
assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ async def ports_inputs_pull_task(
request: Request,
tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
app: Annotated[FastAPI, Depends(get_application)],
settings: Annotated[ApplicationSettings, Depends(get_settings)],
mounted_volumes: Annotated[MountedVolumes, Depends(get_mounted_volumes)],
inputs_state: Annotated[InputsState, Depends(get_inputs_state)],
port_keys: list[str] | None = None,
Expand All @@ -223,6 +224,7 @@ async def ports_inputs_pull_task(
port_keys=port_keys,
mounted_volumes=mounted_volumes,
app=app,
settings=settings,
inputs_pulling_enabled=inputs_state.inputs_pulling_enabled,
)
except TaskAlreadyRunningError as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..modules.attribute_monitor import setup_attribute_monitor
from ..modules.inputs import setup_inputs
from ..modules.mounted_fs import MountedVolumes, setup_mounted_fs
from ..modules.notifications import setup_notifications
from ..modules.outputs import setup_outputs
from ..modules.prometheus_metrics import setup_prometheus_metrics
from ..modules.resource_tracking import setup_resource_tracking
Expand Down Expand Up @@ -172,6 +173,7 @@ def create_app():
setup_rabbitmq(app)
setup_background_log_fetcher(app)
setup_resource_tracking(app)
setup_notifications(app)
setup_system_monitor(app)

setup_mounted_fs(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
from servicelib.progress_bar import ProgressBarData
from servicelib.utils import logged_gather
from simcore_sdk.node_data import data_manager
from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import (
PortNotifier,
)
from tenacity import retry
from tenacity.before_sleep import before_sleep_log
from tenacity.retry import retry_if_result
Expand Down Expand Up @@ -472,6 +475,7 @@ async def task_ports_inputs_pull(
port_keys: list[str] | None,
mounted_volumes: MountedVolumes,
app: FastAPI,
settings: ApplicationSettings,
*,
inputs_pulling_enabled: bool,
) -> int:
Expand Down Expand Up @@ -505,6 +509,12 @@ async def task_ports_inputs_pull(
post_sidecar_log_message, app, log_level=logging.INFO
),
progress_bar=root_progress,
port_notifier=PortNotifier(
app,
settings.DY_SIDECAR_USER_ID,
settings.DY_SIDECAR_PROJECT_ID,
settings.DY_SIDECAR_NODE_ID,
),
)
await post_sidecar_log_message(
app, "Finished pulling inputs", log_level=logging.INFO
Expand Down Expand Up @@ -541,6 +551,7 @@ async def task_ports_outputs_pull(
post_sidecar_log_message, app, log_level=logging.INFO
),
progress_bar=root_progress,
port_notifier=None,
)
await post_sidecar_log_message(
app, "Finished pulling outputs", log_level=logging.INFO
Expand Down
Loading
Loading