Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from datetime import timedelta
from enum import Enum
from pathlib import Path
from time import sleep
from typing import Annotated, Any, Literal, TypeVar

from humps import kebabize
from prometheus_client import REGISTRY, start_http_server
from pydantic import BaseModel, ConfigDict, Field, GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema
from typing_extensions import assert_never
Expand All @@ -22,15 +24,18 @@
OAuthClientCertificate,
OAuthClientCredentials,
)
from cognite.client.data_classes import Asset
from cognite.extractorutils.configtools._util import _load_certificate_data
from cognite.extractorutils.exceptions import InvalidConfigError
from cognite.extractorutils.metrics import AbstractMetricsPusher, CognitePusher, PrometheusPusher
from cognite.extractorutils.statestore import (
AbstractStateStore,
LocalStateStore,
NoStateStore,
RawStateStore,
)
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.util import EitherId

__all__ = [
"AuthenticationConfig",
Expand All @@ -43,6 +48,7 @@
"LogFileHandlerConfig",
"LogHandlerConfig",
"LogLevel",
"MetricsConfig",
"ScheduleConfig",
"TimeIntervalConfig",
]
Expand Down Expand Up @@ -429,6 +435,176 @@ class LogConsoleHandlerConfig(ConfigModel):
LogHandlerConfig = Annotated[LogFileHandlerConfig | LogConsoleHandlerConfig, Field(discriminator="type")]


class EitherIdConfig(ConfigModel):
    """
    Configuration parameter representing an ID in CDF, which can either be an external or internal ID.

    An EitherId can only hold one ID type, not both.
    """

    # Internal (numeric) CDF ID. Mutually exclusive with external_id.
    id: int | None = None
    # External (string) CDF ID. Mutually exclusive with id.
    external_id: str | None = None

    @property
    def either_id(self) -> EitherId:
        """
        Returns an EitherId object based on the current configuration.

        Raises:
            TypeError: If both id and external_id are None, or if both are set.
        """
        # The one-and-only-one-ID constraint is enforced by EitherId's constructor,
        # not by this model's validation.
        return EitherId(id=self.id, external_id=self.external_id)


class _PushGatewayConfig(ConfigModel):
    """
    Configuration for pushing metrics to a Prometheus Push Gateway.
    """

    # URL of the push gateway to push metrics to.
    host: str
    # Job name reported to the push gateway.
    job_name: str
    # Optional basic-auth credentials for the gateway.
    username: str | None = None
    password: str | None = None

    # How long to wait after stopping before clearing this job's metrics from the
    # gateway. If None, metrics are not cleared on stop.
    # NOTE(review): no default value, so pydantic treats this key as required
    # (it must be present in the config, though it may be null) — confirm intended.
    clear_after: TimeIntervalConfig | None
    # How often to push metrics to the gateway. Defaults to every 30 seconds.
    push_interval: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))


class _PromServerConfig(ConfigModel):
    """
    Configuration for hosting a local HTTP endpoint exposing metrics for a
    Prometheus instance to scrape (served via prometheus_client's start_http_server).
    """

    # Port the metrics HTTP server listens on.
    port: int = 9000
    # Interface to bind to; 0.0.0.0 binds all interfaces.
    host: str = "0.0.0.0"


class _CogniteMetricsConfig(ConfigModel):
    """
    Configuration for pushing metrics to Cognite Data Fusion.
    """

    # Prefix for the external IDs of the time series created in CDF.
    external_id_prefix: str
    # Optional asset to associate the metrics time series with. Both name and
    # external ID must be set for the asset to be used.
    asset_name: str | None = None
    asset_external_id: str | None = None
    # Optional data set to place the time series in.
    data_set: EitherIdConfig | None = None

    # How often to push metrics to CDF. Defaults to every 30 seconds.
    push_interval: TimeIntervalConfig = Field(default_factory=lambda: TimeIntervalConfig("30s"))


