Skip to content

Commit 18f8520

Browse files
authored
Merge branch 'master' into enh/scalate-compatibility
2 parents b839abd + 9057d01 commit 18f8520

File tree

23 files changed

+435
-39
lines changed

23 files changed

+435
-39
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
def get_metrics_namespace(application_name: str) -> str:
    """Return the metrics namespace derived from *application_name*.

    Every hyphen in the application name is replaced by an underscore;
    all other characters are returned unchanged.

    NOTE(review): presumably needed because the metrics backend does not
    accept ``-`` in metric names -- confirm against the consumer.
    """
    return application_name.replace("-", "_")

packages/service-library/src/servicelib/rabbitmq/_utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ async def wait_till_rabbitmq_responsive(url: str) -> bool:
5353

5454

5555
def get_rabbitmq_client_unique_name(base_name: str) -> str:
56-
# NOTE: below prefix is guaranteed to change each time the preocess restarts
57-
# Why is this desiarable?
58-
# 1. the code base makes the above assumption, otherwise subcscribers and consumers do not work
59-
# 2. enables restartability of webserver during [re]deploys
56+
# NOTE: The prefix below will change every time the process restarts.
57+
# Why is this necessary?
58+
# 1. The codebase relies on this behavior; without it, subscribers and consumers will fail.
59+
# 2. It allows the web server to be restarted seamlessly during [re]deployments.
6060
prefix_create_time = f"{psutil.Process(os.getpid()).create_time()}".strip(".")[-6:]
6161

6262
return f"{base_name}_{socket.gethostname()}_{prefix_create_time}"

services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_constants.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from typing import Final
22

3+
from servicelib.instrumentation import get_metrics_namespace
4+
35
from ..._meta import APP_NAME
46

5-
METRICS_NAMESPACE: Final[str] = APP_NAME.replace("-", "_")
7+
METRICS_NAMESPACE: Final[str] = get_metrics_namespace(APP_NAME)
68
EC2_INSTANCE_LABELS: Final[tuple[str, ...]] = ("instance_type",)
79

