Skip to content

Commit dc1bd4f

Browse files
author
Andrei Neagu
committed
added force container cleanup step when stopping services
1 parent 0651c06 commit dc1bd4f

File tree

12 files changed

+329
-7
lines changed

12 files changed

+329
-7
lines changed

packages/models-library/src/models_library/api_schemas_directorv2/services.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,7 @@ class ServiceExtras(BaseModel):
103103

104104

105105
CHARS_IN_VOLUME_NAME_BEFORE_DIR_NAME: Final[NonNegativeInt] = 89
106+
107+
108+
DYNAMIC_SIDECAR_SERVICE_PREFIX: Final[str] = "dy-sidecar"
109+
DYNAMIC_PROXY_SERVICE_PREFIX: Final[str] = "dy-proxy"
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import logging
2+
from datetime import timedelta
3+
from typing import Final
4+
5+
from models_library.projects_nodes_io import NodeID
6+
from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace
7+
from pydantic import NonNegativeInt, TypeAdapter
8+
from servicelib.logging_utils import log_decorator
9+
from servicelib.rabbitmq import RabbitMQRPCClient
10+
11+
_logger = logging.getLogger(__name__)
12+
13+
_REQUEST_TIMEOUT: Final[NonNegativeInt] = int(timedelta(minutes=60).total_seconds())
14+
15+
16+
@log_decorator(_logger, level=logging.DEBUG)
async def force_container_cleanup(
    rabbitmq_rpc_client: RabbitMQRPCClient,
    *,
    docker_node_id: str,
    swarm_stack_name: str,
    node_id: NodeID,
) -> None:
    """Request ``force_container_cleanup`` for ``node_id`` from an agent over RPC.

    The target namespace is built from the "agent" service name together with
    ``docker_node_id`` and ``swarm_stack_name``. The request is sent with a
    timeout of ``_REQUEST_TIMEOUT`` seconds and is expected to reply ``None``.
    """
    agent_namespace = RPCNamespace.from_entries(
        {
            "service": "agent",
            "docker_node_id": docker_node_id,
            "swarm_stack_name": swarm_stack_name,
        }
    )
    method_name = TypeAdapter(RPCMethodName).validate_python(
        "force_container_cleanup"
    )

    reply = await rabbitmq_rpc_client.request(
        agent_namespace,
        method_name,
        node_id=node_id,
        timeout_s=_REQUEST_TIMEOUT,
    )
    assert reply is None  # nosec
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import logging
2+
3+
from fastapi import FastAPI
4+
from models_library.projects_nodes_io import NodeID
5+
from servicelib.logging_utils import log_context
6+
from servicelib.rabbitmq import RPCRouter
7+
8+
from ...services.containers_manager import ContainersManager
9+
10+
_logger = logging.getLogger(__name__)
11+
12+
router = RPCRouter()
13+
14+
15+
@router.expose()
async def force_container_cleanup(app: FastAPI, *, node_id: NodeID) -> None:
    """RPC handler: delegate the cleanup of ``node_id`` to the ``ContainersManager``.

    The manager is looked up in the state of ``app``; the whole operation runs
    inside an INFO-level log context.
    """
    log_message = f"removing all orphan container for {node_id=}"
    with log_context(_logger, logging.INFO, log_message):
        containers_manager = ContainersManager.get_from_app_state(app)
        await containers_manager.force_container_cleanup(node_id)

services/agent/src/simcore_service_agent/api/rpc/_volumes.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
from servicelib.rabbitmq.rpc_interfaces.agent.errors import (
88
NoServiceVolumesFoundRPCError,
99
)
10-
from simcore_service_agent.services.volumes_manager import VolumesManager
10+
11+
from ...services.volumes_manager import VolumesManager
1112

1213
_logger = logging.getLogger(__name__)
1314

services/agent/src/simcore_service_agent/api/rpc/routes.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
from simcore_service_agent.core.settings import ApplicationSettings
55

66
from ...services.rabbitmq import get_rabbitmq_rpc_server
7-
from . import _volumes
7+
from . import _containers, _volumes
88

99
ROUTERS: list[RPCRouter] = [
10+
_containers.router,
1011
_volumes.router,
1112
]
1213

