Skip to content

Commit 976bb2f

Browse files
authored
Merge branch 'master' into DOG-5997-replicate-config
2 parents 942e8a7 + 67ffc88 commit 976bb2f

File tree

6 files changed

+332
-7
lines changed

6 files changed

+332
-7
lines changed

cognite/extractorutils/unstable/configuration/models.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
from datetime import timedelta
99
from enum import Enum
1010
from pathlib import Path
11+
from time import sleep
1112
from typing import Annotated, Any, Literal, TypeVar
1213

1314
from humps import kebabize
15+
from prometheus_client import REGISTRY, start_http_server
1416
from pydantic import BaseModel, ConfigDict, Field, GetCoreSchemaHandler
1517
from pydantic_core import CoreSchema, core_schema
1618
from typing_extensions import assert_never
@@ -22,15 +24,18 @@
2224
OAuthClientCertificate,
2325
OAuthClientCredentials,
2426
)
27+
from cognite.client.data_classes import Asset
2528
from cognite.extractorutils.configtools._util import _load_certificate_data
2629
from cognite.extractorutils.exceptions import InvalidConfigError
30+
from cognite.extractorutils.metrics import AbstractMetricsPusher, CognitePusher, PrometheusPusher
2731
from cognite.extractorutils.statestore import (
2832
AbstractStateStore,
2933
LocalStateStore,
3034
NoStateStore,
3135
RawStateStore,
3236
)
3337
from cognite.extractorutils.threading import CancellationToken
38+
from cognite.extractorutils.util import EitherId
3439

3540
__all__ = [
3641
"AuthenticationConfig",
@@ -43,6 +48,7 @@
4348
"LogFileHandlerConfig",
4449
"LogHandlerConfig",
4550
"LogLevel",
51+
"MetricsConfig",
4652
"ScheduleConfig",
4753
"TimeIntervalConfig",
4854
]
@@ -438,6 +444,176 @@ class LogConsoleHandlerConfig(ConfigModel):
438444
LogHandlerConfig = Annotated[LogFileHandlerConfig | LogConsoleHandlerConfig, Field(discriminator="type")]
439445

440446