class MetricsPushManager:
    """
    Manages the pushing of metrics to various backends.

    Starts and stops pushers based on a given configuration.

    Args:
        metrics_config: Configuration for the metrics to be pushed.
        cdf_client: The CDF tenant to upload time series to
        cancellation_token: Event object to be used as a thread cancelation event
    """

    def __init__(
        self,
        metrics_config: "MetricsConfig",
        cdf_client: CogniteClient,
        cancellation_token: CancellationToken | None = None,
    ) -> None:
        """
        Initialize the MetricsPushManager.
        """
        self.metrics_config = metrics_config
        self.cdf_client = cdf_client
        self.cancellation_token = cancellation_token
        # Pushers started by start(); stopped again in stop().
        self.pushers: list[AbstractMetricsPusher] = []
        # Maps a pusher to the number of seconds to wait before clearing its
        # gateway when the manager is stopped.
        self.clear_on_stop: dict[AbstractMetricsPusher, int] = {}

    def start(self) -> None:
        """
        Start all metric pushers.
        """
        gateway_configs = self.metrics_config.push_gateways or []
        for index, gateway_config in enumerate(gateway_configs):
            gateway_pusher = PrometheusPusher(
                job_name=gateway_config.job_name,
                username=gateway_config.username,
                password=gateway_config.password,
                url=gateway_config.host,
                push_interval=gateway_config.push_interval.seconds,
                thread_name=f"MetricsPusher_{index}",
                cancellation_token=self.cancellation_token,
            )
            gateway_pusher.start()
            self.pushers.append(gateway_pusher)
            if gateway_config.clear_after is not None:
                self.clear_on_stop[gateway_pusher] = gateway_config.clear_after.seconds

        cognite_config = self.metrics_config.cognite
        if cognite_config:
            # Only attach an asset when both name and external ID are configured.
            asset: Asset | None = None
            if cognite_config.asset_name and cognite_config.asset_external_id:
                asset = Asset(
                    name=cognite_config.asset_name,
                    external_id=cognite_config.asset_external_id,
                )
            data_set = cognite_config.data_set.either_id if cognite_config.data_set else None
            cognite_pusher = CognitePusher(
                cdf_client=self.cdf_client,
                external_id_prefix=cognite_config.external_id_prefix,
                push_interval=cognite_config.push_interval.seconds,
                asset=asset,
                data_set=data_set,
                thread_name="CogniteMetricsPusher",
                cancellation_token=self.cancellation_token,
            )
            cognite_pusher.start()
            self.pushers.append(cognite_pusher)

        server_config = self.metrics_config.server
        if server_config:
            # Expose metrics for scraping on the configured host/port.
            start_http_server(server_config.port, server_config.host, registry=REGISTRY)

    def stop(self) -> None:
        """
        Stop all metric pushers.
        """
        for pusher in self.pushers:
            pusher.stop()

        # Clear Prometheus pushers gateways if required
        if self.clear_on_stop:
            # Wait long enough to cover the longest configured clear-after delay
            # before wiping the gateways.
            sleep(max(self.clear_on_stop.values()))
            for pusher in self.clear_on_stop:
                if isinstance(pusher, PrometheusPusher):
                    pusher.clear_gateway()


class MetricsConfig(ConfigModel):
    """
    Destination(s) for metrics.

    Including options for one or several Prometheus push gateways, and pushing as CDF Time Series.
    """

    # NOTE(review): these three fields have no defaults, so pydantic treats them
    # as required keys (they must be present in the config, though each may be
    # null) — confirm this is intended rather than `= None` defaults.
    push_gateways: list[_PushGatewayConfig] | None
    cognite: _CogniteMetricsConfig | None
    server: _PromServerConfig | None

    def create_manager(
        self, cdf_client: CogniteClient, cancellation_token: CancellationToken | None = None
    ) -> MetricsPushManager:
        """
        Create a MetricsPushManager based on the current configuration.

        Args:
            cdf_client: An instance of CogniteClient to interact with CDF.
            cancellation_token: Optional token to signal cancellation of metric pushing.

        Returns:
            MetricsPushManager: An instance of MetricsPushManager configured with the provided parameters.
        """
        return MetricsPushManager(self, cdf_client, cancellation_token)


