Skip to content

Commit 41384a3

Browse files
authored
add cluster events consumer (#2318)
1 parent 3ab2fdd commit 41384a3

File tree

17 files changed

+316
-448
lines changed

17 files changed

+316
-448
lines changed

charts/platform-api-poller/templates/deployment.yml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,17 @@ spec:
6363
- name: NP_AUTH_PUBLIC_URL
6464
value: {{ .Values.platform.authUrl | quote }}
6565
- name: NP_AUTH_TOKEN
66-
{{- if .Values.platform.token }}
67-
{{ toYaml .Values.platform.token | indent 10 }}
68-
{{- end }}
66+
{{- if .Values.platform.token }}
{{- /* nindent, not indent: the left-trimmed actions eat the newline after the "name:" line, so the rendered value must begin with its own newline. */}}
{{- toYaml .Values.platform.token | nindent 10 }}
{{- end }}
6969
- name: NP_PLATFORM_API_URL
7070
value: {{ .Values.platform.apiUrl | quote }}
7171
- name: NP_PLATFORM_CONFIG_URI
7272
value: {{ .Values.platform.configUrl | quote }}
7373
- name: NP_PLATFORM_ADMIN_URI
7474
value: {{ .Values.platform.adminUrl | quote }}
75+
- name: NP_EVENTS_URL
76+
value: {{ .Values.platform.eventsUrl | quote }}
7577
{{- if .Values.sentry }}
7678
- name: SENTRY_DSN
7779
value: {{ .Values.sentry.dsn }}

charts/platform-api-poller/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ platform:
2727
apiUrl:
2828
registryUrl:
2929
registryEmail:
30+
eventsUrl:
3031
token: {}
3132

3233
jobs:

charts/platform-api/templates/deployment.yml

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ spec:
6363
- name: NP_AUTH_URL
6464
value: {{ .Values.platform.authUrl | quote }}
6565
- name: NP_AUTH_TOKEN
66-
{{- if .Values.platform.token }}
67-
{{ toYaml .Values.platform.token | indent 10 }}
68-
{{- end }}
66+
{{- if .Values.platform.token }}
{{- /* nindent, not indent: the left-trimmed actions eat the newline after the "name:" line, so the rendered value must begin with its own newline. */}}
{{- toYaml .Values.platform.token | nindent 10 }}
{{- end }}
6969
- name: NP_OAUTH_AUTH_URL
7070
value: {{ .Values.oauth.authUrl | quote }}
7171
- name: NP_OAUTH_TOKEN_URL
@@ -93,17 +93,19 @@ spec:
9393
- name: NP_NOTIFICATIONS_URL
9494
value: {{ .Values.platform.notificationsUrl | quote }}
9595
- name: NP_NOTIFICATIONS_TOKEN
96-
{{- if .Values.platform.token }}
97-
{{ toYaml .Values.platform.token | indent 10 }}
98-
{{- end }}
96+
{{- if .Values.platform.token }}
{{- /* nindent, not indent: the left-trimmed actions eat the newline after the "name:" line, so the rendered value must begin with its own newline. */}}
{{- toYaml .Values.platform.token | nindent 10 }}
{{- end }}
99+
- name: NP_EVENTS_URL
100+
value: {{ .Values.platform.eventsUrl | quote }}
99101
- name: NP_ENFORCER_PLATFORM_API_URL
100102
value: http://localhost:8080/api/v1
101103
- name: NP_ENFORCER_RETENTION_DELAY_DAYS
102104
value: {{ .Values.enforcerRetentionDelayDays | quote }}
103105
- name: NP_DB_POSTGRES_DSN
104-
{{- if .Values.postgres.dsn }}
105-
{{ toYaml .Values.postgres.dsn | indent 10 }}
106-
{{- end }}
106+
{{- if .Values.postgres.dsn }}
{{- /* nindent, not indent: the left-trimmed actions eat the newline after the "name:" line, so the rendered value must begin with its own newline. */}}
{{- toYaml .Values.postgres.dsn | nindent 10 }}
{{- end }}
107109
{{- if .Values.sentry }}
108110
- name: SENTRY_DSN
109111
value: {{ .Values.sentry.dsn }}

charts/platform-api/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ platform:
2828
authUrl: http://platform-auth:8080
2929
configUrl: http://platform-config:8080/api/v1
3030
adminUrl: http://platform-admin:8080/apis/admin/v1
31+
eventsUrl: http://platform-events:8080
3132
notificationsUrl: http://platform-notifications:8080
3233
token: {}
3334

platform_api/api.py

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
import aiohttp.web
88
from aiohttp.web import HTTPUnauthorized
99
from aiohttp.web_urldispatcher import AbstractRoute
10-
from aiohttp_security import check_permission
10+
from apolo_events_client import from_config as create_events_client_from_config
1111
from neuro_admin_client import AdminClient, OrgUser, ProjectUser
12-
from neuro_auth_client import AuthClient, Permission
12+
from neuro_auth_client import AuthClient
1313
from neuro_auth_client.security import AuthScheme, setup_security
1414
from neuro_logging import init_logging, setup_sentry
1515
from neuro_notifications_client import Client as NotificationsClient
@@ -44,12 +44,7 @@
4444
from .orchestrator.jobs_storage.base import JobStorageTransactionError
4545
from .postgres import make_async_engine
4646
from .resource import Preset, ResourcePoolType
47-
from .user import authorized_user, untrusted_user
48-
from .utils.update_notifier import (
49-
Notifier,
50-
PostgresChannelNotifier,
51-
ResubscribingNotifier,
52-
)
47+
from .user import authorized_user
5348

5449
logger = logging.getLogger(__name__)
5550

@@ -68,33 +63,12 @@ def __init__(self, *, app: aiohttp.web.Application, config: Config):
6863
self._config = config
6964

7065
def register(self, app: aiohttp.web.Application) -> None:
71-
app.add_routes(
72-
(
73-
aiohttp.web.get("", self.handle_config),
74-
aiohttp.web.post("/clusters/sync", self.handle_clusters_sync),
75-
)
76-
)
66+
app.add_routes((aiohttp.web.get("", self.handle_config),))
7767

7868
@property
def _jobs_service(self) -> JobsService:
    """Return the object stored on the application under ``"jobs_service"``."""
    service = self._app["jobs_service"]
    return service
8171

82-
@property
83-
def _cluster_update_notifier(self) -> Notifier:
84-
return self._app["cluster_update_notifier"]
85-
86-
async def handle_clusters_sync(
87-
self, request: aiohttp.web.Request
88-
) -> aiohttp.web.Response:
89-
user = await untrusted_user(request)
90-
permission = Permission(uri="cluster://", action="manage")
91-
logger.info("Checking whether %r has %r", user, permission)
92-
await check_permission(request, permission.action, [permission])
93-
94-
await self._cluster_update_notifier.notify()
95-
96-
return aiohttp.web.Response(text="OK")
97-
9872
async def handle_config(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
9973
"""Return platform configuration.
10074
@@ -449,11 +423,10 @@ async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:
449423
logger.info("Initializing JobsStorage")
450424
jobs_storage: JobsStorage = PostgresJobsStorage(engine)
451425

452-
cluster_update_notifier = ResubscribingNotifier(
453-
PostgresChannelNotifier(engine, "cluster_update_required"),
454-
check_interval=15,
426+
logger.info("Initializing EventsClient")
427+
events_client = await exit_stack.enter_async_context(
428+
create_events_client_from_config(config.events)
455429
)
456-
app["config_app"]["cluster_update_notifier"] = cluster_update_notifier
457430

458431
logger.info("Initializing JobsService")
459432
jobs_service = JobsService(
@@ -468,8 +441,7 @@ async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:
468441

469442
logger.info("Initializing ClusterUpdater")
470443
cluster_updater = ClusterUpdater(
471-
notifier=cluster_update_notifier,
472-
config=config,
444+
events_client=events_client,
473445
config_client=config_client,
474446
cluster_registry=cluster_config_registry,
475447
)

platform_api/cluster.py

Lines changed: 77 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
11
import asyncio
22
import logging
33
from abc import ABC, abstractmethod
4-
from collections.abc import AsyncIterator, Callable, Sequence
4+
from collections.abc import AsyncIterator, Callable
55
from contextlib import asynccontextmanager
6+
from typing import Self
67

78
from aiorwlock import RWLock
9+
from apolo_events_client import (
10+
AbstractEventsClient,
11+
EventType,
12+
FilterItem,
13+
RecvEvent,
14+
StreamType,
15+
)
816

917
from .cluster_config import ClusterConfig
10-
from .config import Config
1118
from .config_client import ConfigClient
1219
from .orchestrator.base import Orchestrator
13-
from .utils.update_notifier import Notifier
1420

1521
logger = logging.getLogger(__name__)
1622

@@ -59,84 +65,102 @@ def orchestrator(self) -> Orchestrator: # pragma: no cover
5965

6066

6167
class ClusterUpdater:
68+
_CONFIG_STREAM = StreamType("platform-config")
69+
_CLUSTER_ADD_EVENT = EventType("cluster-add")
70+
_CLUSTER_UPDATE_EVENT = EventType("cluster-update")
71+
_CLUSTER_REMOVE_EVENT = EventType("cluster-remove")
72+
6273
def __init__(
6374
self,
64-
notifier: Notifier,
75+
events_client: AbstractEventsClient,
6576
cluster_registry: "ClusterConfigRegistry",
66-
config: Config,
6777
config_client: ConfigClient,
6878
):
69-
self._loop = asyncio.get_event_loop()
70-
self._notifier = notifier
79+
self._events_client = events_client
7180
self._cluster_registry = cluster_registry
72-
self._config = config
7381
self._config_client = config_client
7482

7583
self._is_active: asyncio.Future[None] | None = None
7684
self._task: asyncio.Future[None] | None = None
7785

78-
async def start(self) -> None:
79-
logger.info("Starting Cluster Updater")
80-
await self._init_task()
81-
82-
async def __aenter__(self) -> "ClusterUpdater":
83-
await self.start()
86+
async def __aenter__(self) -> Self:
    """Subscribe ``_on_event`` to cluster events on the config stream.

    A single filter is registered that matches the cluster add, update
    and remove event types; no cluster restriction is applied.

    Returns:
        This updater, so it can be used as ``async with ... as updater``.
    """
    stream = self._CONFIG_STREAM
    logger.info("Subscribe for %r", stream)

    # One filter covering every cluster lifecycle event this class handles.
    lifecycle_events = frozenset(
        (
            self._CLUSTER_ADD_EVENT,
            self._CLUSTER_UPDATE_EVENT,
            self._CLUSTER_REMOVE_EVENT,
        )
    )
    await self._events_client.subscribe_group(
        stream,
        self._on_event,
        filters=[FilterItem(event_types=lifecycle_events)],
    )

    logger.info("Subscribed")
    return self
85105

86-
async def __aexit__(self, *args: object) -> None:
87-
await self.stop()
88-
89-
async def _init_task(self) -> None:
90-
assert not self._is_active
91-
assert not self._task
92-
93-
self._is_active = self._loop.create_future()
94-
self._task = asyncio.ensure_future(self._run())
95-
# forcing execution of the newly created task
96-
await asyncio.sleep(0)
97-
98-
async def stop(self) -> None:
99-
logger.info("Stopping Cluster Updater")
100-
assert self._is_active is not None
101-
self._is_active.set_result(None)
102-
103-
assert self._task
104-
await self._task
105-
106-
self._task = None
107-
self._is_active = None
108-
109-
async def _run(self) -> None:
110-
assert self._is_active is not None
111-
112-
def _listener() -> None:
113-
self._loop.create_task(self._do_update())
106+
async def __aexit__(self, exc_typ: object, exc_val: object, exc_tb: object) -> None:
    """Leave the context without doing anything.

    The exception arguments are ignored and nothing is suppressed.
    NOTE(review): the subscription made in ``__aenter__`` is not cancelled
    here -- presumably it ends when the events client is closed; confirm.
    """
    return None
114108

115-
async with self._notifier.listen_to_updates(_listener):
116-
await self._is_active
109+
async def _on_event(self, ev: RecvEvent) -> None:
    """Apply one cluster event from the config stream to the registry.

    * add / update: fetch the cluster's config through the config client
      and put it into the registry; log a warning if the config client
      returns nothing for that cluster.
    * remove: drop the cluster from the registry; a cluster that is not
      registered is logged and otherwise ignored.

    Events of any other type are ignored.

    Args:
        ev: Received event; ``ev.cluster`` must name the affected cluster.

    Raises:
        AssertionError: If the event carries no cluster name.
    """
    if not ev.cluster:
        # Explicit raise instead of ``assert`` so the check is not stripped
        # when Python runs with -O; exception type and message are unchanged.
        raise AssertionError("event cluster is required")

    if ev.event_type in (self._CLUSTER_ADD_EVENT, self._CLUSTER_UPDATE_EVENT):
        cluster_config = await self._config_client.get_cluster(ev.cluster)
        if cluster_config:
            await self._cluster_registry.replace(cluster_config)
        else:
            logger.warning("Cluster %r not found", ev.cluster)
    elif ev.event_type == self._CLUSTER_REMOVE_EVENT:
        try:
            self._cluster_registry.remove(ev.cluster)
        except ClusterNotFound:
            # The registry raises for unknown names; a repeated or late
            # remove event must not fail the event handler.
            logger.warning(
                "Cluster %r is not registered, nothing to remove", ev.cluster
            )
124123

125124

126125
class SingleClusterUpdater:
126+
_CONFIG_STREAM = StreamType("platform-config")
127+
_CLUSTER_UPDATE_EVENT = EventType("cluster-update")
128+
127129
def __init__(
128130
self,
129-
cluster_holder: "ClusterHolder",
131+
events_client: AbstractEventsClient,
130132
config_client: ConfigClient,
133+
cluster_holder: "ClusterHolder",
131134
cluster_name: str,
132135
):
133-
self._loop = asyncio.get_event_loop()
134-
self._cluster_holder = cluster_holder
136+
self._events_client = events_client
135137
self._config_client = config_client
138+
self._cluster_holder = cluster_holder
136139
self._cluster_name = cluster_name
137140

138141
self.disable_updates_for_test = False
139142

143+
async def __aenter__(self) -> Self:
    """Subscribe ``_on_event`` to update events for this updater's cluster.

    The filter matches only the cluster-update event type and only the
    cluster named by ``self._cluster_name``.

    Returns:
        This updater, so it can be used as ``async with ... as updater``.
    """
    stream = self._CONFIG_STREAM
    logger.info("Subscribe for %r", stream)

    # Narrow the subscription to updates of the single configured cluster.
    own_cluster_updates = FilterItem(
        event_types=frozenset((self._CLUSTER_UPDATE_EVENT,)),
        clusters=frozenset((self._cluster_name,)),
    )
    await self._events_client.subscribe_group(
        stream,
        self._on_event,
        filters=[own_cluster_updates],
    )

    logger.info("Subscribed")
    return self
157+
158+
async def __aexit__(self, exc_typ: object, exc_val: object, exc_tb: object) -> None:
    """Leave the context without doing anything.

    The exception arguments are ignored and nothing is suppressed.
    NOTE(review): the subscription made in ``__aenter__`` is not cancelled
    here -- presumably it ends when the events client is closed; confirm.
    """
    return None
160+
161+
async def _on_event(self, _: RecvEvent) -> None:
    """Run ``do_update`` for any received event; the event itself is unused."""
    refresh = self.do_update()
    await refresh
163+
140164
async def do_update(self) -> None:
141165
if self.disable_updates_for_test:
142166
return
@@ -233,14 +257,3 @@ def remove(self, name: str) -> ClusterConfig:
233257
if not record:
234258
raise ClusterNotFound.create(name)
235259
return record
236-
237-
async def cleanup(self, keep_clusters: Sequence[ClusterConfig]) -> None:
238-
all_cluster_names = set(self._records.keys())
239-
keep_clusters_with_names = {
240-
cluster_config.name for cluster_config in keep_clusters
241-
}
242-
for cluster_for_removal in all_cluster_names - keep_clusters_with_names:
243-
try:
244-
self.remove(cluster_for_removal)
245-
except ClusterNotFound:
246-
pass

platform_api/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from datetime import timedelta
44
from decimal import Decimal
55

6+
from apolo_events_client import EventsClientConfig
67
from yarl import URL
78

89
from alembic.config import Config as AlembicConfig
@@ -166,6 +167,8 @@ class Config:
166167

167168
scheduler: JobsSchedulerConfig = JobsSchedulerConfig()
168169

170+
events: EventsClientConfig | None = None
171+
169172

170173
@dataclass(frozen=True)
171174
class PollerConfig:
@@ -185,6 +188,8 @@ class PollerConfig:
185188

186189
scheduler: JobsSchedulerConfig = JobsSchedulerConfig()
187190

191+
events: EventsClientConfig | None = None
192+
188193

189194
@dataclass(frozen=True)
190195
class PlatformConfig:

0 commit comments

Comments
 (0)