Skip to content

Commit a688579

Browse files
author
Andrei Neagu
committed
merge branch 'pr-osparc-long-running-refactor-4' of github.com:GitHK/osparc-simcore-forked into pr-osparc-long-running-refactor-4
2 parents 066db30 + ee599ab commit a688579

File tree

103 files changed

+2680
-490
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

103 files changed

+2680
-490
lines changed

.env-devel

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -395,6 +395,7 @@ WEBSERVER_PROJECTS={}
395395
WEBSERVER_PROMETHEUS_API_VERSION=v1
396396
WEBSERVER_PROMETHEUS_URL=http://prometheus:9090
397397
WEBSERVER_PUBLICATIONS=1
398+
WEBSERVER_REALTIME_COLLABORATION='{"RTC_MAX_NUMBER_OF_USERS":3}'
398399
WEBSERVER_SCICRUNCH={}
399400
WEBSERVER_SESSION_SECRET_KEY='REPLACE_ME_with_result__Fernet_generate_key='
400401
WEBSERVER_SOCKETIO=1

packages/models-library/src/models_library/api_schemas_directorv2/computations.py

Lines changed: 25 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@
1313

1414
from ..basic_types import IDStr
1515
from ..projects import ProjectID
16-
from ..projects_nodes_io import NodeID
16+
from ..projects_nodes_io import NodeID, SimcoreS3FileID
1717
from ..projects_pipeline import ComputationTask
1818
from ..users import UserID
1919
from ..wallets import WalletInfo
@@ -126,6 +126,30 @@ class TaskLogFileGet(BaseModel):
126126
] = None
127127

128128

129+
class TaskLogFileIdGet(BaseModel):
130+
task_id: NodeID
131+
file_id: SimcoreS3FileID | None
132+
133+
model_config = ConfigDict(
134+
json_schema_extra={
135+
"examples": [
136+
{
137+
"task_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
138+
"file_id": "1c46752c-b096-11ea-a3c4-02420a00392e/3fa85f64-5717-4562-b3fc-2c963f66afa6/logs/task_logs.txt",
139+
},
140+
{
141+
"task_id": "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
142+
"file_id": "1c46752c-b096-11ea-a3c4-02420a00392e/6ba7b810-9dad-11d1-80b4-00c04fd430c8/logs/debug.log",
143+
},
144+
{
145+
"task_id": "6ba7b811-9dad-11d1-80b4-00c04fd430c8",
146+
"file_id": None,
147+
},
148+
]
149+
}
150+
)
151+
152+
129153
class TasksSelection(BaseModel):
130154
nodes_ids: list[NodeID]
131155

packages/models-library/src/models_library/api_schemas_rpc_async_jobs/async_jobs.py

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,17 @@ class AsyncJobResult(BaseModel):
3030

3131

3232
class AsyncJobGet(BaseModel):
33+
model_config = ConfigDict(
34+
json_schema_extra={
35+
"examples": [
36+
{
37+
"job_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
38+
"job_name": "export_data_task",
39+
}
40+
]
41+
}
42+
)
43+
3344
job_id: AsyncJobId
3445
job_name: AsyncJobName
3546

@@ -42,6 +53,18 @@ class AsyncJobAbort(BaseModel):
4253
class AsyncJobFilter(AsyncJobFilterBase):
4354
"""Data for controlling access to an async job"""
4455

