|
| 1 | +from __future__ import annotations |
| 2 | +import contextlib |
| 3 | +import dataclasses |
| 4 | +import json |
| 5 | +import typing |
| 6 | + |
| 7 | +from lite_bootstrap.bootstrappers.base import BaseBootstrapper |
| 8 | +from lite_bootstrap.instruments.healthchecks_instrument import HealthChecksConfig, HealthChecksInstrument |
| 9 | +from lite_bootstrap.instruments.logging_instrument import LoggingConfig, LoggingInstrument |
| 10 | +from lite_bootstrap.instruments.opentelemetry_instrument import OpentelemetryConfig, OpenTelemetryInstrument |
| 11 | +from lite_bootstrap.instruments.prometheus_instrument import PrometheusConfig, PrometheusInstrument |
| 12 | +from lite_bootstrap.instruments.sentry_instrument import SentryConfig, SentryInstrument |
| 13 | + |
| 14 | + |
| 15 | +with contextlib.suppress(ImportError): |
| 16 | + import faststream |
| 17 | + import prometheus_client |
| 18 | + from faststream.asgi import AsgiFastStream, AsgiResponse |
| 19 | + from faststream.asgi import get as handle_get |
| 20 | + from faststream.broker.core.usecase import BrokerUsecase |
| 21 | + from opentelemetry.metrics import Meter, MeterProvider |
| 22 | + from opentelemetry.trace import TracerProvider, get_tracer_provider |
| 23 | + |
| 24 | + |
| 25 | +@typing.runtime_checkable |
| 26 | +class FastStreamTelemetryMiddlewareProtocol(typing.Protocol): |
| 27 | + def __init__( |
| 28 | + self, |
| 29 | + *, |
| 30 | + tracer_provider: TracerProvider | None = None, |
| 31 | + meter_provider: MeterProvider | None = None, |
| 32 | + meter: Meter | None = None, |
| 33 | + ) -> None: ... |
| 34 | + def __call__(self, msg: typing.Any | None) -> faststream.BaseMiddleware: ... # noqa: ANN401 |
| 35 | + |
| 36 | + |
| 37 | +@typing.runtime_checkable |
| 38 | +class FastStreamPrometheusMiddlewareProtocol(typing.Protocol): |
| 39 | + def __init__( |
| 40 | + self, |
| 41 | + *, |
| 42 | + registry: prometheus_client.CollectorRegistry, |
| 43 | + app_name: str = ..., |
| 44 | + metrics_prefix: str = "faststream", |
| 45 | + received_messages_size_buckets: typing.Sequence[float] | None = None, |
| 46 | + ) -> None: ... |
| 47 | + def __call__(self, msg: typing.Any | None) -> faststream.BaseMiddleware: ... # noqa: ANN401 |
| 48 | + |
| 49 | + |
| 50 | +@dataclasses.dataclass(kw_only=True, slots=True, frozen=True) |
| 51 | +class FastStreamConfig(HealthChecksConfig, LoggingConfig, OpentelemetryConfig, PrometheusConfig, SentryConfig): |
| 52 | + application: AsgiFastStream = dataclasses.field(default_factory=AsgiFastStream) |
| 53 | + broker: BrokerUsecase[typing.Any, typing.Any] | None = None |
| 54 | + opentelemetry_middleware_cls: type[FastStreamTelemetryMiddlewareProtocol] | None = None |
| 55 | + prometheus_middleware_cls: type[FastStreamPrometheusMiddlewareProtocol] | None = None |
| 56 | + |
| 57 | + |
| 58 | +@dataclasses.dataclass(kw_only=True, slots=True, frozen=True) |
| 59 | +class FastStreamHealthChecksInstrument(HealthChecksInstrument): |
| 60 | + bootstrap_config: FastStreamConfig |
| 61 | + |
| 62 | + def bootstrap(self) -> None: |
| 63 | + @handle_get |
| 64 | + async def check_health(_: object) -> AsgiResponse: |
| 65 | + return ( |
| 66 | + AsgiResponse( |
| 67 | + json.dumps(self.render_health_check_data()).encode(), 200, headers={"content-type": "text/plain"} |
| 68 | + ) |
| 69 | + if await self._define_health_status() |
| 70 | + else AsgiResponse(b"Service is unhealthy", 500, headers={"content-type": "application/json"}) |
| 71 | + ) |
| 72 | + |
| 73 | + self.bootstrap_config.application.mount(self.bootstrap_config.health_checks_path, check_health) |
| 74 | + |
| 75 | + async def _define_health_status(self) -> bool: |
| 76 | + if not self.bootstrap_config.application or not self.bootstrap_config.application.broker: |
| 77 | + return False |
| 78 | + |
| 79 | + return await self.bootstrap_config.application.broker.ping(timeout=5) |
| 80 | + |
| 81 | + |
| 82 | +@dataclasses.dataclass(kw_only=True, frozen=True) |
| 83 | +class FastStreamLoggingInstrument(LoggingInstrument): |
| 84 | + bootstrap_config: FastStreamConfig |
| 85 | + |
| 86 | + |
| 87 | +@dataclasses.dataclass(kw_only=True, frozen=True) |
| 88 | +class FastStreamOpenTelemetryInstrument(OpenTelemetryInstrument): |
| 89 | + bootstrap_config: FastStreamConfig |
| 90 | + |
| 91 | + def is_ready(self) -> bool: |
| 92 | + return bool(self.bootstrap_config.opentelemetry_middleware_cls and super().is_ready()) |
| 93 | + |
| 94 | + def bootstrap(self) -> None: |
| 95 | + if self.bootstrap_config.opentelemetry_middleware_cls and self.bootstrap_config.application.broker: |
| 96 | + self.bootstrap_config.application.broker.add_middleware( |
| 97 | + self.bootstrap_config.opentelemetry_middleware_cls(tracer_provider=get_tracer_provider()) |
| 98 | + ) |
| 99 | + |
| 100 | + |
| 101 | +@dataclasses.dataclass(kw_only=True, frozen=True) |
| 102 | +class FastStreamSentryInstrument(SentryInstrument): |
| 103 | + bootstrap_config: FastStreamConfig |
| 104 | + |
| 105 | + |
| 106 | +@dataclasses.dataclass(kw_only=True, frozen=True) |
| 107 | +class FastStreamPrometheusInstrument(PrometheusInstrument): |
| 108 | + bootstrap_config: FastStreamConfig |
| 109 | + collector_registry: prometheus_client.CollectorRegistry = dataclasses.field( |
| 110 | + default_factory=prometheus_client.CollectorRegistry, init=False |
| 111 | + ) |
| 112 | + |
| 113 | + def is_ready(self) -> bool: |
| 114 | + return bool(self.bootstrap_config.prometheus_middleware_cls and super().is_ready()) |
| 115 | + |
| 116 | + def bootstrap(self) -> None: |
| 117 | + self.bootstrap_config.application.mount( |
| 118 | + self.bootstrap_config.prometheus_metrics_path, prometheus_client.make_asgi_app(self.collector_registry) |
| 119 | + ) |
| 120 | + if self.bootstrap_config.prometheus_middleware_cls and self.bootstrap_config.application.broker: |
| 121 | + self.bootstrap_config.application.broker.add_middleware( |
| 122 | + self.bootstrap_config.prometheus_middleware_cls(registry=self.collector_registry) |
| 123 | + ) |
| 124 | + |
| 125 | + |
| 126 | +class FastStreamBootstrapper(BaseBootstrapper[AsgiFastStream]): |
| 127 | + instruments_types: typing.ClassVar = [ |
| 128 | + FastStreamOpenTelemetryInstrument, |
| 129 | + FastStreamSentryInstrument, |
| 130 | + FastStreamHealthChecksInstrument, |
| 131 | + FastStreamLoggingInstrument, |
| 132 | + FastStreamPrometheusInstrument, |
| 133 | + ] |
| 134 | + bootstrap_config: FastStreamConfig |
| 135 | + __slots__ = "bootstrap_config", "instruments" |
| 136 | + |
| 137 | + def __init__(self, bootstrap_config: FastStreamConfig) -> None: |
| 138 | + super().__init__(bootstrap_config) |
| 139 | + if self.bootstrap_config.broker: |
| 140 | + self.bootstrap_config.application.broker = self.bootstrap_config.broker |
| 141 | + |
| 142 | + def _prepare_application(self) -> AsgiFastStream: |
| 143 | + return self.bootstrap_config.application |
0 commit comments