# Wrapped in a function so it can be used as a pydantic default_factory
# without upsetting mypy.
def _log_handler_default() -> list[LogHandlerConfig]:
    """Default logging setup: a single console handler at INFO level."""
    console_handler = LogConsoleHandlerConfig(type="console", level=LogLevel.INFO)
    return [console_handler]
Expand Down
26 changes: 23 additions & 3 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def my_task_function(self, task_context: TaskContext) -> None:
from typing_extensions import Self, assert_never

from cognite.extractorutils._inner_util import _resolve_log_level
from cognite.extractorutils.metrics import BaseMetrics
from cognite.extractorutils.statestore import (
AbstractStateStore,
LocalStateStore,
Expand All @@ -72,6 +73,7 @@ def my_task_function(self, task_context: TaskContext) -> None:
ExtractorConfig,
LogConsoleHandlerConfig,
LogFileHandlerConfig,
MetricsConfig,
)
from cognite.extractorutils.unstable.core._dto import (
CogniteModel,
Expand Down Expand Up @@ -116,11 +118,13 @@ def __init__(
application_config: _T,
current_config_revision: ConfigRevision,
log_level_override: str | None = None,
metrics_config: MetricsConfig | None = None,
) -> None:
self.connection_config = connection_config
self.application_config = application_config
self.current_config_revision: ConfigRevision = current_config_revision
self.log_level_override = log_level_override
self.metrics_config = metrics_config


class Extractor(Generic[ConfigType], CogniteLogger):
Expand All @@ -147,7 +151,9 @@ class Extractor(Generic[ConfigType], CogniteLogger):

cancellation_token: CancellationToken

def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> None:
def __init__(
self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker, metrics: BaseMetrics | None = None
) -> None:
self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main")
self._checkin_worker = checkin_worker

Expand All @@ -156,6 +162,7 @@ def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker

self.connection_config = config.connection_config
self.application_config = config.application_config
self.metrics_config = config.metrics_config
self.current_config_revision: ConfigRevision = config.current_config_revision
self.log_level_override = config.log_level_override

Expand All @@ -170,6 +177,13 @@ def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker

self._tasks: list[Task] = []
self._start_time: datetime
self._metrics: BaseMetrics | None = metrics

self.metrics_push_manager = (
self.metrics_config.create_manager(self.cognite_client, cancellation_token=self.cancellation_token)
if self.metrics_config
else None
)

self.__init_tasks__()

Expand Down Expand Up @@ -371,8 +385,10 @@ def restart(self) -> None:
self.cancellation_token.cancel()

@classmethod
def _init_from_runtime(cls, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> Self:
return cls(config, checkin_worker)
def _init_from_runtime(
    cls, config: FullConfig[ConfigType], checkin_worker: CheckinWorker, metrics: BaseMetrics
) -> Self:
    """
    Construct an extractor instance on behalf of the runtime.

    Args:
        config: Full configuration bundle for the extractor.
        checkin_worker: Worker handling check-ins against CDF.
        metrics: Metrics collection to attach to the extractor.

    Returns:
        A new instance of this extractor class.
    """
    return cls(config, checkin_worker, metrics)

def add_task(self, task: Task) -> None:
"""
Expand Down Expand Up @@ -442,6 +458,8 @@ def start(self) -> None:
self.state_store.start()

Thread(target=self._run_checkin, name="ExtractorCheckin", daemon=True).start()
if self.metrics_push_manager:
self.metrics_push_manager.start()

def stop(self) -> None:
"""
Expand All @@ -450,6 +468,8 @@ def stop(self) -> None:
Instead of calling this method directly, it is recommended to use the context manager interface by using the
``with`` statement, which ensures proper cleanup on exit.
"""
if self.metrics_push_manager:
self.metrics_push_manager.stop()
self.cancellation_token.cancel()

def __enter__(self) -> Self:
Expand Down
10 changes: 8 additions & 2 deletions cognite/extractorutils/unstable/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def main() -> None:
CogniteAuthError,
CogniteConnectionError,
)
from cognite.extractorutils.metrics import BaseMetrics
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.exceptions import InvalidArgumentError, InvalidConfigError
from cognite.extractorutils.unstable.configuration.loaders import (
Expand Down Expand Up @@ -78,14 +79,17 @@ def _extractor_process_entrypoint(
controls: _RuntimeControls,
config: FullConfig,
checkin_worker: CheckinWorker,
metrics: BaseMetrics | None = None,
) -> None:
logger = logging.getLogger(f"{extractor_class.EXTERNAL_ID}.runtime")
checkin_worker.active_revision = config.current_config_revision
checkin_worker.set_on_fatal_error_handler(lambda _: on_fatal_error(controls))
checkin_worker.set_on_revision_change_handler(lambda _: on_revision_changed(controls))
if config.application_config.retry_startup:
checkin_worker.set_retry_startup(config.application_config.retry_startup)
extractor = extractor_class._init_from_runtime(config, checkin_worker)
if not metrics:
metrics = BaseMetrics(extractor_name=extractor_class.NAME, extractor_version=extractor_class.VERSION)
extractor = extractor_class._init_from_runtime(config, checkin_worker, metrics)
extractor._attach_runtime_controls(
cancel_event=controls.cancel_event,
message_queue=controls.message_queue,
Expand Down Expand Up @@ -135,11 +139,13 @@ class Runtime(Generic[ExtractorType]):
def __init__(
self,
extractor: type[ExtractorType],
metrics: BaseMetrics | None = None,
) -> None:
self._extractor_class = extractor
self._cancellation_token = CancellationToken()
self._cancellation_token.cancel_on_interrupt()
self._message_queue: Queue[RuntimeMessage] = Queue()
self._metrics = metrics
self.logger = logging.getLogger(f"{self._extractor_class.EXTERNAL_ID}.runtime")
self._setup_logging()
self._cancel_event: MpEvent | None = None
Expand Down Expand Up @@ -268,7 +274,7 @@ def _spawn_extractor(

process = Process(
target=_extractor_process_entrypoint,
args=(self._extractor_class, controls, config, checkin_worker),
args=(self._extractor_class, controls, config, checkin_worker, self._metrics),
)

process.start()
Expand Down
9 changes: 9 additions & 0 deletions tests/test_unstable/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import gzip
import json
import os
from collections import Counter
from collections.abc import Callable, Generator, Iterator
from threading import RLock
from time import sleep, time
Expand All @@ -13,6 +14,7 @@
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig
from cognite.client.credentials import OAuthClientCredentials
from cognite.extractorutils.metrics import BaseMetrics
from cognite.extractorutils.unstable.configuration.models import (
ConnectionConfig,
ExtractorConfig,
Expand Down Expand Up @@ -213,3 +215,10 @@ def log_messages_task(ctx: TaskContext) -> None:
ctx.warning("This is a warning message.")

self.add_task(StartupTask(name="log_task", target=log_messages_task))


class TestMetrics(BaseMetrics):
    """
    Metrics collection used by the test extractor.

    Extends the standard extractor metrics with one example counter.
    """

    def __init__(self) -> None:
        # BUG FIX: this module imports Counter from collections, but
        # Counter("name", "documentation") is the two-argument constructor of
        # prometheus_client.Counter — collections.Counter raises TypeError when
        # given two positional arguments. BaseMetrics registers prometheus
        # metrics, so the prometheus Counter is the intended one; import it
        # locally so it shadows the top-level collections import.
        from prometheus_client import Counter

        super().__init__(extractor_name=TestExtractor.NAME, extractor_version=TestExtractor.VERSION)

        # An example prometheus counter, registered with the default registry.
        self.a_counter = Counter("my_extractor_example_counter", "An example counter")
Loading
Loading