447+
class EitherIdConfig(ConfigModel):
448+
"""
449+
Configuration parameter representing an ID in CDF, which can either be an external or internal ID.
450+
451+
An EitherId can only hold one ID type, not both.
452+
"""
453+
454+
id: int | None = None
455+
external_id: str | None = None
456+
457+
@property
458+
def either_id(self) -> EitherId:
459+
"""
460+
Returns an EitherId object based on the current configuration.
461+
462+
Raises:
463+
TypeError: If both id and external_id are None, or if both are set.
464+
"""
465+
return EitherId(id=self.id, external_id=self.external_id)
466+
467+
468+
class _PushGatewayConfig(ConfigModel):
469+
"""
470+
Configuration for pushing metrics to a Prometheus Push Gateway.
471+
"""
472+
473+
host: str
474+
job_name: str
475+
username: str | None = None
476+
password: str | None = None
477+
478+
clear_after: TimeIntervalConfig | None = None
479+
push_interval: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))
480+
481+
482+
class _PromServerConfig(ConfigModel):
483+
"""
484+
Configuration for pushing metrics to a Prometheus server.
485+
"""
486+
487+
port: int = 9000
488+
host: str = "0.0.0.0"
489+
490+
491+
class _CogniteMetricsConfig(ConfigModel):
492+
"""
493+
Configuration for pushing metrics to Cognite Data Fusion.
494+
"""
495+
496+
external_id_prefix: str
497+
asset_name: str | None = None
498+
asset_external_id: str | None = None
499+
data_set: EitherIdConfig | None = None
500+
501+
push_interval: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))
502+
503+
504+
class MetricsPushManager:
505+
"""
506+
Manages the pushing of metrics to various backends.
507+
508+
Starts and stops pushers based on a given configuration.
509+
510+
Args:
511+
metrics_config: Configuration for the metrics to be pushed.
512+
cdf_client: The CDF tenant to upload time series to
513+
cancellation_token: Event object to be used as a thread cancelation event
514+
"""
515+
516+
def __init__(
517+
self,
518+
metrics_config: "MetricsConfig",
519+
cdf_client: CogniteClient,
520+
cancellation_token: CancellationToken | None = None,
521+
) -> None:
522+
"""
523+
Initialize the MetricsPushManager.
524+
"""
525+
self.metrics_config = metrics_config
526+
self.cdf_client = cdf_client
527+
self.cancellation_token = cancellation_token
528+
self.pushers: list[AbstractMetricsPusher] = []
529+
self.clear_on_stop: dict[AbstractMetricsPusher, int] = {}
530+
531+
def start(self) -> None:
532+
"""
533+
Start all metric pushers.
534+
"""
535+
push_gateways = self.metrics_config.push_gateways or []
536+
for counter, push_gateway in enumerate(push_gateways):
537+
prometheus_pusher = PrometheusPusher(
538+
job_name=push_gateway.job_name,
539+
username=push_gateway.username,
540+
password=push_gateway.password,
541+
url=push_gateway.host,
542+
push_interval=push_gateway.push_interval.seconds,
543+
thread_name=f"MetricsPusher_{counter}",
544+
cancellation_token=self.cancellation_token,
545+
)
546+
prometheus_pusher.start()
547+
self.pushers.append(prometheus_pusher)
548+
if push_gateway.clear_after is not None:
549+
self.clear_on_stop[prometheus_pusher] = push_gateway.clear_after.seconds
550+
551+
if self.metrics_config.cognite:
552+
asset = None
553+
if self.metrics_config.cognite.asset_name and self.metrics_config.cognite.asset_external_id:
554+
asset = Asset(
555+
name=self.metrics_config.cognite.asset_name,
556+
external_id=self.metrics_config.cognite.asset_external_id,
557+
)
558+
cognite_pusher = CognitePusher(
559+
cdf_client=self.cdf_client,
560+
external_id_prefix=self.metrics_config.cognite.external_id_prefix,
561+
push_interval=self.metrics_config.cognite.push_interval.seconds,
562+
asset=asset,
563+
data_set=self.metrics_config.cognite.data_set.either_id
564+
if self.metrics_config.cognite.data_set
565+
else None,
566+
thread_name="CogniteMetricsPusher",
567+
cancellation_token=self.cancellation_token,
568+
)
569+
cognite_pusher.start()
570+
self.pushers.append(cognite_pusher)
571+
572+
if self.metrics_config.server:
573+
start_http_server(self.metrics_config.server.port, self.metrics_config.server.host, registry=REGISTRY)
574+
575+
def stop(self) -> None:
576+
"""
577+
Stop all metric pushers.
578+
"""
579+
for pusher in self.pushers:
580+
pusher.stop()
581+
582+
# Clear Prometheus pushers gateways if required
583+
if self.clear_on_stop:
584+
wait_time = max(self.clear_on_stop.values())
585+
sleep(wait_time)
586+
for pusher in (p for p in self.clear_on_stop if isinstance(p, PrometheusPusher)):
587+
pusher.clear_gateway()
588+
589+
590+
class MetricsConfig(ConfigModel):
591+
"""
592+
Destination(s) for metrics.
593+
594+
Including options for one or several Prometheus push gateways, and pushing as CDF Time Series.
595+
"""
596+
597+
push_gateways: list[_PushGatewayConfig] | None = None
598+
cognite: _CogniteMetricsConfig | None = None
599+
server: _PromServerConfig | None = None
600+
601+
def create_manager(
602+
self, cdf_client: CogniteClient, cancellation_token: CancellationToken | None = None
603+
) -> MetricsPushManager:
604+
"""
605+
Create a MetricsPushManager based on the current configuration.
606+
607+
Args:
608+
cdf_client: An instance of CogniteClient to interact with CDF.
609+
cancellation_token: Optional token to signal cancellation of metric pushing.
610+
611+
Returns:
612+
MetricsPushManager: An instance of MetricsPushManager configured with the provided parameters.
613+
"""
614+
return MetricsPushManager(self, cdf_client, cancellation_token)
615+
616+
441617
# Mypy BS
442618
def _log_handler_default() -> list[LogHandlerConfig]:
443619
return [LogConsoleHandlerConfig(type="console", level=LogLevel.INFO)]