services/agent/src/simcore_service_agent/core/application.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
)
1919
from ..api.rest.routes import setup_rest_api
2020
from ..api.rpc.routes import setup_rpc_api_routes
21+
from ..services.containers_manager import setup_containers_manager
2122
from ..services.instrumentation import setup_instrumentation
2223
from ..services.rabbitmq import setup_rabbitmq
2324
from ..services.volumes_manager import setup_volume_manager
@@ -58,6 +59,7 @@ def create_app() -> FastAPI:
5859

5960
setup_rabbitmq(app)
6061
setup_volume_manager(app)
62+
setup_containers_manager(app)
6163
setup_rest_api(app)
6264
setup_rpc_api_routes(app)
6365

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import logging
2+
from dataclasses import dataclass, field
3+
4+
from aiodocker import Docker
5+
from fastapi import FastAPI
6+
from models_library.api_schemas_directorv2.services import (
7+
DYNAMIC_PROXY_SERVICE_PREFIX,
8+
DYNAMIC_SIDECAR_SERVICE_PREFIX,
9+
)
10+
from models_library.projects_nodes_io import NodeID
11+
from servicelib.fastapi.app_state import SingletonInAppStateMixin
12+
from servicelib.utils import limited_gather
13+
14+
from .docker_utils import get_containers_with_prefixes, remove_container_forcefully
15+
16+
_logger = logging.getLogger(__name__)
17+
18+
19+
@dataclass
class ContainersManager(SingletonInAppStateMixin):
    """Forcefully removes the containers whose names match a node's prefixes.

    Holds its own ``Docker`` client, which ``shutdown`` closes.
    """

    # name consumed by SingletonInAppStateMixin
    # NOTE(review): presumably the attribute used on the app state -- confirm
    app_state_name: str = "containers_manager"

    # client used to list and remove containers
    docker: Docker = field(default_factory=Docker)

    async def force_container_cleanup(self, node_id: NodeID) -> None:
        """Remove every container whose name starts with a prefix built from ``node_id``."""
        # all the name prefixes a container started for this node can have
        candidate_prefixes = {
            f"{DYNAMIC_PROXY_SERVICE_PREFIX}_{node_id}",  # proxy
            f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}_{node_id}",  # dynamic-sidecar
            f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}-{node_id}",  # user services
        }
        orphans = await get_containers_with_prefixes(self.docker, candidate_prefixes)

        _logger.debug("Orphan containers for node_id='%s': %s", node_id, orphans)

        removals = [
            remove_container_forcefully(self.docker, orphan) for orphan in orphans
        ]
        await limited_gather(*removals)

    async def shutdown(self) -> None:
        """Close the docker client held by this manager."""
        await self.docker.close()
48+
49+
50+
def get_containers_manager(app: FastAPI) -> ContainersManager:
    """Return the ``ContainersManager`` stored in the state of ``app``."""
    manager: ContainersManager = ContainersManager.get_from_app_state(app)
    return manager
52+
53+
54+
def setup_containers_manager(app: FastAPI) -> None:
    """Register the handlers that create the ``ContainersManager`` on startup
    and shut it down on shutdown of ``app``."""

    async def _create_manager() -> None:
        manager = ContainersManager()
        manager.set_to_app_state(app)

    async def _close_manager() -> None:
        manager = ContainersManager.get_from_app_state(app)
        await manager.shutdown()

    handlers = (("startup", _create_manager), ("shutdown", _close_manager))
    for event_name, handler in handlers:
        app.add_event_handler(event_name, handler)