56+
model_config = ConfigDict(
57+
json_schema_extra={
58+
"examples": [
59+
{
60+
"product_name": "osparc",
61+
"user_id": 123,
62+
"client_name": "web_client",
63+
}
64+
]
65+
},
66+
)
67+
4568
product_name: ProductName
4669
user_id: UserID
4770
client_name: Annotated[

packages/postgres-database/docker/Dockerfile

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -55,11 +55,5 @@ COPY entrypoint.bash /home/entrypoint.bash
5555

5656
RUN chmod +x /home/entrypoint.bash
5757

58-
ENV POSTGRES_USER=scu \
59-
POSTGRES_PASSWORD=adminadmin \
60-
POSTGRES_HOST=postgres \
61-
POSTGRES_PORT=5432 \
62-
POSTGRES_DB=simcoredb
63-
6458
ENTRYPOINT [ "/bin/bash", "/home/entrypoint.bash" ]
6559
CMD [ "sc-pg", "upgrade" ]
Lines changed: 30 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,30 @@
1+
# pylint: disable=no-self-use
2+
# pylint: disable=not-context-manager
3+
# pylint: disable=protected-access
4+
# pylint: disable=redefined-outer-name
5+
# pylint: disable=unused-argument
6+
# pylint: disable=unused-variable
7+
8+
9+
from models_library.api_schemas_directorv2.computations import TaskLogFileIdGet
10+
from models_library.projects import ProjectID
11+
from pydantic import TypeAdapter, validate_call
12+
from pytest_mock import MockType
13+
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
14+
15+
16+
class DirectorV2SideEffects:
17+
# pylint: disable=no-self-use
18+
@validate_call(config={"arbitrary_types_allowed": True})
19+
async def get_computation_task_log_file_ids(
20+
self,
21+
rpc_client: RabbitMQRPCClient | MockType,
22+
*,
23+
project_id: ProjectID,
24+
) -> list[TaskLogFileIdGet]:
25+
assert rpc_client
26+
assert project_id
27+
28+
return TypeAdapter(list[TaskLogFileIdGet]).validate_python(
29+
TaskLogFileIdGet.model_json_schema()["examples"],
30+
)
Lines changed: 48 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,48 @@
1+
# pylint: disable=no-self-use
2+
# pylint: disable=not-context-manager
3+
# pylint: disable=protected-access
4+
# pylint: disable=redefined-outer-name
5+
# pylint: disable=unused-argument
6+
# pylint: disable=unused-variable
7+
8+
9+
from typing import Literal
10+
11+
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
12+
AsyncJobFilter,
13+
AsyncJobGet,
14+
)
15+
from models_library.api_schemas_webserver.storage import PathToExport
16+
from models_library.products import ProductName
17+
from models_library.users import UserID
18+
from pydantic import TypeAdapter, validate_call
19+
from pytest_mock import MockType
20+
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
21+
22+
23+
class StorageSideEffects:
24+
# pylint: disable=no-self-use
25+
@validate_call(config={"arbitrary_types_allowed": True})
26+
async def start_export_data(
27+
self,
28+
rabbitmq_rpc_client: RabbitMQRPCClient | MockType,
29+
*,
30+
user_id: UserID,
31+
product_name: ProductName,
32+
paths_to_export: list[PathToExport],
33+
export_as: Literal["path", "download_link"],
34+
) -> tuple[AsyncJobGet, AsyncJobFilter]:
35+
assert rabbitmq_rpc_client
36+
assert user_id
37+
assert product_name
38+
assert paths_to_export
39+
assert export_as
40+
41+
async_job_get = TypeAdapter(AsyncJobGet).validate_python(
42+
AsyncJobGet.model_json_schema()["examples"][0],
43+
)
44+
async_job_filter = TypeAdapter(AsyncJobFilter).validate_python(
45+
AsyncJobFilter.model_json_schema()["examples"][0],
46+
)
47+
48+
return async_job_get, async_job_filter

packages/service-library/src/servicelib/deferred_tasks/_deferred_manager.py

Lines changed: 25 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,13 @@
88

