Skip to content

Commit f301fbb

Browse files
daily work
1 parent 6372817 commit f301fbb

File tree

15 files changed

+344
-4
lines changed

15 files changed

+344
-4
lines changed

packages/models-library/src/models_library/rabbitmq_messages.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,23 @@ def routing_key(self) -> str | None:
194194
return None
195195

196196

197+
class DynamicServiceRunningMessage(RabbitMessageBase):
198+
channel_name: Literal["io.simcore.service.dynamic-service-running"] = Field(
199+
default="io.simcore.service.dynamic-service-running", const=True
200+
)
201+
202+
project_id: ProjectID
203+
node_id: NodeID
204+
product_name: ProductName | None
205+
created_at: datetime.datetime = Field(
206+
default_factory=lambda: arrow.utcnow().datetime,
207+
description="message creation datetime",
208+
)
209+
210+
def routing_key(self) -> str | None:
211+
return None
212+
213+
197214
class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage):
198215
message_type: RabbitResourceTrackingMessageType = Field(
199216
default=RabbitResourceTrackingMessageType.TRACKING_STARTED, const=True

packages/service-library/src/servicelib/rabbitmq/_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ async def subscribe(
151151
message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
152152
unexpected_error_retry_delay_s: float = _DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S,
153153
unexpected_error_max_attempts: int = _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS,
154+
queue_name: str | None = None,
154155
) -> str:
155156
"""subscribe to exchange_name calling ``message_handler`` for every incoming message
156157
- exclusive_queue: True means that every instance of this application will
@@ -216,6 +217,7 @@ async def subscribe(
216217
exclusive_queue=exclusive_queue,
217218
message_ttl=message_ttl,
218219
arguments={"x-dead-letter-exchange": delayed_exchange_name},
220+
queue_name=queue_name,
219221
)
220222
if topics is None:
221223
await queue.bind(exchange, routing_key="")
@@ -235,6 +237,7 @@ async def subscribe(
235237
exclusive_queue=exclusive_queue,
236238
message_ttl=int(unexpected_error_retry_delay_s * 1000),
237239
arguments={"x-dead-letter-exchange": exchange.name},
240+
queue_name=queue_name,
238241
)
239242
await delayed_queue.bind(delayed_exchange)
240243

packages/service-library/src/servicelib/rabbitmq/_utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ async def declare_queue(
7070
exclusive_queue: bool,
7171
arguments: dict[str, Any] | None = None,
7272
message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
73+
queue_name: str | None = None,
7374
) -> aio_pika.abc.AbstractRobustQueue:
7475
default_arguments = {"x-message-ttl": message_ttl}
7576
if arguments is not None:
@@ -82,7 +83,7 @@ async def declare_queue(
8283
}
8384
if not exclusive_queue:
8485
# NOTE: setting a name will ensure multiple instance will take their data here
85-
queue_parameters |= {"name": exchange_name}
86+
queue_parameters |= {"name": queue_name if queue_name else exchange_name}
8687

8788
# NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED``
8889
# most likely someone changed the signature of the queues (parameters etc...)

services/docker-compose.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,11 @@ services:
407407
RABBIT_PORT: ${RABBIT_PORT}
408408
RABBIT_SECURE: ${RABBIT_SECURE}
409409
RABBIT_USER: ${RABBIT_USER}
410+
REDIS_HOST: ${REDIS_HOST}
411+
REDIS_PASSWORD: ${REDIS_PASSWORD}
412+
REDIS_PORT: ${REDIS_PORT}
413+
REDIS_SECURE: ${REDIS_SECURE}
414+
REDIS_USER: ${REDIS_USER}
410415
SC_USER_ID: ${SC_USER_ID}
411416
SC_USER_NAME: ${SC_USER_NAME}
412417
EFS_USER_ID: ${EFS_USER_ID}

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from fastapi import FastAPI
66
from models_library.progress_bar import ProgressReport
77
from models_library.rabbitmq_messages import (
8+
DynamicServiceRunningMessage,
89
EventRabbitMessage,
910
LoggerRabbitMessage,
1011
ProgressRabbitMessageNode,
@@ -34,6 +35,12 @@ async def post_resource_tracking_message(
3435
await _post_rabbit_message(app, message)
3536

3637

38+
async def post_dynamic_service_running_message(
39+
app: FastAPI, message: DynamicServiceRunningMessage
40+
):
41+
await _post_rabbit_message(app, message)
42+
43+
3744
async def post_log_message(
3845
app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt
3946
) -> None:

services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import asyncio
12
import logging
23
from typing import Final
34

45
from fastapi import FastAPI
56
from models_library.generated_models.docker_rest_api import ContainerState
67
from models_library.rabbitmq_messages import (
8+
DynamicServiceRunningMessage,
79
RabbitResourceTrackingHeartbeatMessage,
810
RabbitResourceTrackingStartedMessage,
911
RabbitResourceTrackingStoppedMessage,
@@ -19,7 +21,10 @@
1921
are_all_containers_in_expected_states,
2022
get_container_states,
2123
)
22-
from ...core.rabbitmq import post_resource_tracking_message
24+
from ...core.rabbitmq import (
25+
post_dynamic_service_running_message,
26+
post_resource_tracking_message,
27+
)
2328
from ...core.settings import ApplicationSettings, ResourceTrackingSettings
2429
from ...models.shared_store import SharedStore
2530
from ._models import ResourceTrackingState
@@ -70,10 +75,20 @@ async def _heart_beat_task(app: FastAPI):
7075
)
7176

7277
if are_all_containers_in_expected_states(container_states.values()):
73-
message = RabbitResourceTrackingHeartbeatMessage(
78+
rut_message = RabbitResourceTrackingHeartbeatMessage(
7479
service_run_id=settings.DY_SIDECAR_RUN_ID
7580
)
76-
await post_resource_tracking_message(app, message)
81+
dyn_message = DynamicServiceRunningMessage(
82+
project_id=settings.DY_SIDECAR_PROJECT_ID,
83+
node_id=settings.DY_SIDECAR_NODE_ID,
84+
product_name=settings.DY_SIDECAR_PRODUCT_NAME,
85+
)
86+
await asyncio.gather(
87+
*[
88+
post_resource_tracking_message(app, rut_message),
89+
post_dynamic_service_running_message(app, dyn_message),
90+
]
91+
)
7792
else:
7893
_logger.info(
7994
"heart beat message skipped: container_states=%s", container_states

services/efs-guardian/src/simcore_service_efs_guardian/core/application.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313
)
1414
from ..api.rest.routes import setup_api_routes
1515
from ..api.rpc.routes import setup_rpc_routes
16+
from ..services.background_tasks_setup import setup as setup_background_tasks
1617
from ..services.efs_manager_setup import setup as setup_efs_manager
1718
from ..services.modules.rabbitmq import setup as setup_rabbitmq
19+
from ..services.modules.redis import setup as setup_redis
20+
from ..services.process_messages_setup import setup as setup_process_messages
1821
from .settings import ApplicationSettings
1922

2023
logger = logging.getLogger(__name__)
@@ -40,11 +43,14 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
4043

4144
# PLUGINS SETUP
4245
setup_rabbitmq(app)
46+
setup_redis(app)
4347

4448
setup_api_routes(app)
4549
setup_rpc_routes(app)
4650

4751
setup_efs_manager(app)
52+
setup_background_tasks(app)
53+
setup_process_messages(app)
4854

4955
# EVENTS
5056
async def _on_startup() -> None:

services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from settings_library.base import BaseCustomSettings
1313
from settings_library.efs import AwsEfsSettings
1414
from settings_library.rabbit import RabbitSettings
15+
from settings_library.redis import RedisSettings
1516
from settings_library.tracing import TracingSettings
1617
from settings_library.utils_logging import MixinLoggingSettings
1718

@@ -76,6 +77,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
7677

7778
EFS_GUARDIAN_AWS_EFS_SETTINGS: AwsEfsSettings = Field(auto_default_from_env=True)
7879
EFS_GUARDIAN_RABBITMQ: RabbitSettings = Field(auto_default_from_env=True)
80+
EFS_GUARDIAN_REDIS: RedisSettings = Field(auto_default_from_env=True)
7981
EFS_GUARDIAN_TRACING: TracingSettings | None = Field(
8082
auto_default_from_env=True, description="settings for opentelemetry tracing"
8183
)
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import logging
2+
3+
from fastapi import FastAPI
4+
5+
from ..core.settings import ApplicationSettings
6+
7+
_logger = logging.getLogger(__name__)
8+
9+
10+
async def removal_policy_task(app: FastAPI) -> None:
11+
_logger.info("Removal policy task started")
12+
13+
# After X days of inactivity remove data from EFS
14+
# Probably use `last_modified_data` in the project DB table
15+
# Maybe lock project during this time lock_project()
16+
17+
app_settings: ApplicationSettings = app.state.settings
18+
assert app_settings # nosec
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import asyncio
2+
import logging
3+
from collections.abc import Awaitable, Callable
4+
from datetime import timedelta
5+
from typing import TypedDict
6+
7+
from fastapi import FastAPI
8+
from servicelib.background_task import stop_periodic_task
9+
from servicelib.logging_utils import log_catch, log_context
10+
from servicelib.redis_utils import start_exclusive_periodic_task
11+
12+
from .background_tasks import removal_policy_task
13+
from .modules.redis import get_redis_client
14+
15+
_logger = logging.getLogger(__name__)
16+
17+
18+
class EfsGuardianBackgroundTask(TypedDict):
19+
name: str
20+
task_func: Callable
21+
22+
23+
_EFS_GUARDIAN_BACKGROUND_TASKS = [
24+
EfsGuardianBackgroundTask(
25+
name="efs_removal_policy_task", task_func=removal_policy_task
26+
)
27+
]
28+
29+
30+
def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]:
31+
async def _startup() -> None:
32+
with log_context(
33+
_logger, logging.INFO, msg="Efs Guardian startup.."
34+
), log_catch(_logger, reraise=False):
35+
app.state.efs_guardian_background_tasks = []
36+
37+
# Setup periodic tasks
38+
for task in _EFS_GUARDIAN_BACKGROUND_TASKS:
39+
exclusive_task = start_exclusive_periodic_task(
40+
get_redis_client(app),
41+
task["task_func"],
42+
task_period=timedelta(seconds=60), # 1 minute
43+
retry_after=timedelta(seconds=60), # 5 minutes
44+
task_name=task["name"],
45+
app=app,
46+
)
47+
app.state.efs_guardian_background_tasks.append(exclusive_task)
48+
49+
return _startup
50+
51+
52+
def on_app_shutdown(
53+
_app: FastAPI,
54+
) -> Callable[[], Awaitable[None]]:
55+
async def _stop() -> None:
56+
with log_context(
57+
_logger, logging.INFO, msg="Efs Guardian shutdown.."
58+
), log_catch(_logger, reraise=False):
59+
assert _app # nosec
60+
if _app.state.efs_guardian_background_tasks:
61+
await asyncio.gather(
62+
*[
63+
stop_periodic_task(task)
64+
for task in _app.state.efs_guardian_background_tasks
65+
]
66+
)
67+
68+
return _stop
69+
70+
71+
def setup(app: FastAPI) -> None:
72+
app.add_event_handler("startup", on_app_startup(app))
73+
app.add_event_handler("shutdown", on_app_shutdown(app))

0 commit comments

Comments
 (0)