Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8bb01e9
added instrumentation basis
Sep 11, 2024
3abec0b
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 11, 2024
3baaa63
adding metrics for starting and stopping
Sep 11, 2024
e4e41e3
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 11, 2024
c05eab2
pull user service images and output ports with metrics
Sep 11, 2024
ab1c162
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 11, 2024
a064f84
added metrics for pulling inputs
Sep 11, 2024
cb7584d
added metrics for restoring service state
Sep 11, 2024
1f58de7
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 11, 2024
00ff066
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 13, 2024
9907398
using if in application
Sep 13, 2024
704c191
renaming
Sep 13, 2024
361b028
remove casting
Sep 13, 2024
9d37764
refactor code safety
Sep 13, 2024
50d985a
refactor to store rate
Sep 13, 2024
87a9bbd
using rate buckets
Sep 13, 2024
71b6f07
refactor bucket names and time buckets
Sep 13, 2024
1303c91
rename subsystem
Sep 13, 2024
b0d6e51
total duration is required not the rate
Sep 13, 2024
dee080c
using same function to create namespaces
Sep 13, 2024
e8f4ea3
adding metric for saving service states
Sep 13, 2024
7abd932
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 13, 2024
819d2c6
renaming
Sep 13, 2024
de5e046
refactor
Sep 13, 2024
a6f131a
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 13, 2024
26b258a
fixed broken test
Sep 13, 2024
9a7165f
fixed failing tests
Sep 13, 2024
6d25e44
enabling instrumentation
Sep 13, 2024
fc9fde8
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 13, 2024
74e261e
using epsilon to avoid issues
Sep 13, 2024
70362cf
refactored test
Sep 13, 2024
7db3cc8
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 17, 2024
785a7ff
fixed broken import
Sep 17, 2024
7571f7d
fixed broken test
Sep 17, 2024
7854bf7
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 17, 2024
395eacb
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 18, 2024
6975378
Merge remote-tracking branch 'upstream/master' into pr-osparc-add-met…
Sep 18, 2024
0d2e9ab
update docs
Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ async def pull_images(
registry_settings: RegistrySettings,
progress_cb: AsyncReportCB,
log_cb: LogCB,
) -> None:
) -> int:
images_layer_information = await asyncio.gather(
*[
retrieve_image_layer_information(image, registry_settings)
Expand Down Expand Up @@ -147,3 +147,4 @@ async def pull_images(
)
]
)
return images_total_size
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ async def create_dynamic_service(
logger.debug("Redirecting %s", redirect_url_with_query)
return RedirectResponse(str(redirect_url_with_query))

#
if not await is_sidecar_running(
service.node_uuid, dynamic_services_settings.DYNAMIC_SCHEDULER.SWARM_STACK_NAME
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
override_fastapi_openapi_method,
)
from servicelib.fastapi.profiler_middleware import ProfilerMiddleware
from servicelib.fastapi.prometheus_instrumentation import (
setup_prometheus_instrumentation,
)
from servicelib.logging_utils import config_all_loggers

from ..api.entrypoints import api_router
Expand All @@ -27,6 +24,7 @@
director_v0,
dynamic_services,
dynamic_sidecar,
instrumentation,
notifier,
rabbitmq,
redis,
Expand Down Expand Up @@ -190,8 +188,7 @@ def init_app(settings: AppSettings | None = None) -> FastAPI:

resource_usage_tracker_client.setup(app)

if settings.DIRECTOR_V2_PROMETHEUS_INSTRUMENTATION_ENABLED:
setup_prometheus_instrumentation(app)
instrumentation.setup(app)

if settings.DIRECTOR_V2_PROFILING:
app.add_middleware(ProfilerMiddleware)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
import logging
import re
from collections.abc import Mapping
from datetime import datetime
from enum import Enum
from functools import cached_property
from pathlib import Path
from typing import Any, TypeAlias
from uuid import UUID

import arrow
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceCreate
from models_library.api_schemas_directorv2.dynamic_services_service import (
CommonServiceDetails,
Expand Down Expand Up @@ -170,6 +172,32 @@ def mark_removed(self) -> None:
self.was_removed = True


class MetricsTimers(BaseModel):
    """Timestamps of when starting/stopping a service was requested.

    Both fields default to ``None``; the accessor methods report the number
    of seconds elapsed between the stored timestamp and the current UTC time.
    """

    start_duration_of_service_starting: datetime | None = Field(
        default=None,
        description="moment in which the process of starting the service was requested",
    )
    start_duration_of_service_closing: datetime | None = Field(
        default=None,
        description="moment in which the process of stopping the service was requested",
    )

    def get_start_duration(self) -> float | None:
        """Return seconds elapsed since the start request, or ``None`` if unset."""
        requested_at = self.start_duration_of_service_starting
        if requested_at is not None:
            elapsed = arrow.utcnow().datetime - requested_at
            return elapsed.total_seconds()
        return None

    def get_stop_duration(self) -> float | None:
        """Return seconds elapsed since the stop request, or ``None`` if unset."""
        requested_at = self.start_duration_of_service_closing
        if requested_at is not None:
            elapsed = arrow.utcnow().datetime - requested_at
            return elapsed.total_seconds()
        return None


class DynamicSidecar(BaseModel):
status: Status = Field(
Status.create_as_initially_ok(),
Expand Down Expand Up @@ -254,6 +282,11 @@ def compose_spec_submitted(self) -> bool:
description="set True if the dy-sidecar saves the state and uploads the outputs",
)

metrics_timers: MetricsTimers = Field(
default_factory=lambda: MetricsTimers.parse_obj({}),
description="keeps track of start times for various operations",
)

# below had already been validated and
# used only to start the proxy
dynamic_sidecar_id: ServiceId | None = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections import deque
from collections.abc import Coroutine
from functools import cached_property
from typing import Any, Final
from typing import Any, Final, cast

from fastapi import FastAPI, status
from httpx import AsyncClient
Expand Down Expand Up @@ -349,33 +349,35 @@ async def stop_service(
progress_callback,
)

async def restore_service_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> None:
async def restore_service_state(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> int:
response = await self._thin_client.post_containers_tasks_state_restore(
dynamic_sidecar_endpoint
)
task_id: TaskId = response.json()

await self._await_for_result(
result: Any | None = await self._await_for_result(
task_id,
dynamic_sidecar_endpoint,
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT,
_debug_progress_callback,
)
return cast(int, result)

async def pull_user_services_images(
self, dynamic_sidecar_endpoint: AnyHttpUrl
) -> None:
) -> int:
response = await self._thin_client.post_containers_images_pull(
dynamic_sidecar_endpoint
)
task_id: TaskId = response.json()

await self._await_for_result(
result: Any | None = await self._await_for_result(
task_id,
dynamic_sidecar_endpoint,
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_USER_SERVICES_PULLING_TIMEOUT,
_debug_progress_callback,
)
return cast(int, result)

async def save_service_state(
self,
Expand Down Expand Up @@ -416,18 +418,19 @@ async def pull_service_output_ports(
self,
dynamic_sidecar_endpoint: AnyHttpUrl,
port_keys: list[str] | None = None,
) -> None:
) -> int:
response = await self._thin_client.post_containers_tasks_ports_outputs_pull(
dynamic_sidecar_endpoint, port_keys
)
task_id: TaskId = response.json()

await self._await_for_result(
result: Any | None = await self._await_for_result(
task_id,
dynamic_sidecar_endpoint,
self._dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT,
_debug_progress_callback,
)
return cast(int, result)

async def push_service_output_ports(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
DynamicServicesSchedulerSettings,
)
from .....models.dynamic_services_scheduler import SchedulerData
from .....modules.instrumentation._setup import get_instrumentation
from .....modules.instrumentation._utils import get_start_stop_labels
from .....utils.db import get_repository
from ....db.repositories.groups_extra_properties import GroupsExtraPropertiesRepository
from ....db.repositories.projects import ProjectsRepository
Expand Down Expand Up @@ -222,4 +224,10 @@ async def progress_create_containers(

scheduler_data.dynamic_sidecar.were_containers_created = True

start_duration = scheduler_data.dynamic_sidecar.metrics_timers.get_start_duration()
if start_duration:
get_instrumentation(app).dynamic_sidecar_metrics.start_time_seconds.labels(
**get_start_stop_labels(scheduler_data)
).observe(start_duration)

_logger.info("Internal state after creating user services %s", scheduler_data)
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
DockerStatus,
SchedulerData,
)
from .....modules.instrumentation import (
get_instrumentation,
get_label_from_size,
get_start_stop_labels,
track_duration,
)
from .....utils.db import get_repository
from ....db.repositories.projects import ProjectsRepository
from ....db.repositories.projects_networks import ProjectsNetworksRepository
Expand Down Expand Up @@ -375,6 +381,14 @@ async def attempt_pod_removal_and_data_saving(
rabbitmq_client: RabbitMQClient = app.state.rabbitmq_client
await rabbitmq_client.publish(message.channel_name, message)

# metrics

stop_duration = scheduler_data.dynamic_sidecar.metrics_timers.get_stop_duration()
if stop_duration:
get_instrumentation(app).dynamic_sidecar_metrics.stop_time_seconds.labels(
**get_start_stop_labels(scheduler_data)
).observe(stop_duration)


async def attach_project_networks(app: FastAPI, scheduler_data: SchedulerData) -> None:
_logger.debug("Attaching project networks for %s", scheduler_data.service_name)
Expand Down Expand Up @@ -460,14 +474,54 @@ async def prepare_services_environment(
)
)

# Pulls the service output ports through `sidecars_client` and reports the
# measured duration to the `output_ports_pull_seconds` metric, labelled via
# `get_label_from_size` with the integer returned by the pull.
async def _pull_output_ports_with_metrics() -> None:
# `duration` is the object yielded by `track_duration`; presumably it holds
# the elapsed time of the wrapped call — confirm against that helper
with track_duration() as duration:
size: int = await sidecars_client.pull_service_output_ports(
dynamic_sidecar_endpoint
)

# NOTE(review): `to_flaot` looks like a misspelling of `to_float`; it has to
# match the method defined on the object yielded by `track_duration`, so
# confirm there before renaming
get_instrumentation(
app
).dynamic_sidecar_metrics.output_ports_pull_seconds.labels(
**get_label_from_size(size)
).observe(
duration.to_flaot()
)

# Pulls the user services images through `sidecars_client` and reports the
# measured duration to the `pull_user_services_images_seconds` metric,
# labelled via `get_label_from_size` with the value returned by the pull.
async def _pull_user_services_images_with_metrics() -> None:
# `duration` is the object yielded by `track_duration`; presumably it holds
# the elapsed time of the wrapped call — confirm against that helper
with track_duration() as duration:
size = await sidecars_client.pull_user_services_images(
dynamic_sidecar_endpoint
)

# NOTE(review): `to_flaot` looks like a misspelling of `to_float`; it has to
# match the method defined on the object yielded by `track_duration`, so
# confirm there before renaming
get_instrumentation(
app
).dynamic_sidecar_metrics.pull_user_services_images_seconds.labels(
**get_label_from_size(size)
).observe(
duration.to_flaot()
)

# Restores the service state through `sidecars_client` and reports the
# measured duration to the `restore_service_state_seconds` metric, labelled
# via `get_label_from_size` with the value returned by the restore call.
async def _restore_service_state_with_metrics() -> None:
# `duration` is the object yielded by `track_duration`; presumably it holds
# the elapsed time of the wrapped call — confirm against that helper
with track_duration() as duration:
size = await sidecars_client.restore_service_state(dynamic_sidecar_endpoint)

# NOTE(review): `to_flaot` looks like a misspelling of `to_float`; it has to
# match the method defined on the object yielded by `track_duration`, so
# confirm there before renaming
get_instrumentation(
app
).dynamic_sidecar_metrics.restore_service_state_seconds.labels(
**get_label_from_size(size)
).observe(
duration.to_flaot()
)

tasks = [
sidecars_client.pull_user_services_images(dynamic_sidecar_endpoint),
sidecars_client.pull_service_output_ports(dynamic_sidecar_endpoint),
_pull_user_services_images_with_metrics(),
_pull_output_ports_with_metrics(),
]
# When enabled no longer downloads state via nodeports
# S3 is used to store state paths
if not app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED:
tasks.append(sidecars_client.restore_service_state(dynamic_sidecar_endpoint))
tasks.append(_restore_service_state_with_metrics())

await limited_gather(*tasks, limit=3)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import contextlib
import functools
import logging
import time
from asyncio import Lock, Queue, Task
from dataclasses import dataclass, field
from typing import Final

import arrow
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceCreate,
Expand Down Expand Up @@ -54,6 +56,7 @@
DynamicServicesSchedulerSettings,
)
from .....models.dynamic_services_scheduler import SchedulerData, ServiceName
from .....modules.instrumentation import get_instrumentation, get_label_from_size
from ...api_client import SidecarsClient, get_sidecars_client
from ...docker_api import update_scheduler_data_label
from ...errors import DynamicSidecarError, DynamicSidecarNotFoundError
Expand Down Expand Up @@ -255,6 +258,9 @@ async def add_service(
request_simcore_user_agent=request_simcore_user_agent,
can_save=can_save,
)
scheduler_data.dynamic_sidecar.metrics_timers.start_duration_of_service_starting = (
arrow.utcnow().datetime
)
await self.add_service_from_scheduler_data(scheduler_data)

async def add_service_from_scheduler_data(
Expand Down Expand Up @@ -353,6 +359,10 @@ async def mark_service_for_removal(
)
return

current.dynamic_sidecar.metrics_timers.start_duration_of_service_closing = (
arrow.utcnow().datetime
)

# PC-> ANE: could you please review what to do when can_save=None
assert can_save is not None # nosec
current.dynamic_sidecar.service_removal_state.mark_to_remove(
Expand Down Expand Up @@ -455,9 +465,19 @@ async def retrieve_service_inputs(
dynamic_sidecar_endpoint: AnyHttpUrl = scheduler_data.endpoint
sidecars_client: SidecarsClient = await get_sidecars_client(self.app, node_uuid)

started = time.time()
transferred_bytes = await sidecars_client.pull_service_input_ports(
dynamic_sidecar_endpoint, port_keys
)
duration = time.time() - started

get_instrumentation(
self.app
).dynamic_sidecar_metrics.input_ports_pull_seconds.labels(
**get_label_from_size(transferred_bytes)
).observe(
duration
)

if scheduler_data.restart_policy == RestartPolicy.ON_INPUTS_DOWNLOADED:
logger.info("Will restart containers")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from ._setup import get_instrumentation, setup
from ._utils import get_label_from_size, get_start_stop_labels, track_duration

# Public names of this package: the helpers re-exported from `._setup`
# (`get_instrumentation`, `setup`) and `._utils` (the remaining three).
__all__: tuple[str, ...] = (
"get_instrumentation",
"get_label_from_size",
"get_start_stop_labels",
"setup",
"track_duration",
)
Loading