99
import arrow
1010
from faststream.exceptions import NackMessage, RejectMessage
11-
from faststream.rabbit import ExchangeType, RabbitBroker, RabbitExchange, RabbitRouter
11+
from faststream.rabbit import (
12+
ExchangeType,
13+
RabbitBroker,
14+
RabbitExchange,
15+
RabbitQueue,
16+
RabbitRouter,
17+
)
1218
from pydantic import NonNegativeInt
1319
from servicelib.logging_utils import log_catch, log_context
1420
from servicelib.redis import RedisClientSDK
@@ -149,10 +155,14 @@ def __init__(
149155
self._global_resources_prefix = f"{calling_module_name}"
150156

151157
self.common_exchange = RabbitExchange(
152-
f"{self._global_resources_prefix}_common", type=ExchangeType.DIRECT
158+
f"{self._global_resources_prefix}_common",
159+
durable=True,
160+
type=ExchangeType.DIRECT,
153161
)
154162
self.cancellation_exchange = RabbitExchange(
155-
f"{self._global_resources_prefix}_cancellation", type=ExchangeType.FANOUT
163+
f"{self._global_resources_prefix}_cancellation",
164+
durable=True,
165+
type=ExchangeType.FANOUT,
156166
)
157167

158168
def patch_based_deferred_handlers(self) -> None:
@@ -243,8 +253,10 @@ def un_patch_base_deferred_handlers(cls) -> None:
243253
subclass.is_present.original_is_present # type: ignore
244254
)
245255

246-
def _get_global_queue_name(self, queue_name: _FastStreamRabbitQueue) -> str:
247-
return f"{self._global_resources_prefix}_{queue_name}"
256+
def _get_global_queue(self, queue_name: _FastStreamRabbitQueue) -> RabbitQueue:
257+
return RabbitQueue(
258+
f"{self._global_resources_prefix}_{queue_name}", durable=True
259+
)
248260

249261
def __get_subclass(
250262
self, class_unique_reference: ClassUniqueReference
@@ -259,7 +271,7 @@ async def __publish_to_queue(
259271
) -> None:
260272
await self.broker.publish(
261273
task_uid,
262-
queue=self._get_global_queue_name(queue),
274+
queue=self._get_global_queue(queue),
263275
exchange=(
264276
self.cancellation_exchange
265277
if queue == _FastStreamRabbitQueue.MANUALLY_CANCELLED
@@ -569,47 +581,43 @@ def _register_subscribers(self) -> None:
569581
# pylint:disable=unexpected-keyword-arg
570582
# pylint:disable=no-value-for-parameter
571583
self._fs_handle_scheduled = self.router.subscriber(
572-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.SCHEDULED),
584+
queue=self._get_global_queue(_FastStreamRabbitQueue.SCHEDULED),
573585
exchange=self.common_exchange,
574586
retry=True,
575587
)(self._fs_handle_scheduled)
576588

577589
self._fs_handle_submit_task = self.router.subscriber(
578-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.SUBMIT_TASK),
590+
queue=self._get_global_queue(_FastStreamRabbitQueue.SUBMIT_TASK),
579591
exchange=self.common_exchange,
580592
retry=True,
581593
)(self._fs_handle_submit_task)
582594

583595
self._fs_handle_worker = self.router.subscriber(
584-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.WORKER),
596+
queue=self._get_global_queue(_FastStreamRabbitQueue.WORKER),
585597
exchange=self.common_exchange,
586598
retry=True,
587599
)(self._fs_handle_worker)
588600

589601
self._fs_handle_error_result = self.router.subscriber(
590-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.ERROR_RESULT),
602+
queue=self._get_global_queue(_FastStreamRabbitQueue.ERROR_RESULT),
591603
exchange=self.common_exchange,
592604
retry=True,
593605
)(self._fs_handle_error_result)
594606

595607
self._fs_handle_finished_with_error = self.router.subscriber(
596-
queue=self._get_global_queue_name(
597-
_FastStreamRabbitQueue.FINISHED_WITH_ERROR
598-
),
608+
queue=self._get_global_queue(_FastStreamRabbitQueue.FINISHED_WITH_ERROR),
599609
exchange=self.common_exchange,
600610
retry=True,
601611
)(self._fs_handle_finished_with_error)
602612

603613
self._fs_handle_deferred_result = self.router.subscriber(
604-
queue=self._get_global_queue_name(_FastStreamRabbitQueue.DEFERRED_RESULT),
614+
queue=self._get_global_queue(_FastStreamRabbitQueue.DEFERRED_RESULT),
605615
exchange=self.common_exchange,
606616
retry=True,
607617
)(self._fs_handle_deferred_result)
608618