services/agent/src/simcore_service_agent/services/docker_utils.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,27 @@ async def remove_volume(
106106
get_instrumentation(app).agent_metrics.remove_volumes(
107107
settings.AGENT_DOCKER_NODE_ID
108108
)
109+
110+
111+
async def get_containers_with_prefixes(docker: Docker, prefixes: set[str]) -> set[str]:
    """Return the names of the containers that start with any of ``prefixes``.

    Containers are listed with ``all=True`` and each one is inspected to read
    its name. A container that disappears between the listing and its
    inspection (404) is skipped, matching how ``remove_container_forcefully``
    treats containers that are already gone.

    Raises:
        DockerError: for any docker failure other than a 404 on inspection.
    """
    # str.startswith accepts a tuple of options: build it once, outside the loop
    prefix_options = tuple(prefixes)

    matching_names: set[str] = set()
    for container in await docker.containers.list(all=True):
        try:
            container_info = await container.show()
        except DockerError as e:
            if e.status == status.HTTP_404_NOT_FOUND:
                # removed after being listed: nothing left to report
                continue
            raise

        # the inspected name is stripped of its leading "/" before matching
        container_name = container_info.get("Name", "").lstrip("/")
        if container_name.startswith(prefix_options):
            matching_names.add(container_name)

    return matching_names
123+
124+
125+
async def remove_container_forcefully(docker: Docker, container_id: str) -> None:
    """Remove a container regardless of its state.

    A container that cannot be found (404) is silently ignored; any other
    ``DockerError`` is propagated.
    """
    try:
        target = await docker.containers.get(container_id)
        await target.delete(force=True)
    except DockerError as err:
        if err.status == status.HTTP_404_NOT_FOUND:
            # already gone: nothing to remove
            return
        raise
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# pylint:disable=redefined-outer-name
2+
# pylint:disable=unused-argument
3+
4+
from collections.abc import Awaitable, Callable
5+
from unittest.mock import AsyncMock
6+
7+
import pytest
8+
import pytest_mock
9+
from faker import Faker
10+
from fastapi import FastAPI
11+
from models_library.projects_nodes_io import NodeID
12+
from servicelib.rabbitmq import RabbitMQRPCClient
13+
from servicelib.rabbitmq.rpc_interfaces.agent import containers
14+
15+
pytest_simcore_core_services_selection = [
16+
"rabbit",
17+
]
18+
19+
20+
@pytest.fixture
def node_id(faker: Faker) -> NodeID:
    """Random node identifier generated by faker (not cast to ``str``)."""
    random_uuid = faker.uuid4(cast_to=None)
    return random_uuid
23+
24+
25+
@pytest.fixture
async def rpc_client(
    initialized_app: FastAPI,
    rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]],
) -> RabbitMQRPCClient:
    """RPC client named "client".

    NOTE(review): ``initialized_app`` is requested but unused here, presumably
    so that the agent app is running before the client is created -- confirm.
    """
    client = await rabbitmq_rpc_client("client")
    return client
31+
32+
33+
@pytest.fixture
def mocked_force_container_cleanup(mocker: pytest_mock.MockerFixture) -> AsyncMock:
    """Patch ``ContainersManager.force_container_cleanup`` and return the mock."""
    patch_target = (
        "simcore_service_agent.services.containers_manager"
        ".ContainersManager.force_container_cleanup"
    )
    return mocker.patch(patch_target)
38+
39+
40+
async def test_force_container_cleanup(
    rpc_client: RabbitMQRPCClient,
    swarm_stack_name: str,
    docker_node_id: str,
    node_id: NodeID,
    mocked_force_container_cleanup: AsyncMock,
):
    """A single RPC request results in exactly one call to the patched manager method."""
    calls_before = mocked_force_container_cleanup.call_count
    assert calls_before == 0

    await containers.force_container_cleanup(
        rpc_client,
        docker_node_id=docker_node_id,
        swarm_stack_name=swarm_stack_name,
        node_id=node_id,
    )

    calls_after = mocked_force_container_cleanup.call_count
    assert calls_after == 1
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# pylint: disable=redefined-outer-name
2+
3+
4+
import logging
5+
from collections.abc import AsyncIterable, Awaitable, Callable
6+
from enum import Enum
7+
8+
import pytest
9+
from aiodocker import Docker, DockerError
10+
from asgi_lifespan import LifespanManager
11+
from faker import Faker
12+
from fastapi import FastAPI, status
13+
from models_library.api_schemas_directorv2.services import (
14+
DYNAMIC_PROXY_SERVICE_PREFIX,
15+
DYNAMIC_SIDECAR_SERVICE_PREFIX,
16+
)
17+
from models_library.projects_nodes_io import NodeID
18+
from simcore_service_agent.services.containers_manager import (
19+
get_containers_manager,
20+
setup_containers_manager,
21+
)
22+
23+
24+
@pytest.fixture
async def app() -> AsyncIterable[FastAPI]:
    """Bare FastAPI app with only the containers manager set up.

    The app is yielded inside its lifespan, so the startup handler has run and
    the shutdown handler runs at teardown.
    """
    fastapi_app = FastAPI()
    setup_containers_manager(fastapi_app)

    async with LifespanManager(fastapi_app):
        yield fastapi_app
