Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cognite/extractorutils/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,6 @@ def stop(self) -> None:
self._push_to_server()
self.upload_queue.stop()
self.cancellation_token.cancel()


MetricsType = TypeVar("MetricsType", bound=BaseMetrics)
37 changes: 28 additions & 9 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class MyConfig(ExtractorConfig):
another_parameter: int
schedule: ScheduleConfig

class MyMetrics(BaseMetrics):
def __init__(self, extractor_name: str, extractor_version: str):
super().__init__(extractor_name, extractor_version)
self.custom_counter = Counter("custom_counter", "A custom counter")

class MyExtractor(Extractor[MyConfig]):
NAME = "My Extractor"
EXTERNAL_ID = "my-extractor"
Expand All @@ -30,6 +35,9 @@ class MyExtractor(Extractor[MyConfig]):

CONFIG_TYPE = MyConfig

# Override metrics type annotation for IDE support
metrics: MyMetrics

def __init_tasks__(self) -> None:
self.add_task(
ScheduledTask(
Expand All @@ -42,6 +50,8 @@ def __init_tasks__(self) -> None:

def my_task_function(self, task_context: TaskContext) -> None:
task_context.logger.info("Running my task")
# IDE will now autocomplete custom_counter
self.metrics.custom_counter.inc()
"""

import logging
Expand All @@ -59,7 +69,7 @@ def my_task_function(self, task_context: TaskContext) -> None:
from typing_extensions import Self, assert_never

from cognite.extractorutils._inner_util import _resolve_log_level
from cognite.extractorutils.metrics import BaseMetrics
from cognite.extractorutils.metrics import BaseMetrics, MetricsType, safe_get
from cognite.extractorutils.statestore import (
AbstractStateStore,
LocalStateStore,
Expand Down Expand Up @@ -117,11 +127,13 @@ def __init__(
application_config: _T,
current_config_revision: ConfigRevision,
log_level_override: str | None = None,
metrics_class: type[MetricsType] | None = None,
) -> None:
self.connection_config = connection_config
self.application_config = application_config
self.current_config_revision: ConfigRevision = current_config_revision
self.log_level_override = log_level_override
self.metrics_class: type[MetricsType] | None = metrics_class


class Extractor(Generic[ConfigType], CogniteLogger):
Expand Down Expand Up @@ -149,9 +161,7 @@ class Extractor(Generic[ConfigType], CogniteLogger):

cancellation_token: CancellationToken

def __init__(
self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker, metrics: BaseMetrics | None = None
) -> None:
def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> None:
self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main")
self._checkin_worker = checkin_worker

Expand All @@ -175,7 +185,8 @@ def __init__(

self._tasks: list[Task] = []
self._start_time: datetime
self._metrics: BaseMetrics | None = metrics

self.metrics: BaseMetrics = self._load_metrics(config.metrics_class)

self.metrics_push_manager = (
self.metrics_config.create_manager(self.cognite_client, cancellation_token=self.cancellation_token)
Expand Down Expand Up @@ -262,6 +273,16 @@ def _setup_logging(self) -> None:
"Defaulted to console logging."
)

def _load_metrics(self, metrics_class: type[MetricsType] | None = None) -> MetricsType | BaseMetrics:
"""
Loads metrics based on the provided metrics class.

Reuses existing singleton if available to avoid Prometheus registry conflicts.
"""
if metrics_class:
return safe_get(metrics_class)
return safe_get(BaseMetrics, extractor_name=self.EXTERNAL_ID, extractor_version=self.VERSION)

def _load_state_store(self) -> None:
"""
Searches through the config object for a StateStoreConfig.
Expand Down Expand Up @@ -379,10 +400,8 @@ def restart(self) -> None:
self.cancellation_token.cancel()

@classmethod
def _init_from_runtime(
cls, config: FullConfig[ConfigType], checkin_worker: CheckinWorker, metrics: BaseMetrics
) -> Self:
return cls(config, checkin_worker, metrics)
def _init_from_runtime(cls, config: FullConfig[ConfigType], checkin_worker: CheckinWorker) -> Self:
return cls(config, checkin_worker)

def add_task(self, task: Task) -> None:
"""
Expand Down
22 changes: 14 additions & 8 deletions cognite/extractorutils/unstable/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def main() -> None:
CogniteAuthError,
CogniteConnectionError,
)
from cognite.extractorutils.metrics import BaseMetrics
from cognite.extractorutils.metrics import BaseMetrics, MetricsType
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.exceptions import InvalidArgumentError, InvalidConfigError
from cognite.extractorutils.unstable.configuration.loaders import (
Expand Down Expand Up @@ -79,16 +79,13 @@ def _extractor_process_entrypoint(
controls: _RuntimeControls,
config: FullConfig,
checkin_worker: CheckinWorker,
metrics: BaseMetrics | None = None,
) -> None:
logger = logging.getLogger(f"{extractor_class.EXTERNAL_ID}.runtime")
checkin_worker.active_revision = config.current_config_revision
checkin_worker.set_on_fatal_error_handler(lambda _: on_fatal_error(controls))
checkin_worker.set_on_revision_change_handler(lambda _: on_revision_changed(controls))
checkin_worker.set_retry_startup(extractor_class.RETRY_STARTUP)
if not metrics:
metrics = BaseMetrics(extractor_name=extractor_class.NAME, extractor_version=extractor_class.VERSION)
extractor = extractor_class._init_from_runtime(config, checkin_worker, metrics)
extractor = extractor_class._init_from_runtime(config, checkin_worker)
extractor._attach_runtime_controls(
cancel_event=controls.cancel_event,
message_queue=controls.message_queue,
Expand Down Expand Up @@ -138,13 +135,13 @@ class Runtime(Generic[ExtractorType]):
def __init__(
self,
extractor: type[ExtractorType],
metrics: BaseMetrics | None = None,
metrics: type[MetricsType] | None = None,
) -> None:
self._extractor_class = extractor
self._cancellation_token = CancellationToken()
self._cancellation_token.cancel_on_interrupt()
self._message_queue: Queue[RuntimeMessage] = Queue()
self._metrics = metrics
self._metrics_class = metrics
self.logger = logging.getLogger(f"{self._extractor_class.EXTERNAL_ID}.runtime")
self._setup_logging()
self._cancel_event: MpEvent | None = None
Expand Down Expand Up @@ -273,7 +270,7 @@ def _spawn_extractor(

process = Process(
target=_extractor_process_entrypoint,
args=(self._extractor_class, controls, config, checkin_worker, self._metrics),
args=(self._extractor_class, controls, config, checkin_worker),
)

process.start()
Expand Down Expand Up @@ -477,6 +474,14 @@ def _main_runtime(self, args: Namespace) -> None:
if not args.skip_init_checks and not self._verify_connection_config(connection_config):
sys.exit(1)

if self._metrics_class is not None and (
not isinstance(self._metrics_class, type) or not issubclass(self._metrics_class, BaseMetrics)
):
self.logger.critical(
"The provided metrics class does not inherit from BaseMetrics. Metrics will not be collected."
)
sys.exit(1)

# This has to be Any. We don't know the type of the extractors' config at type checking since the self doesn't
# exist yet, and I have not found a way to represent it in a generic way that isn't just an Any in disguise.
application_config: Any
Expand Down Expand Up @@ -507,6 +512,7 @@ def _main_runtime(self, args: Namespace) -> None:
application_config=application_config,
current_config_revision=current_config_revision,
log_level_override=args.log_level,
metrics_class=self._metrics_class,
),
checkin_worker,
)
Expand Down
30 changes: 30 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,47 @@
from enum import Enum

import pytest
from prometheus_client.core import REGISTRY

from cognite.client import CogniteClient
from cognite.client.config import ClientConfig
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError
from cognite.extractorutils import metrics

NUM_NODES = 5000
NUM_EDGES = NUM_NODES // 100


@pytest.fixture(autouse=True)
def reset_singleton() -> Generator[None, None, None]:
"""
This fixture ensures that the _metrics_singularities
class variables are reset, and Prometheus collectors are unregistered,
providing test isolation.
"""
# Clean up before test
metrics._metrics_singularities.clear()

# Unregister all collectors to prevent "Duplicated timeseries" errors
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
with contextlib.suppress(Exception):
REGISTRY.unregister(collector)

yield

# Clean up after test
metrics._metrics_singularities.clear()

# Unregister all collectors again
collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
with contextlib.suppress(Exception):
REGISTRY.unregister(collector)


class ETestType(Enum):
TIME_SERIES = "time_series"
CDM_TIME_SERIES = "cdm_time_series"
Expand Down
2 changes: 1 addition & 1 deletion tests/test_unstable/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import gzip
import json
import os
from collections import Counter
from collections.abc import Callable, Generator, Iterator
from threading import RLock
from time import sleep, time
Expand All @@ -10,6 +9,7 @@

import pytest
import requests_mock
from prometheus_client.core import Counter

from cognite.client import CogniteClient
from cognite.client.config import ClientConfig
Expand Down
5 changes: 3 additions & 2 deletions tests/test_unstable/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,11 @@ def counting_push(self: CognitePusher) -> None:
application_config=app_config,
current_config_revision=1,
log_level_override=override_level,
metrics_class=TestMetrics,
)
worker = get_checkin_worker(connection_config)
extractor = TestExtractor(full_config, worker, metrics=TestMetrics)
assert isinstance(extractor._metrics, TestMetrics) or extractor._metrics == TestMetrics
extractor = TestExtractor(full_config, worker)
assert isinstance(extractor.metrics, TestMetrics)

with contextlib.ExitStack() as stack:
stack.enter_context(contextlib.suppress(Exception))
Expand Down
Loading
Loading