609619
self._fs_handle_manually_cancelled = self.router.subscriber(
610-
queue=self._get_global_queue_name(
611-
_FastStreamRabbitQueue.MANUALLY_CANCELLED
612-
),
620+
queue=self._get_global_queue(_FastStreamRabbitQueue.MANUALLY_CANCELLED),
613621
exchange=self.cancellation_exchange,
614622
retry=True,
615623
)(self._fs_handle_manually_cancelled)

packages/service-library/src/servicelib/rabbitmq/_client_rpc.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,7 @@ async def _rpc_initialize(self) -> None:
4949
self._channel = await self._connection.channel()
5050

5151
self._rpc = aio_pika.patterns.RPC(self._channel)
52-
await self._rpc.initialize()
52+
await self._rpc.initialize(durable=True, auto_delete=True)
5353

5454
async def close(self) -> None:
5555
with log_context(
@@ -134,6 +134,7 @@ async def register_handler(
134134
RPCNamespacedMethodName.from_namespace_and_method(namespace, method_name),
135135
handler,
136136
auto_delete=True,
137+
durable=True,
137138
)
138139

139140
async def register_router(

packages/service-library/src/servicelib/rabbitmq/_utils.py

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,19 @@ async def declare_queue(
8585
# NOTE: setting a name will ensure multiple instance will take their data here
8686
queue_parameters |= {"name": queue_name}
8787

88+
# avoids deprecated `transient_nonexcl_queues` warning in RabbitMQ
89+
if (
90+
queue_parameters.get("durable", False) is False
91+
and queue_parameters.get("exclusive", False) is False
92+
):
93+
msg = (
94+
"Queue must be `durable` or `exclusive`, but not both. "
95+
"This is to avoid the `transient_nonexcl_queues` warning. "
96+
"NOTE: if both `durable` and `exclusive` are missing they are considered False. "
97+
f"{queue_parameters=}"
98+
)
99+
raise ValueError(msg)
100+
88101
# NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED``
89102
# most likely someone changed the signature of the queues (parameters etc...)
90103
# Safest way to deal with it:
Lines changed: 41 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,41 @@
1+
# pylint: disable=too-many-arguments
2+
import logging
3+
from typing import Final
4+
5+
from models_library.api_schemas_directorv2 import (
6+
DIRECTOR_V2_RPC_NAMESPACE,
7+
)
8+
from models_library.api_schemas_directorv2.computations import TaskLogFileIdGet
9+
from models_library.projects import ProjectID
10+
from models_library.rabbitmq_basic_types import RPCMethodName
11+
from pydantic import TypeAdapter
12+
13+
from ....logging_utils import log_decorator
14+
from ... import RabbitMQRPCClient
15+
16+
_logger = logging.getLogger(__name__)
17+
18+
19+
_RPC_METHOD_NAME_ADAPTER: TypeAdapter[RPCMethodName] = TypeAdapter(RPCMethodName)
20+
21+
_GET_COMPUTATION_TASK_LOG_FILE_IDS: Final[RPCMethodName] = (
22+
_RPC_METHOD_NAME_ADAPTER.validate_python("get_computation_task_log_file_ids")
23+
)
24+
25+
26+
@log_decorator(_logger, level=logging.DEBUG)
27+
async def get_computation_task_log_file_ids(
28+
rabbitmq_rpc_client: RabbitMQRPCClient, *, project_id: ProjectID
29+
) -> list[TaskLogFileIdGet]:
30+
"""
31+
Raises:
32+
ComputationalTaskMissingError
33+
"""
34+
result = await rabbitmq_rpc_client.request(
35+
DIRECTOR_V2_RPC_NAMESPACE,
36+
_GET_COMPUTATION_TASK_LOG_FILE_IDS,
37+
project_id=project_id,
38+
)
39+
assert isinstance(result, list) # nosec
40+
assert all(isinstance(item, TaskLogFileIdGet) for item in result) # nosec
41+
return result

0 commit comments

Comments
 (0)