31+
32+
33+
@pytest.fixture
def node_id(faker: Faker) -> NodeID:
    """Random node identifier generated by faker (not cast to ``str``)."""
    random_uuid = faker.uuid4(cast_to=None)
    return random_uuid
36+
37+
38+
@pytest.fixture
async def docker() -> AsyncIterable[Docker]:
    """Docker client that lives for the duration of the test."""
    async with Docker() as client:
        yield client
42+
43+
44+
class _ContainerMode(Enum):
    """State in which ``create_container`` leaves a test container."""

    # created only, never started
    CREATED = "CREATED"
    # created and started
    RUNNING = "RUNNING"
    # created, started and then stopped
    STOPPED = "STOPPED"
48+
49+
50+
@pytest.fixture
async def create_container(
    docker: Docker,
) -> AsyncIterable[Callable[[str, _ContainerMode], Awaitable[str]]]:
    """Factory fixture creating alpine containers in a requested state.

    The yielded coroutine function takes the container name and the
    ``_ContainerMode`` to leave it in, and returns the container id. Every
    container it created is force-removed at teardown; containers that are
    already gone (404) are ignored.
    """
    created_containers: set[str] = set()

    async def _(name: str, container_mode: _ContainerMode) -> str:
        container = await docker.containers.create(
            config={
                "Image": "alpine",
                "Cmd": ["sh", "-c", "while true; do sleep 1; done"],
            },
            name=name,
        )
        # register for teardown right away: if start()/stop() below fails the
        # container must still be removed
        created_containers.add(container.id)

        if container_mode in (_ContainerMode.RUNNING, _ContainerMode.STOPPED):
            await container.start()
        if container_mode == _ContainerMode.STOPPED:
            await container.stop()

        return container.id

    yield _

    # cleanup containers
    for container_id in created_containers:
        try:
            container = await docker.containers.get(container_id)
            await container.delete(force=True)
        except DockerError as e:
            if e.status != status.HTTP_404_NOT_FOUND:
                raise
83+
84+
85+
async def test_force_container_cleanup(
    app: FastAPI,
    node_id: NodeID,
    create_container: Callable[[str, _ContainerMode], Awaitable[str]],
    docker: Docker,
    faker: Faker,
    caplog: pytest.LogCaptureFixture,
):
    """Containers matching the node's prefixes are reported and removed,
    whatever state (created, running, stopped) they are in."""
    caplog.set_level(logging.DEBUG)
    caplog.clear()

    # separators follow the prefixes composed by
    # ContainersManager.force_container_cleanup:
    # "_" for proxy and dynamic-sidecar, "-" for user services
    proxy_name = f"{DYNAMIC_PROXY_SERVICE_PREFIX}_{node_id}{faker.pystr()}"
    dynamic_sidecar_name = f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}_{node_id}{faker.pystr()}"
    user_service_name = f"{DYNAMIC_SIDECAR_SERVICE_PREFIX}-{node_id}{faker.pystr()}"

    container_ids = [
        await create_container(proxy_name, _ContainerMode.CREATED),
        await create_container(dynamic_sidecar_name, _ContainerMode.RUNNING),
        await create_container(user_service_name, _ContainerMode.STOPPED),
    ]

    await get_containers_manager(app).force_container_cleanup(node_id)

    # all of them were detected as orphans ...
    assert proxy_name in caplog.text
    assert dynamic_sidecar_name in caplog.text
    assert user_service_name in caplog.text

    # ... and none of them exists any longer
    for container_id in container_ids:
        with pytest.raises(DockerError) as exc_info:
            await docker.containers.get(container_id)
        assert exc_info.value.status == status.HTTP_404_NOT_FOUND

0 commit comments

Comments
 (0)