Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8bb01e9
added instrumentation basis
Sep 11, 2024
3abec0b
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 11, 2024
3baaa63
adding metrics for starting and stopping
Sep 11, 2024
e4e41e3
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 11, 2024
c05eab2
pull user service images and output ports with metrics
Sep 11, 2024
ab1c162
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 11, 2024
a064f84
added metrics for pulling inputs
Sep 11, 2024
cb7584d
added metrics for restoring service state
Sep 11, 2024
1f58de7
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 11, 2024
00ff066
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 13, 2024
9907398
using if in application
Sep 13, 2024
704c191
renaming
Sep 13, 2024
361b028
remove casting
Sep 13, 2024
9d37764
refactor code safety
Sep 13, 2024
50d985a
refactor to store rate
Sep 13, 2024
87a9bbd
using rate buckets
Sep 13, 2024
71b6f07
refactor bucket names and time buckets
Sep 13, 2024
1303c91
rename subsystem
Sep 13, 2024
b0d6e51
total duration is required not the rate
Sep 13, 2024
dee080c
using same function to create namespaces
Sep 13, 2024
e8f4ea3
adding metric for savign service states
Sep 13, 2024
7abd932
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 13, 2024
819d2c6
renaming
Sep 13, 2024
de5e046
refactor
Sep 13, 2024
a6f131a
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 13, 2024
26b258a
fixed broken test
Sep 13, 2024
9a7165f
fixed failing tests
Sep 13, 2024
6d25e44
enabling instrumentation
Sep 13, 2024
fc9fde8
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 13, 2024
74e261e
using epsilon to avoid issues
Sep 13, 2024
70362cf
refactred test
Sep 13, 2024
7db3cc8
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 17, 2024
785a7ff
fixed broken import
Sep 17, 2024
7571f7d
fixed broken test
Sep 17, 2024
7854bf7
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 17, 2024
395eacb
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 18, 2024
6975378
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 18, 2024
0d2e9ab
update docs
Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/service-library/src/servicelib/instrumentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def get_metrics_namespace(application_name: str) -> str:
return application_name.replace("-", "_")
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Final

from servicelib.instrumentation import get_metrics_namespace

from ..._meta import APP_NAME

METRICS_NAMESPACE: Final[str] = APP_NAME.replace("-", "_")
METRICS_NAMESPACE: Final[str] = get_metrics_namespace(APP_NAME)
EC2_INSTANCE_LABELS: Final[tuple[str, ...]] = ("instance_type",)