cognite/extractorutils/unstable/core/base.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def my_task_function(self, task_context: TaskContext) -> None:
5959
from typing_extensions import Self, assert_never
6060

6161
from cognite.extractorutils._inner_util import _resolve_log_level
62+
from cognite.extractorutils.metrics import BaseMetrics
6263
from cognite.extractorutils.statestore import (
6364
AbstractStateStore,
6465
LocalStateStore,
@@ -72,6 +73,7 @@ def my_task_function(self, task_context: TaskContext) -> None:
7273
ExtractorConfig,
7374
LogConsoleHandlerConfig,
7475
LogFileHandlerConfig,
76+
MetricsConfig,
7577
)
7678
from cognite.extractorutils.unstable.core._dto import (
7779
CogniteModel,
@@ -116,11 +118,13 @@ def __init__(
116118
application_config: _T,
117119
current_config_revision: ConfigRevision,
118120
log_level_override: str | None = None,
121+
metrics_config: MetricsConfig | None = None,
119122
) -> None:
120123
self.connection_config = connection_config
121124
self.application_config = application_config
122125
self.current_config_revision: ConfigRevision = current_config_revision
123126
self.log_level_override = log_level_override
127+
self.metrics_config = metrics_config
124128

125129

126130
class Extractor(Generic[ConfigType], CogniteLogger):
@@ -147,7 +151,9 @@ class Extractor(Generic[ConfigType], CogniteLogger):
147151

148152
cancellation_token: CancellationToken
149153

150-
def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> None:
154+
def __init__(
155+
self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker, metrics: BaseMetrics | None = None
156+
) -> None:
151157
self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main")
152158
self._checkin_worker = checkin_worker
153159

@@ -156,6 +162,7 @@ def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker
156162

157163
self.connection_config = config.connection_config
158164
self.application_config = config.application_config
165+
self.metrics_config = config.metrics_config
159166
self.current_config_revision: ConfigRevision = config.current_config_revision
160167
self.log_level_override = config.log_level_override
161168

@@ -170,6 +177,13 @@ def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker
170177

171178
self._tasks: list[Task] = []
172179
self._start_time: datetime
180+
self._metrics: BaseMetrics | None = metrics
181+
182+
self.metrics_push_manager = (
183+
self.metrics_config.create_manager(self.cognite_client, cancellation_token=self.cancellation_token)
184+
if self.metrics_config
185+
else None
186+
)
173187

174188
self.__init_tasks__()
175189

@@ -367,8 +381,10 @@ def restart(self) -> None:
367381
self.cancellation_token.cancel()
368382

369383
@classmethod
370-
def _init_from_runtime(cls, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> Self:
371-
return cls(config, checkin_worker)
384+
def _init_from_runtime(
385+
cls, config: FullConfig[ConfigType], checkin_worker: CheckinWorker, metrics: BaseMetrics
386+
) -> Self:
387+
return cls(config, checkin_worker, metrics)
372388

373389
def add_task(self, task: Task) -> None:
374390
"""
@@ -438,6 +454,8 @@ def start(self) -> None:
438454
self.state_store.start()
439455

440456
Thread(target=self._run_checkin, name="ExtractorCheckin", daemon=True).start()
457+
if self.metrics_push_manager:
458+
self.metrics_push_manager.start()
441459

442460
def stop(self) -> None:
443461
"""
@@ -446,6 +464,8 @@ def stop(self) -> None:
446464
Instead of calling this method directly, it is recommended to use the context manager interface by using the
447465
``with`` statement, which ensures proper cleanup on exit.
448466
"""
467+
if self.metrics_push_manager:
468+
self.metrics_push_manager.stop()
449469
self.cancellation_token.cancel()
450470

451471
def __enter__(self) -> Self:

cognite/extractorutils/unstable/core/runtime.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def main() -> None:
4747
CogniteAuthError,
4848
CogniteConnectionError,
4949
)
50+
from cognite.extractorutils.metrics import BaseMetrics
5051
from cognite.extractorutils.threading import CancellationToken
5152
from cognite.extractorutils.unstable.configuration.exceptions import InvalidArgumentError, InvalidConfigError
5253
from cognite.extractorutils.unstable.configuration.loaders import (
@@ -78,14 +79,17 @@ def _extractor_process_entrypoint(
7879
controls: _RuntimeControls,
7980
config: FullConfig,
8081
checkin_worker: CheckinWorker,
82+
metrics: BaseMetrics | None = None,
8183
) -> None:
8284
logger = logging.getLogger(f"{extractor_class.EXTERNAL_ID}.runtime")
8385
checkin_worker.active_revision = config.current_config_revision
8486
checkin_worker.set_on_fatal_error_handler(lambda _: on_fatal_error(controls))
8587
checkin_worker.set_on_revision_change_handler(lambda _: on_revision_changed(controls))
8688
if config.application_config.retry_startup:
8789
checkin_worker.set_retry_startup(config.application_config.retry_startup)
88-
extractor = extractor_class._init_from_runtime(config, checkin_worker)
90+
if not metrics:
91+
metrics = BaseMetrics(extractor_name=extractor_class.NAME, extractor_version=extractor_class.VERSION)
92+
extractor = extractor_class._init_from_runtime(config, checkin_worker, metrics)
8993
extractor._attach_runtime_controls(
9094
cancel_event=controls.cancel_event,
9195
message_queue=controls.message_queue,
@@ -135,11 +139,13 @@ class Runtime(Generic[ExtractorType]):
135139
def __init__(
136140
self,
137141
extractor: type[ExtractorType],
142+
metrics: BaseMetrics | None = None,
138143
) -> None:
139144
self._extractor_class = extractor
140145
self._cancellation_token = CancellationToken()
141146
self._cancellation_token.cancel_on_interrupt()
142147
self._message_queue: Queue[RuntimeMessage] = Queue()
148+
self._metrics = metrics
143149
self.logger = logging.getLogger(f"{self._extractor_class.EXTERNAL_ID}.runtime")
144150
self._setup_logging()
145151
self._cancel_event: MpEvent | None = None
@@ -268,7 +274,7 @@ def _spawn_extractor(
268274

269275
process = Process(
270276
target=_extractor_process_entrypoint,
271-
args=(self._extractor_class, controls, config, checkin_worker),
277+
args=(self._extractor_class, controls, config, checkin_worker, self._metrics),
272278
)
273279

274280
process.start()

tests/test_unstable/conftest.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import gzip
22
import json
33
import os
4+
from collections import Counter
45
from collections.abc import Callable, Generator, Iterator
56
from threading import RLock
67
from time import sleep, time
@@ -13,6 +14,7 @@
1314
from cognite.client import CogniteClient
1415
from cognite.client.config import ClientConfig
1516
from cognite.client.credentials import OAuthClientCredentials
17+
from cognite.extractorutils.metrics import BaseMetrics
1618
from cognite.extractorutils.unstable.configuration.models import (
1719
ConnectionConfig,
1820
ExtractorConfig,
@@ -213,3 +215,10 @@ def log_messages_task(ctx: TaskContext) -> None:
213215
ctx.warning("This is a warning message.")
214216

215217
self.add_task(StartupTask(name="log_task", target=log_messages_task))
218+
219+
220+
class TestMetrics(BaseMetrics):
221+
def __init__(self) -> None:
222+
super().__init__(extractor_name=TestExtractor.NAME, extractor_version=TestExtractor.VERSION)
223+
224+
self.a_counter = Counter("my_extractor_example_counter", "An example counter")

0 commit comments

Comments
 (0)