810
CLUSTER_METRICS_DEFINITIONS: Final[dict[str, tuple[str, tuple[str, ...]]]] = {

services/director-v2/src/simcore_service_director_v2/api/routes/dynamic_services.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ async def create_dynamic_service(
136136
logger.debug("Redirecting %s", redirect_url_with_query)
137137
return RedirectResponse(str(redirect_url_with_query))
138138

139-
#
140139
if not await is_sidecar_running(
141140
service.node_uuid, dynamic_services_settings.DYNAMIC_SCHEDULER.SWARM_STACK_NAME
142141
):

services/director-v2/src/simcore_service_director_v2/core/application.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@
77
override_fastapi_openapi_method,
88
)
99
from servicelib.fastapi.profiler_middleware import ProfilerMiddleware
10-
from servicelib.fastapi.prometheus_instrumentation import (
11-
setup_prometheus_instrumentation,
12-
)
1310
from servicelib.fastapi.tracing import setup_tracing
1411
from servicelib.logging_utils import config_all_loggers
1512

@@ -28,6 +25,7 @@
2825
director_v0,
2926
dynamic_services,
3027
dynamic_sidecar,
28+
instrumentation,
3129
notifier,
3230
rabbitmq,
3331
redis,
@@ -192,7 +190,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI:
192190
resource_usage_tracker_client.setup(app)
193191

194192
if settings.DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED:
195-
setup_prometheus_instrumentation(app)
193+
instrumentation.setup(app)
196194
if settings.DIRECTOR_V2_TRACING:
197195
setup_tracing(app, app.state.settings.DIRECTOR_V2_TRACING, APP_NAME)
198196

services/director-v2/src/simcore_service_director_v2/models/dynamic_services_scheduler.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
import logging
33
import re
44
from collections.abc import Mapping
5+
from datetime import datetime
56
from enum import Enum
67
from functools import cached_property
78
from pathlib import Path
89
from typing import Any, TypeAlias
910
from uuid import UUID
1011

12+
import arrow
1113
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceCreate
1214
from models_library.api_schemas_directorv2.dynamic_services_service import (
1315
CommonServiceDetails,
@@ -170,6 +172,28 @@ def mark_removed(self) -> None:
170172
self.was_removed = True
171173

172174

175+
class ServicesInstrumentation(BaseModel):
    # Holds the moments at which starting / stopping a service was requested
    # and exposes the time elapsed since each of them.
    # (Kept as comments on purpose: a class docstring on a pydantic model
    # would end up in the generated schema description.)

    start_requested_at: datetime | None = Field(
        None,
        description="moment in which the process of starting the service was requested",
    )
    close_requested_at: datetime | None = Field(
        None,
        description="moment in which the process of stopping the service was requested",
    )

    def elapsed_since_start_request(self) -> float | None:
        """Return the seconds elapsed since ``start_requested_at``.

        Returns ``None`` when no start request has been recorded.
        """
        if self.start_requested_at is None:
            return None

        # NOTE(review): ``arrow.utcnow().datetime`` is timezone-aware; the
        # subtraction assumes the stored timestamp is aware too -- confirm.
        return (arrow.utcnow().datetime - self.start_requested_at).total_seconds()

    def elapsed_since_close_request(self) -> float | None:
        """Return the seconds elapsed since ``close_requested_at``.

        Returns ``None`` when no close request has been recorded.
        """
        if self.close_requested_at is None:
            return None
        # Same timezone-awareness assumption as in elapsed_since_start_request.
        return (arrow.utcnow().datetime - self.close_requested_at).total_seconds()
195+
196+
173197
class DynamicSidecar(BaseModel):
174198
status: Status = Field(
175199
Status.create_as_initially_ok(),
@@ -254,6 +278,11 @@ def compose_spec_submitted(self) -> bool:
254278
description="set True if the dy-sidecar saves the state and uploads the outputs",
255279
)
256280

281+
instrumentation: ServicesInstrumentation = Field(
282+
default_factory=lambda: ServicesInstrumentation.parse_obj({}),
283+
description="keeps track times for various operations",
284+
)
285+
257286
# below had already been validated and
258287
# used only to start the proxy
259288
dynamic_sidecar_id: ServiceId | None = Field(

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -349,18 +349,20 @@ async def stop_service(
349349
progress_callback,
350350
)
351351

352-
async def restore_service_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> None:
352+
async def restore_service_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> int:
353353
response = await self._thin_client.post_containers_tasks_state_restore(
354354
dynamic_sidecar_endpoint
355355
)
356356
task_id: TaskId = response.json()
357357

358-
await self._await_for_result(
358+
result: Any | None = await self._await_for_result(
359359
task_id,
360360
dynamic_sidecar_endpoint,
361361
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT,
362362
_debug_progress_callback,
363363
)
364+
assert isinstance(result, int) # nosec
365+
return result
364366

365367
async def pull_user_services_images(
366368
self, dynamic_sidecar_endpoint: AnyHttpUrl
@@ -381,18 +383,20 @@ async def save_service_state(
381383
self,
382384
dynamic_sidecar_endpoint: AnyHttpUrl,
383385
progress_callback: ProgressCallback | None = None,
384-
) -> None:
386+
) -> int:
385387
response = await self._thin_client.post_containers_tasks_state_save(
386388
dynamic_sidecar_endpoint
387389
)
388390
task_id: TaskId = response.json()
389391

390-
await self._await_for_result(
392+
result: Any | None = await self._await_for_result(
391393
task_id,
392394
dynamic_sidecar_endpoint,
393395
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT,
394396
progress_callback,
395397
)
398+
assert isinstance(result, int) # nosec
399+
return result
396400

397401
async def pull_service_input_ports(
398402
self,
@@ -416,18 +420,20 @@ async def pull_service_output_ports(
416420
self,
417421
dynamic_sidecar_endpoint: AnyHttpUrl,
418422
port_keys: list[str] | None = None,
419-
) -> None:
423+
) -> int:
420424
response = await self._thin_client.post_containers_tasks_ports_outputs_pull(
421425
dynamic_sidecar_endpoint, port_keys
422426
)
423427
task_id: TaskId = response.json()
424428

425-
await self._await_for_result(
429+
result: Any | None = await self._await_for_result(
426430
task_id,
427431
dynamic_sidecar_endpoint,
428432
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT,
429433
_debug_progress_callback,
430434
)
435+
assert isinstance(result, int) # nosec
436+
return result
431437

432438
async def push_service_output_ports(
433439
self,

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_user_services.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
DynamicServicesSchedulerSettings,
2020
)
2121
from .....models.dynamic_services_scheduler import SchedulerData
22+
from .....modules.instrumentation import get_instrumentation, get_metrics_labels
2223
from .....utils.db import get_repository
2324
from ....db.repositories.groups_extra_properties import GroupsExtraPropertiesRepository
2425
from ....db.repositories.projects import ProjectsRepository
@@ -222,4 +223,12 @@ async def progress_create_containers(
222223

223224
scheduler_data.dynamic_sidecar.were_containers_created = True
224225

226+
start_duration = (
227+
scheduler_data.dynamic_sidecar.instrumentation.elapsed_since_start_request()
228+
)
229+
assert start_duration is not None # nosec
230+
get_instrumentation(app).dynamic_sidecar_metrics.start_time_duration.labels(
231+
**get_metrics_labels(scheduler_data)
232+
).observe(start_duration)
233+
225234
_logger.info("Internal state after creating user services %s", scheduler_data)

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@
4444
DockerStatus,
4545
SchedulerData,
4646
)
47+
from .....modules.instrumentation import (
48+
get_instrumentation,
49+
get_metrics_labels,
50+
get_rate,
51+
track_duration,
52+
)
4753
from .....utils.db import get_repository
4854
from ....db.repositories.projects import ProjectsRepository
4955
from ....db.repositories.projects_networks import ProjectsNetworksRepository
@@ -157,9 +163,15 @@ async def service_save_state(
157163
progress_callback: ProgressCallback | None = None,
158164
) -> None:
159165
scheduler_data: SchedulerData = _get_scheduler_data(app, node_uuid)
160-
await sidecars_client.save_service_state(
161-
scheduler_data.endpoint, progress_callback=progress_callback
162-
)
166+
167+
with track_duration() as duration:
168+
size = await sidecars_client.save_service_state(
169+
scheduler_data.endpoint, progress_callback=progress_callback
170+
)
171+
get_instrumentation(app).dynamic_sidecar_metrics.push_service_state_rate.labels(
172+
**get_metrics_labels(scheduler_data)
173+
).observe(get_rate(size, duration.to_flaot()))
174+
163175
await sidecars_client.update_volume_state(
164176
scheduler_data.endpoint,
165177
volume_category=VolumeCategory.STATES,
@@ -375,6 +387,16 @@ async def attempt_pod_removal_and_data_saving(
375387
rabbitmq_client: RabbitMQClient = app.state.rabbitmq_client
376388
await rabbitmq_client.publish(message.channel_name, message)
377389

390+
# metrics
391+
392+
stop_duration = (
393+
scheduler_data.dynamic_sidecar.instrumentation.elapsed_since_close_request()
394+
)
395+
assert stop_duration is not None # nosec
396+
get_instrumentation(app).dynamic_sidecar_metrics.stop_time_duration.labels(
397+
**get_metrics_labels(scheduler_data)
398+
).observe(stop_duration)
399+
378400

379401
async def attach_project_networks(app: FastAPI, scheduler_data: SchedulerData) -> None:
380402
_logger.debug("Attaching project networks for %s", scheduler_data.service_name)
@@ -460,14 +482,44 @@ async def prepare_services_environment(
460482
)
461483
)
462484

485+
async def _pull_output_ports_with_metrics() -> None:
486+
with track_duration() as duration:
487+
size: int = await sidecars_client.pull_service_output_ports(
488+
dynamic_sidecar_endpoint
489+
)
490+
491+
get_instrumentation(app).dynamic_sidecar_metrics.output_ports_pull_rate.labels(
492+
**get_metrics_labels(scheduler_data)
493+
).observe(get_rate(size, duration.to_flaot()))
494+
495+
async def _pull_user_services_images_with_metrics() -> None:
496+
with track_duration() as duration:
497+
await sidecars_client.pull_user_services_images(dynamic_sidecar_endpoint)
498+
499+
get_instrumentation(
500+
app
501+
).dynamic_sidecar_metrics.pull_user_services_images_duration.labels(
502+
**get_metrics_labels(scheduler_data)
503+
).observe(
504+
duration.to_flaot()
505+
)
506+
507+
async def _restore_service_state_with_metrics() -> None:
508+
with track_duration() as duration:
509+
size = await sidecars_client.restore_service_state(dynamic_sidecar_endpoint)
510+
511+
get_instrumentation(app).dynamic_sidecar_metrics.pull_service_state_rate.labels(
512+
**get_metrics_labels(scheduler_data)
513+
).observe(get_rate(size, duration.to_flaot()))
514+
463515
tasks = [
464-
sidecars_client.pull_user_services_images(dynamic_sidecar_endpoint),
465-
sidecars_client.pull_service_output_ports(dynamic_sidecar_endpoint),
516+
_pull_user_services_images_with_metrics(),
517+
_pull_output_ports_with_metrics(),
466518
]
467519
# When enabled no longer downloads state via nodeports
468520
# S3 is used to store state paths
469521
if not app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED:
470-
tasks.append(sidecars_client.restore_service_state(dynamic_sidecar_endpoint))
522+
tasks.append(_restore_service_state_with_metrics())
471523

472524
await limited_gather(*tasks, limit=3)
473525

services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
import contextlib
1818
import functools
1919
import logging
20+
import time
2021
from asyncio import Lock, Queue, Task
2122
from dataclasses import dataclass, field
2223
from typing import Final
2324

25+
import arrow
2426
from fastapi import FastAPI
2527
from models_library.api_schemas_directorv2.dynamic_services import (
2628
DynamicServiceCreate,
@@ -54,6 +56,11 @@
5456
DynamicServicesSchedulerSettings,
5557
)
5658
from .....models.dynamic_services_scheduler import SchedulerData, ServiceName
59+
from .....modules.instrumentation import (
60+
get_instrumentation,
61+
get_metrics_labels,
62+
get_rate,
63+
)
5764
from ...api_client import SidecarsClient, get_sidecars_client
5865
from ...docker_api import update_scheduler_data_label
5966
from ...errors import DynamicSidecarError, DynamicSidecarNotFoundError
@@ -255,6 +262,9 @@ async def add_service(
255262
request_simcore_user_agent=request_simcore_user_agent,
256263
can_save=can_save,
257264
)
265+
scheduler_data.dynamic_sidecar.instrumentation.start_requested_at = (
266+
arrow.utcnow().datetime
267+
)
258268
await self.add_service_from_scheduler_data(scheduler_data)
259269

260270
async def add_service_from_scheduler_data(
@@ -353,6 +363,10 @@ async def mark_service_for_removal(
353363
)
354364
return
355365

366+
current.dynamic_sidecar.instrumentation.close_requested_at = (
367+
arrow.utcnow().datetime
368+
)
369+
356370
# PC-> ANE: could you please review what to do when can_save=None
357371
assert can_save is not None # nosec
358372
current.dynamic_sidecar.service_removal_state.mark_to_remove(
@@ -455,9 +469,19 @@ async def retrieve_service_inputs(
455469
dynamic_sidecar_endpoint: AnyHttpUrl = scheduler_data.endpoint
456470
sidecars_client: SidecarsClient = await get_sidecars_client(self.app, node_uuid)
457471

472+
started = time.time()
458473
transferred_bytes = await sidecars_client.pull_service_input_ports(
459474
dynamic_sidecar_endpoint, port_keys
460475
)
476+
duration = time.time() - started
477+
478+
get_instrumentation(
479+
self.app
480+
).dynamic_sidecar_metrics.input_ports_pull_rate.labels(
481+
**get_metrics_labels(scheduler_data)
482+
).observe(
483+
get_rate(transferred_bytes, duration)
484+
)
461485

462486
if scheduler_data.restart_policy == RestartPolicy.ON_INPUTS_DOWNLOADED:
463487
logger.info("Will restart containers")

0 commit comments

Comments
 (0)