CLUSTER_METRICS_DEFINITIONS: Final[dict[str, tuple[str, tuple[str, ...]]]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ async def create_dynamic_service(
logger.debug("Redirecting %s", redirect_url_with_query)
return RedirectResponse(str(redirect_url_with_query))

#
if not await is_sidecar_running(
service.node_uuid, dynamic_services_settings.DYNAMIC_SCHEDULER.SWARM_STACK_NAME
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
override_fastapi_openapi_method,
)
from servicelib.fastapi.profiler_middleware import ProfilerMiddleware
from servicelib.fastapi.prometheus_instrumentation import (
setup_prometheus_instrumentation,
)
from servicelib.logging_utils import config_all_loggers

from ..api.entrypoints import api_router
Expand All @@ -27,6 +24,7 @@
director_v0,
dynamic_services,
dynamic_sidecar,
instrumentation,
notifier,
rabbitmq,
redis,
Expand Down Expand Up @@ -191,7 +189,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI:
resource_usage_tracker_client.setup(app)

if settings.DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED:
setup_prometheus_instrumentation(app)
instrumentation.setup(app)

if settings.DIRECTOR_V2_PROFILING:
app.add_middleware(ProfilerMiddleware)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
import logging
import re
from collections.abc import Mapping
from datetime import datetime
from enum import Enum
from functools import cached_property
from pathlib import Path
from typing import Any, TypeAlias
from uuid import UUID

import arrow
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceCreate
from models_library.api_schemas_directorv2.dynamic_services_service import (
CommonServiceDetails,
Expand Down Expand Up @@ -170,6 +172,28 @@ def mark_removed(self) -> None:
self.was_removed = True


class ServicesInstrumentation(BaseModel):
start_requested_at: datetime | None = Field(
None,
description="moment in which the process of starting the service was requested",
)
close_requested_at: datetime | None = Field(
None,
description="moment in which the process of stopping the service was requested",
)

def elapsed_since_start_request(self) -> float | None:
if self.start_requested_at is None:
return None

return (arrow.utcnow().datetime - self.start_requested_at).total_seconds()

def elapsed_since_close_request(self) -> float | None:
if self.close_requested_at is None:
return None
return (arrow.utcnow().datetime - self.close_requested_at).total_seconds()


class DynamicSidecar(BaseModel):
status: Status = Field(
Status.create_as_initially_ok(),
Expand Down Expand Up @@ -254,6 +278,11 @@ def compose_spec_submitted(self) -> bool:
description="set True if the dy-sidecar saves the state and uploads the outputs",
)

instrumentation: ServicesInstrumentation = Field(
default_factory=lambda: ServicesInstrumentation.parse_obj({}),
description="keeps track times for various operations",
)

# below had already been validated and
# used only to start the proxy
dynamic_sidecar_id: ServiceId | None = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,18 +349,20 @@ async def stop_service(
progress_callback,
)

async def restore_service_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> None:
async def restore_service_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> int:
response = await self._thin_client.post_containers_tasks_state_restore(
dynamic_sidecar_endpoint
)
task_id: TaskId = response.json()

await self._await_for_result(
result: Any | None = await self._await_for_result(
task_id,
dynamic_sidecar_endpoint,
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT,
_debug_progress_callback,
)
assert isinstance(result, int) # nosec
return result

async def pull_user_services_images(
self, dynamic_sidecar_endpoint: AnyHttpUrl
Expand All @@ -381,18 +383,20 @@ async def save_service_state(
self,
dynamic_sidecar_endpoint: AnyHttpUrl,
progress_callback: ProgressCallback | None = None,
) -> None:
) -> int:
response = await self._thin_client.post_containers_tasks_state_save(
dynamic_sidecar_endpoint
)
task_id: TaskId = response.json()

await self._await_for_result(
result: Any | None = await self._await_for_result(
task_id,
dynamic_sidecar_endpoint,
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT,
progress_callback,
)
assert isinstance(result, int) # nosec
return result

async def pull_service_input_ports(
self,
Expand All @@ -416,18 +420,20 @@ async def pull_service_output_ports(
self,
dynamic_sidecar_endpoint: AnyHttpUrl,
port_keys: list[str] | None = None,
) -> None:
) -> int:
response = await self._thin_client.post_containers_tasks_ports_outputs_pull(
dynamic_sidecar_endpoint, port_keys
)
task_id: TaskId = response.json()

await self._await_for_result(
result: Any | None = await self._await_for_result(
task_id,
dynamic_sidecar_endpoint,
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT,
_debug_progress_callback,
)
assert isinstance(result, int) # nosec
return result

async def push_service_output_ports(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
DynamicServicesSchedulerSettings,
)
from .....models.dynamic_services_scheduler import SchedulerData
from .....modules.instrumentation import get_instrumentation, get_metrics_labels
from .....utils.db import get_repository
from ....db.repositories.groups_extra_properties import GroupsExtraPropertiesRepository
from ....db.repositories.projects import ProjectsRepository
Expand Down Expand Up @@ -222,4 +223,12 @@ async def progress_create_containers(

scheduler_data.dynamic_sidecar.were_containers_created = True

start_duration = (
scheduler_data.dynamic_sidecar.instrumentation.elapsed_since_start_request()
)
assert start_duration is not None # nosec
get_instrumentation(app).dynamic_sidecar_metrics.start_time_duration.labels(
**get_metrics_labels(scheduler_data)
).observe(start_duration)

_logger.info("Internal state after creating user services %s", scheduler_data)
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
DockerStatus,
SchedulerData,
)
from .....modules.instrumentation import (
get_instrumentation,
get_metrics_labels,
get_rate,
track_duration,
)
from .....utils.db import get_repository
from ....db.repositories.projects import ProjectsRepository
from ....db.repositories.projects_networks import ProjectsNetworksRepository
Expand Down Expand Up @@ -157,9 +163,15 @@ async def service_save_state(
progress_callback: ProgressCallback | None = None,
) -> None:
scheduler_data: SchedulerData = _get_scheduler_data(app, node_uuid)
await sidecars_client.save_service_state(
scheduler_data.endpoint, progress_callback=progress_callback
)

with track_duration() as duration:
size = await sidecars_client.save_service_state(
scheduler_data.endpoint, progress_callback=progress_callback
)
get_instrumentation(app).dynamic_sidecar_metrics.push_service_state_rate.labels(
**get_metrics_labels(scheduler_data)
).observe(get_rate(size, duration.to_flaot()))

await sidecars_client.update_volume_state(
scheduler_data.endpoint,
volume_category=VolumeCategory.STATES,
Expand Down Expand Up @@ -375,6 +387,16 @@ async def attempt_pod_removal_and_data_saving(
rabbitmq_client: RabbitMQClient = app.state.rabbitmq_client
await rabbitmq_client.publish(message.channel_name, message)

# metrics

stop_duration = (
scheduler_data.dynamic_sidecar.instrumentation.elapsed_since_close_request()
)
assert stop_duration is not None # nosec
get_instrumentation(app).dynamic_sidecar_metrics.stop_time_duration.labels(
**get_metrics_labels(scheduler_data)
).observe(stop_duration)


async def attach_project_networks(app: FastAPI, scheduler_data: SchedulerData) -> None:
_logger.debug("Attaching project networks for %s", scheduler_data.service_name)
Expand Down Expand Up @@ -460,14 +482,44 @@ async def prepare_services_environment(
)
)

async def _pull_output_ports_with_metrics() -> None:
with track_duration() as duration:
size: int = await sidecars_client.pull_service_output_ports(
dynamic_sidecar_endpoint
)

get_instrumentation(app).dynamic_sidecar_metrics.output_ports_pull_rate.labels(
**get_metrics_labels(scheduler_data)
).observe(get_rate(size, duration.to_flaot()))

async def _pull_user_services_images_with_metrics() -> None:
with track_duration() as duration:
await sidecars_client.pull_user_services_images(dynamic_sidecar_endpoint)

get_instrumentation(
app
).dynamic_sidecar_metrics.pull_user_services_images_duration.labels(
**get_metrics_labels(scheduler_data)
).observe(
duration.to_flaot()
)

async def _restore_service_state_with_metrics() -> None:
with track_duration() as duration:
size = await sidecars_client.restore_service_state(dynamic_sidecar_endpoint)

get_instrumentation(app).dynamic_sidecar_metrics.pull_service_state_rate.labels(
**get_metrics_labels(scheduler_data)
).observe(get_rate(size, duration.to_flaot()))

tasks = [
sidecars_client.pull_user_services_images(dynamic_sidecar_endpoint),
sidecars_client.pull_service_output_ports(dynamic_sidecar_endpoint),
_pull_user_services_images_with_metrics(),
_pull_output_ports_with_metrics(),
]
# When enabled no longer downloads state via nodeports
# S3 is used to store state paths
if not app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED:
tasks.append(sidecars_client.restore_service_state(dynamic_sidecar_endpoint))
tasks.append(_restore_service_state_with_metrics())

await limited_gather(*tasks, limit=3)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import contextlib
import functools
import logging
import time
from asyncio import Lock, Queue, Task
from dataclasses import dataclass, field
from typing import Final

import arrow
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceCreate,
Expand Down Expand Up @@ -54,6 +56,11 @@
DynamicServicesSchedulerSettings,
)
from .....models.dynamic_services_scheduler import SchedulerData, ServiceName
from .....modules.instrumentation import (
get_instrumentation,
get_metrics_labels,
get_rate,
)
from ...api_client import SidecarsClient, get_sidecars_client
from ...docker_api import update_scheduler_data_label
from ...errors import DynamicSidecarError, DynamicSidecarNotFoundError
Expand Down Expand Up @@ -255,6 +262,9 @@ async def add_service(
request_simcore_user_agent=request_simcore_user_agent,
can_save=can_save,
)
scheduler_data.dynamic_sidecar.instrumentation.start_requested_at = (
arrow.utcnow().datetime
)
await self.add_service_from_scheduler_data(scheduler_data)

async def add_service_from_scheduler_data(
Expand Down Expand Up @@ -353,6 +363,10 @@ async def mark_service_for_removal(
)
return

current.dynamic_sidecar.instrumentation.close_requested_at = (
arrow.utcnow().datetime
)

# PC-> ANE: could you please review what to do when can_save=None
assert can_save is not None # nosec
current.dynamic_sidecar.service_removal_state.mark_to_remove(
Expand Down Expand Up @@ -455,9 +469,19 @@ async def retrieve_service_inputs(
dynamic_sidecar_endpoint: AnyHttpUrl = scheduler_data.endpoint
sidecars_client: SidecarsClient = await get_sidecars_client(self.app, node_uuid)

started = time.time()
transferred_bytes = await sidecars_client.pull_service_input_ports(
dynamic_sidecar_endpoint, port_keys
)
duration = time.time() - started

get_instrumentation(
self.app
).dynamic_sidecar_metrics.input_ports_pull_rate.labels(
**get_metrics_labels(scheduler_data)
).observe(
get_rate(transferred_bytes, duration)
)

if scheduler_data.restart_policy == RestartPolicy.ON_INPUTS_DOWNLOADED:
logger.info("Will restart containers")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from ._setup import get_instrumentation, setup
from ._utils import get_metrics_labels, get_rate, track_duration

__all__: tuple[str, ...] = (
"get_instrumentation",
"get_metrics_labels",
"get_rate",
"setup",
"track_duration",
)
Loading