Skip to content

Commit 4d27c32

Browse files
matusdrobuliak66 authored and mrnicegyu11 committed
🎨 EFS Guardian: adding size monitoring (ITISFoundation#6502)
1 parent 9c8d0ec commit 4d27c32

File tree

17 files changed

+605
-27
lines changed

17 files changed

+605
-27
lines changed

‎packages/models-library/src/models_library/rabbitmq_messages.py‎

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,24 @@ def routing_key(self) -> str | None:
194194
return None
195195

196196

197+
class DynamicServiceRunningMessage(RabbitMessageBase):
    # Message describing a dynamic service through its project, node, user
    # and (optional) product. The channel name is pinned to one literal.
    channel_name: Literal["io.simcore.service.dynamic-service-running"] = Field(
        default="io.simcore.service.dynamic-service-running", const=True
    )

    # Identity of the service this message refers to.
    project_id: ProjectID
    node_id: NodeID
    user_id: UserID
    product_name: ProductName | None

    # Defaults to the current UTC time at the moment the message is built.
    created_at: datetime.datetime = Field(
        default_factory=lambda: arrow.utcnow().datetime,
        description="message creation datetime",
    )

    def routing_key(self) -> str | None:
        """Always return ``None`` as the routing key."""
        return None
213+
214+
197215
class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage):
198216
message_type: RabbitResourceTrackingMessageType = Field(
199217
default=RabbitResourceTrackingMessageType.TRACKING_STARTED, const=True

‎services/docker-compose.yml‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,11 @@ services:
408408
RABBIT_PORT: ${RABBIT_PORT}
409409
RABBIT_SECURE: ${RABBIT_SECURE}
410410
RABBIT_USER: ${RABBIT_USER}
411+
REDIS_HOST: ${REDIS_HOST}
412+
REDIS_PASSWORD: ${REDIS_PASSWORD}
413+
REDIS_PORT: ${REDIS_PORT}
414+
REDIS_SECURE: ${REDIS_SECURE}
415+
REDIS_USER: ${REDIS_USER}
411416
SC_USER_ID: ${SC_USER_ID}
412417
SC_USER_NAME: ${SC_USER_NAME}
413418
EFS_USER_ID: ${EFS_USER_ID}

‎services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from fastapi import FastAPI
66
from models_library.progress_bar import ProgressReport
77
from models_library.rabbitmq_messages import (
8+
DynamicServiceRunningMessage,
89
EventRabbitMessage,
910
LoggerRabbitMessage,
1011
ProgressRabbitMessageNode,
@@ -34,6 +35,12 @@ async def post_resource_tracking_message(
3435
await _post_rabbit_message(app, message)
3536

3637

38+
async def post_dynamic_service_running_message(
    app: FastAPI, message: DynamicServiceRunningMessage
) -> None:
    """Publish a ``DynamicServiceRunningMessage``.

    Thin wrapper that hands ``message`` to ``_post_rabbit_message`` for the
    given application.

    Args:
        app: application passed through to ``_post_rabbit_message``.
        message: the message to publish.
    """
    await _post_rabbit_message(app, message)
42+
43+
3744
async def post_log_message(
3845
app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt
3946
) -> None:

‎services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py‎

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import asyncio
12
import logging
23
from typing import Final
34

45
from fastapi import FastAPI
56
from models_library.generated_models.docker_rest_api import ContainerState
67
from models_library.rabbitmq_messages import (
8+
DynamicServiceRunningMessage,
79
RabbitResourceTrackingHeartbeatMessage,
810
RabbitResourceTrackingStartedMessage,
911
RabbitResourceTrackingStoppedMessage,
@@ -19,7 +21,10 @@
1921
are_all_containers_in_expected_states,
2022
get_container_states,
2123
)
22-
from ...core.rabbitmq import post_resource_tracking_message
24+
from ...core.rabbitmq import (
25+
post_dynamic_service_running_message,
26+
post_resource_tracking_message,
27+
)
2328
from ...core.settings import ApplicationSettings, ResourceTrackingSettings
2429
from ...models.shared_store import SharedStore
2530
from ._models import ResourceTrackingState
@@ -70,10 +75,21 @@ async def _heart_beat_task(app: FastAPI):
7075
)
7176

7277
if are_all_containers_in_expected_states(container_states.values()):
73-
message = RabbitResourceTrackingHeartbeatMessage(
78+
rut_message = RabbitResourceTrackingHeartbeatMessage(
7479
service_run_id=settings.DY_SIDECAR_RUN_ID
7580
)
76-
await post_resource_tracking_message(app, message)
81+
dyn_message = DynamicServiceRunningMessage(
82+
project_id=settings.DY_SIDECAR_PROJECT_ID,
83+
node_id=settings.DY_SIDECAR_NODE_ID,
84+
user_id=settings.DY_SIDECAR_USER_ID,
85+
product_name=settings.DY_SIDECAR_PRODUCT_NAME,
86+
)
87+
await asyncio.gather(
88+
*[
89+
post_resource_tracking_message(app, rut_message),
90+
post_dynamic_service_running_message(app, dyn_message),
91+
]
92+
)
7793
else:
7894
_logger.info(
7995
"heart beat message skipped: container_states=%s", container_states

‎services/efs-guardian/src/simcore_service_efs_guardian/core/application.py‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313
)
1414
from ..api.rest.routes import setup_api_routes
1515
from ..api.rpc.routes import setup_rpc_routes
16+
from ..services.background_tasks_setup import setup as setup_background_tasks
1617
from ..services.efs_manager_setup import setup as setup_efs_manager
1718
from ..services.modules.rabbitmq import setup as setup_rabbitmq
19+
from ..services.modules.redis import setup as setup_redis
20+
from ..services.process_messages_setup import setup as setup_process_messages
1821
from .settings import ApplicationSettings
1922

2023
logger = logging.getLogger(__name__)
@@ -40,11 +43,14 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
4043

4144
# PLUGINS SETUP
4245
setup_rabbitmq(app)
46+
setup_redis(app)
4347

4448
setup_api_routes(app)
4549
setup_rpc_routes(app)
4650

4751
setup_efs_manager(app)
52+
setup_background_tasks(app)
53+
setup_process_messages(app)
4854

4955
# EVENTS
5056
async def _on_startup() -> None:

‎services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py‎

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
LogLevel,
99
VersionTag,
1010
)
11-
from pydantic import Field, PositiveInt, validator
11+
from pydantic import ByteSize, Field, PositiveInt, parse_obj_as, validator
1212
from settings_library.base import BaseCustomSettings
1313
from settings_library.efs import AwsEfsSettings
1414
from settings_library.rabbit import RabbitSettings
15+
from settings_library.redis import RedisSettings
1516
from settings_library.tracing import TracingSettings
1617
from settings_library.utils_logging import MixinLoggingSettings
1718

@@ -57,6 +58,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
5758
EFS_GROUP_NAME: str = Field(
5859
description="Linux group name that the EFS and Simcore linux users are part of"
5960
)
61+
EFS_DEFAULT_USER_SERVICE_SIZE_BYTES: ByteSize = Field(
62+
default=parse_obj_as(ByteSize, "500GiB")
63+
)
6064

6165
# RUNTIME -----------------------------------------------------------
6266
EFS_GUARDIAN_DEBUG: bool = Field(
@@ -76,6 +80,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
7680

7781
EFS_GUARDIAN_AWS_EFS_SETTINGS: AwsEfsSettings = Field(auto_default_from_env=True)
7882
EFS_GUARDIAN_RABBITMQ: RabbitSettings = Field(auto_default_from_env=True)
83+
EFS_GUARDIAN_REDIS: RedisSettings = Field(auto_default_from_env=True)
7984
EFS_GUARDIAN_TRACING: TracingSettings | None = Field(
8085
auto_default_from_env=True, description="settings for opentelemetry tracing"
8186
)
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import logging
2+
3+
from fastapi import FastAPI
4+
5+
from ..core.settings import ApplicationSettings
6+
7+
_logger = logging.getLogger(__name__)
8+
9+
10+
async def removal_policy_task(app: FastAPI) -> None:
    """Placeholder for the EFS removal policy.

    For now it only logs that it ran and checks that the application
    settings are present on ``app.state``.

    Ideas noted for the real implementation:
      - after X days of inactivity remove data from EFS
      - probably use `last_modified_data` in the project DB table
      - maybe lock the project during this time with lock_project()
    """
    _logger.info("FAKE Removal policy task started (not yet implemented)")

    settings: ApplicationSettings = app.state.settings
    assert settings  # nosec
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import asyncio
2+
import logging
3+
from collections.abc import Awaitable, Callable
4+
from datetime import timedelta
5+
from typing import TypedDict
6+
7+
from fastapi import FastAPI
8+
from servicelib.background_task import stop_periodic_task
9+
from servicelib.logging_utils import log_catch, log_context
10+
from servicelib.redis_utils import start_exclusive_periodic_task
11+
12+
from .background_tasks import removal_policy_task
13+
from .modules.redis import get_redis_lock_client
14+
15+
_logger = logging.getLogger(__name__)
16+
17+
18+
class EfsGuardianBackgroundTask(TypedDict):
    """Description of one background task: a name plus the callable to run."""

    # Name given to the task when it is started.
    name: str
    # Callable executed as the task body.
    task_func: Callable
21+
22+
23+
# Registry of the periodic tasks started when the application starts up.
_EFS_GUARDIAN_BACKGROUND_TASKS = [
    EfsGuardianBackgroundTask(
        name="efs_removal_policy_task",
        task_func=removal_policy_task,
    ),
]
28+
29+
30+
def _on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]:
    """Build the startup handler that launches the registered periodic tasks.

    The returned coroutine function stores the started tasks in
    ``app.state.efs_guardian_background_tasks``. Errors raised while starting
    are logged and not re-raised (``log_catch(..., reraise=False)``).
    """

    async def _startup() -> None:
        with log_context(
            _logger, logging.INFO, msg="Efs Guardian startup.."
        ), log_catch(_logger, reraise=False):
            # Register the list first so that tasks started before a failure
            # are still reachable from app.state.
            app.state.efs_guardian_background_tasks = []

            for spec in _EFS_GUARDIAN_BACKGROUND_TASKS:
                started = start_exclusive_periodic_task(
                    get_redis_lock_client(app),
                    spec["task_func"],
                    task_period=timedelta(seconds=60),  # 1 minute
                    retry_after=timedelta(seconds=300),  # 5 minutes
                    task_name=spec["name"],
                    app=app,
                )
                app.state.efs_guardian_background_tasks.append(started)

    return _startup
50+
51+
52+
def _on_app_shutdown(
    _app: FastAPI,
) -> Callable[[], Awaitable[None]]:
    """Build the shutdown handler that stops every started periodic task.

    Errors raised while stopping are logged and not re-raised
    (``log_catch(..., reraise=False)``).
    """

    async def _stop() -> None:
        with log_context(
            _logger, logging.INFO, msg="Efs Guardian shutdown.."
        ), log_catch(_logger, reraise=False):
            assert _app  # nosec
            running = _app.state.efs_guardian_background_tasks
            if not running:
                # Nothing was started, so there is nothing to stop.
                return
            # Stop all tasks concurrently.
            await asyncio.gather(*(stop_periodic_task(task) for task in running))

    return _stop
69+
70+
71+
def setup(app: FastAPI) -> None:
    """Register the background-task startup and shutdown handlers on ``app``."""
    handler_factories = (
        ("startup", _on_app_startup),
        ("shutdown", _on_app_shutdown),
    )
    for event_name, make_handler in handler_factories:
        app.add_event_handler(event_name, make_handler(app))

‎services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py‎

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
from fastapi import FastAPI
66
from models_library.projects import ProjectID
77
from models_library.projects_nodes_io import NodeID
8+
from pydantic import ByteSize
89

910
from ..core.settings import ApplicationSettings, get_application_settings
11+
from . import efs_manager_utils
1012

1113

1214
@dataclass(frozen=True)
@@ -52,3 +54,39 @@ async def create_project_specific_data_dir(
5254
_dir_path, 0o770
5355
) # This gives rwx permissions to user and group, and nothing to others
5456
return _dir_path
57+
58+
async def check_project_node_data_directory_exits(
    self, project_id: ProjectID, node_id: NodeID
) -> bool:
    """Tell whether the data path of a project node exists.

    The path is ``<efs mount>/<project data base dir>/<project_id>/<node_id>``.
    """
    node_data_path = self._efs_mounted_path.joinpath(
        self._project_specific_data_base_directory,
        f"{project_id}",
        f"{node_id}",
    )
    found = node_data_path.exists()
    return found
69+
70+
async def get_project_node_data_size(
    self, project_id: ProjectID, node_id: NodeID
) -> ByteSize:
    """Return the size of a project node's data path.

    The path is ``<efs mount>/<project data base dir>/<project_id>/<node_id>``
    and the measurement is delegated to
    ``efs_manager_utils.get_size_bash_async``.
    """
    node_data_path = self._efs_mounted_path.joinpath(
        self._project_specific_data_base_directory,
        f"{project_id}",
        f"{node_id}",
    )
    size = await efs_manager_utils.get_size_bash_async(node_data_path)
    return size
81+
82+
async def remove_project_node_data_write_permissions(
    self, project_id: ProjectID, node_id: NodeID
) -> None:
    """Remove write permissions from a project node's data path.

    The path is ``<efs mount>/<project data base dir>/<project_id>/<node_id>``
    and the change is delegated to
    ``efs_manager_utils.remove_write_permissions_bash_async``.
    """
    node_data_path = self._efs_mounted_path.joinpath(
        self._project_specific_data_base_directory,
        f"{project_id}",
        f"{node_id}",
    )
    await efs_manager_utils.remove_write_permissions_bash_async(node_data_path)
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import asyncio
2+
import logging
3+
4+
from pydantic import ByteSize
5+
6+
_logger = logging.getLogger(__name__)
7+
8+
9+
async def get_size_bash_async(path) -> ByteSize:
    """Return the size that ``du -sb`` reports for ``path``.

    Runs ``du -sb <path>`` as a subprocess and parses the first
    whitespace-separated token of its standard output.

    Args:
        path: location to measure; handed to the subprocess unchanged.

    Returns:
        The parsed size as a ``ByteSize``.

    Raises:
        RuntimeError: if the command exits with a non-zero return code. The
            message carries the command, the return code and the decoded
            standard error.
    """
    command = ["du", "-sb", path]
    process = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Wait for the subprocess to complete and collect both streams.
    stdout, stderr = await process.communicate()

    if process.returncode != 0:
        # Stringify every argument before joining: ``path`` may be a path
        # object, and str.join raises TypeError on non-str items, which would
        # hide the real failure behind an unrelated exception.
        printable_command = " ".join(f"{arg}" for arg in command)
        msg = f"Command {printable_command} failed with error code {process.returncode}: {stderr.decode()}"
        _logger.error(msg)
        raise RuntimeError(msg)

    # Output looks like "<size>\t<path>"; keep only the size token.
    return ByteSize(stdout.decode().split()[0])
28+
29+
30+
async def remove_write_permissions_bash_async(path) -> None:
    """Recursively remove write permissions from ``path``.

    Runs ``chmod -R a-w <path>`` as a subprocess.

    Args:
        path: location to change; handed to the subprocess unchanged.

    Raises:
        RuntimeError: if the command exits with a non-zero return code. The
            message carries the command, the return code and the decoded
            standard error.
    """
    command = ["chmod", "-R", "a-w", path]
    process = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Wait for the subprocess to complete; only stderr is of interest here.
    _, stderr = await process.communicate()

    if process.returncode == 0:
        return

    # Stringify every argument before joining: ``path`` may be a path object,
    # and str.join raises TypeError on non-str items, which would hide the
    # real failure behind an unrelated exception.
    printable_command = " ".join(f"{arg}" for arg in command)
    msg = f"Command {printable_command} failed with error code {process.returncode}: {stderr.decode()}"
    _logger.error(msg)
    raise RuntimeError(msg)

0 commit comments

Comments
 (0)