Merged
Changes from 2 commits
98 changes: 65 additions & 33 deletions cognite/extractorutils/metrics.py
@@ -53,14 +53,12 @@ def __init__(self):
from prometheus_client.exposition import basic_auth_handler, delete_from_gateway, pushadd_to_gateway

from cognite.client import CogniteClient
from cognite.client.data_classes import Asset, Datapoints, DatapointsArray, TimeSeries
from cognite.client.data_classes.data_modeling import NodeId
from cognite.client.data_classes import Asset, TimeSeries
from cognite.client.exceptions import CogniteDuplicatedError
from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.uploader.time_series import DataPointList, TimeSeriesUploadQueue
from cognite.extractorutils.util import EitherId

from .util import ensure_time_series

_metrics_singularities = {}


@@ -359,70 +357,104 @@ def __init__(
self.asset = asset
self.external_id_prefix = external_id_prefix
self.data_set = data_set
self._asset_id: int | None = None
self._data_set_id: int | None = None

self._init_cdf()

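# All metric datapoints flow through this queue. Passing the factory below as
# create_missing lets the queue create a timeseries on the fly the first time
# it sees a metric without one, which covers late-registered metrics.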
self.upload_queue = TimeSeriesUploadQueue(
cdf_client=cdf_client,
create_missing=self._create_missing_timeseries_factory,
data_set_id=self._data_set_id,
cancellation_token=cancellation_token,
)

self._cdf_project = cdf_client.config.project

def _init_cdf(self) -> None:
"""
Initialize the CDF tenant with the necessary time series and asset.
"""
time_series: list[TimeSeries] = []
Initialize the CDF tenant with the necessary asset and dataset.

Timeseries are created automatically by TimeSeriesUploadQueue when datapoints are pushed.
"""
if self.asset is not None:
# Ensure that asset exist, and retrieve internal ID
# Ensure that asset exists, and retrieve internal ID
asset: Asset | None
try:
asset = self.cdf_client.assets.create(self.asset)
except CogniteDuplicatedError:
asset = self.cdf_client.assets.retrieve(external_id=self.asset.external_id)

asset_id = asset.id if asset is not None else None

else:
asset_id = None
self._asset_id = asset.id if asset is not None else None

data_set_id = None
if self.data_set:
dataset = self.cdf_client.data_sets.retrieve(
id=self.data_set.internal_id, external_id=self.data_set.external_id
)
if dataset:
data_set_id = dataset.id
self._data_set_id = dataset.id

for metric in REGISTRY.collect():
if type(metric) is Metric and metric.type in ["gauge", "counter"]:
external_id = self.external_id_prefix + metric.name
def _create_missing_timeseries_factory(self, external_id: str, datapoints: DataPointList) -> TimeSeries:
"""
Factory function to create missing timeseries.

time_series.append(
TimeSeries(
external_id=external_id,
name=metric.name,
legacy_name=external_id,
description=metric.documentation,
asset_id=asset_id,
data_set_id=data_set_id,
)
)
Args:
external_id: External ID of the timeseries to create
datapoints: List of datapoints that triggered the creation

Returns:
A TimeSeries object describing the timeseries the queue should create
"""
metric_name = external_id[len(self.external_id_prefix) :]

ensure_time_series(self.cdf_client, time_series)
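# external_id minus the prefix is the original Prometheus metric name; look it
# up in the registry so its documentation can serve as the description.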
metric_description = ""
for metric in REGISTRY.collect():
if isinstance(metric, Metric) and metric.name == metric_name:
metric_description = metric.documentation
break

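# Datapoints may arrive as dicts ({"timestamp": ..., "value": ...}) or as
# (timestamp, value) tuples; inspect the first one in either shape to decide
# whether the new timeseries must be string-valued.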
is_string = (
isinstance(datapoints[0].get("value"), str)
if isinstance(datapoints[0], dict)
else isinstance(datapoints[0][1], str)
)

return TimeSeries(
external_id=external_id,
name=metric_name,
legacy_name=external_id,
description=metric_description,
asset_id=self._asset_id,
data_set_id=self._data_set_id,
is_string=is_string,
)

def _push_to_server(self) -> None:
"""
Create datapoints an push them to their respective time series.
Create datapoints and push them to their respective time series using TimeSeriesUploadQueue.

The queue will automatically create missing timeseries for late-registered metrics.
"""
timestamp = int(arrow.get().float_timestamp * 1000)
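# (arrow's float_timestamp is seconds since epoch; CDF expects milliseconds)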

datapoints: list[dict[str, str | int | list[Any] | Datapoints | DatapointsArray | NodeId]] = []

for metric in REGISTRY.collect():
if isinstance(metric, Metric) and metric.type in ["gauge", "counter"]:
if len(metric.samples) == 0:
continue

external_id = self.external_id_prefix + metric.name
datapoints.append({"externalId": external_id, "datapoints": [(timestamp, metric.samples[0].value)]})

self.cdf_client.time_series.data.insert_multiple(datapoints)
self.upload_queue.add_to_upload_queue(
external_id=external_id, datapoints=[(timestamp, metric.samples[0].value)]
)

self.upload_queue.upload()
self.logger.debug("Pushed metrics to CDF tenant '%s'", self._cdf_project)

def stop(self) -> None:
"""
Stop the push loop and ensure all metrics are uploaded.
"""
self._push_to_server()
self.upload_queue.stop()
self.cancellation_token.cancel()
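
For context when reviewing: below is a minimal sketch of the create_missing pattern the new code builds on. The client construction and the datapoint values are illustrative only; the queue wiring mirrors the change above.

from cognite.client import CogniteClient
from cognite.client.data_classes import TimeSeries
from cognite.extractorutils.uploader.time_series import DataPointList, TimeSeriesUploadQueue

client = CogniteClient()  # assumes credentials are already configured

def create_missing_ts(external_id: str, datapoints: DataPointList) -> TimeSeries:
    # Called by the queue the first time it sees an external ID with no
    # backing timeseries in CDF; the returned TimeSeries gets created.
    return TimeSeries(external_id=external_id, name=external_id)

queue = TimeSeriesUploadQueue(cdf_client=client, create_missing=create_missing_ts)
queue.add_to_upload_queue(external_id="my_metric", datapoints=[(1_700_000_000_000, 1.0)])
queue.upload()  # creates "my_metric" via the factory, then inserts the datapoint
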
226 changes: 226 additions & 0 deletions tests/tests_integration/test_metrics_integration.py
@@ -0,0 +1,226 @@
"""
Integration tests for CognitePusher with late-registered metrics.

This test verifies that CognitePusher correctly handles metrics that are registered
after initialization (like python_gc_* and python_info metrics from Prometheus).
"""

import random
import time
from collections.abc import Generator

import pytest
from prometheus_client import Counter, Gauge
from prometheus_client.core import REGISTRY

from cognite.client import CogniteClient
from cognite.extractorutils.metrics import CognitePusher


@pytest.fixture
def test_prefix() -> str:
"""Generate a unique prefix for this test run to avoid conflicts."""
test_id = random.randint(0, 2**31)
return f"integration_test_{test_id}_"


@pytest.fixture
def cognite_pusher_test(
set_client: CogniteClient, test_prefix: str
) -> Generator[tuple[CogniteClient, str, list[str]], None, None]:
"""
Fixture that sets up and tears down a CognitePusher test.

Yields:
Tuple of (client, test_prefix, list of created timeseries external_ids)
"""
client = set_client
created_external_ids: list[str] = []

yield client, test_prefix, created_external_ids

if created_external_ids:
try:
client.time_series.delete(external_id=created_external_ids, ignore_unknown_ids=True)
except Exception as e:
print(f"Warning: Failed to cleanup timeseries: {e}")


def test_cognite_pusher_with_late_registered_metrics(cognite_pusher_test: tuple[CogniteClient, str, list[str]]) -> None:
"""
Test that CognitePusher handles both early and late-registered metrics.

This simulates the real-world scenario where:
1. Some metrics (like extractor-specific metrics) are registered before CognitePusher init
2. Other metrics (like python_gc_*, python_info) are registered after initialization
3. All metrics should be uploaded correctly during push
"""
client, test_prefix, created_external_ids = cognite_pusher_test

early_gauge_name = f"early_gauge_{random.randint(0, 2**31)}"
early_gauge = Gauge(early_gauge_name, "A metric registered before CognitePusher init")
early_gauge.set(42.0)

early_external_id = test_prefix + early_gauge_name
created_external_ids.append(early_external_id)

pusher = CognitePusher(
cdf_client=client,
external_id_prefix=test_prefix,
push_interval=60,
)

late_gauge_name = f"late_gauge_{random.randint(0, 2**31)}"
late_gauge = Gauge(late_gauge_name, "A metric registered AFTER CognitePusher init (like python_gc)")
late_gauge.set(99.0)

late_counter_name = f"late_counter_{random.randint(0, 2**31)}"
late_counter = Counter(late_counter_name, "A counter registered AFTER CognitePusher init (like python_info)")
late_counter.inc(5)

late_gauge_external_id = test_prefix + late_gauge_name
late_counter_external_id = test_prefix + late_counter_name
created_external_ids.append(late_gauge_external_id)
created_external_ids.append(late_counter_external_id)

# This should create timeseries for ALL metrics (early + late)
pusher._push_to_server()

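# Give CDF a moment to process the writes before reading back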
time.sleep(2)

early_ts = client.time_series.retrieve(external_id=early_external_id)
assert early_ts is not None, f"Early metric timeseries {early_external_id} was not created"
assert early_ts.name == early_gauge_name
assert early_ts.description == "A metric registered before CognitePusher init"

late_gauge_ts = client.time_series.retrieve(external_id=late_gauge_external_id)
assert late_gauge_ts is not None, f"Late gauge timeseries {late_gauge_external_id} was not created"
assert late_gauge_ts.name == late_gauge_name
assert late_gauge_ts.description == "A metric registered AFTER CognitePusher init (like python_gc)"

late_counter_ts = client.time_series.retrieve(external_id=late_counter_external_id)
assert late_counter_ts is not None, f"Late counter timeseries {late_counter_external_id} was not created"
assert late_counter_ts.name == late_counter_name

early_datapoints = client.time_series.data.retrieve(
external_id=early_external_id, start="1h-ago", end="now", limit=10
)
assert len(early_datapoints) > 0, "No datapoints for early metric"
assert early_datapoints.value[0] == pytest.approx(42.0)

late_gauge_datapoints = client.time_series.data.retrieve(
external_id=late_gauge_external_id, start="1h-ago", end="now", limit=10
)
assert len(late_gauge_datapoints) > 0, "No datapoints for late gauge metric"
assert late_gauge_datapoints.value[0] == pytest.approx(99.0)

late_counter_datapoints = client.time_series.data.retrieve(
external_id=late_counter_external_id, start="1h-ago", end="now", limit=10
)
assert len(late_counter_datapoints) > 0, "No datapoints for late counter metric"
assert late_counter_datapoints.value[0] == pytest.approx(5.0)

pusher.stop()

REGISTRY.unregister(early_gauge)
REGISTRY.unregister(late_gauge)
REGISTRY.unregister(late_counter)


def test_cognite_pusher_stop_uploads_late_metrics(cognite_pusher_test: tuple[CogniteClient, str, list[str]]) -> None:
"""
Test that stop() correctly uploads all metrics including late-registered ones.

This is the scenario where:
1. CognitePusher is initialized
2. Metrics are registered after
3. stop() is called during shutdown
4. All metrics (including late ones) should be uploaded
"""
client, test_prefix, created_external_ids = cognite_pusher_test

pusher = CognitePusher(
cdf_client=client,
external_id_prefix=test_prefix,
push_interval=60,
)

late_metric_name = f"shutdown_metric_{random.randint(0, 2**31)}"
late_metric = Gauge(late_metric_name, "A metric registered after init, uploaded during shutdown")
late_metric.set(123.0)

late_external_id = test_prefix + late_metric_name
created_external_ids.append(late_external_id)

pusher.stop()

time.sleep(2)

late_ts = client.time_series.retrieve(external_id=late_external_id)
assert late_ts is not None, f"Late metric {late_external_id} was not created during shutdown"

late_datapoints = client.time_series.data.retrieve(
external_id=late_external_id, start="1h-ago", end="now", limit=10
)
assert len(late_datapoints) > 0, "No datapoints for late metric after shutdown"
assert late_datapoints.value[0] == pytest.approx(123.0)

REGISTRY.unregister(late_metric)


def test_cognite_pusher_multiple_pushes_with_late_metrics(
cognite_pusher_test: tuple[CogniteClient, str, list[str]],
) -> None:
"""
Test that multiple pushes work correctly with late-registered metrics.

Scenario:
1. Push with some metrics
2. Register new metrics
3. Push again - new metrics should be created and uploaded
"""
client, test_prefix, created_external_ids = cognite_pusher_test

initial_metric_name = f"initial_{random.randint(0, 2**31)}"
initial_metric = Gauge(initial_metric_name, "Initial metric")
initial_metric.set(10.0)

initial_external_id = test_prefix + initial_metric_name
created_external_ids.append(initial_external_id)

pusher = CognitePusher(
cdf_client=client,
external_id_prefix=test_prefix,
push_interval=60,
)

pusher._push_to_server()
time.sleep(1)

initial_ts = client.time_series.retrieve(external_id=initial_external_id)
assert initial_ts is not None

late_metric_name = f"later_{random.randint(0, 2**31)}"
late_metric = Gauge(late_metric_name, "Late metric added between pushes")
late_metric.set(20.0)

late_external_id = test_prefix + late_metric_name
created_external_ids.append(late_external_id)

initial_metric.set(11.0)
pusher._push_to_server()
time.sleep(2)

late_ts = client.time_series.retrieve(external_id=late_external_id)
assert late_ts is not None, "Late metric was not created on second push"

late_datapoints = client.time_series.data.retrieve(
external_id=late_external_id, start="1h-ago", end="now", limit=10
)
assert len(late_datapoints) > 0
assert late_datapoints.value[0] == pytest.approx(20.0)

pusher.stop()

REGISTRY.unregister(initial_metric)
REGISTRY.unregister(late_metric)
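
Note: these tests rely on the suite's set_client fixture (presumably provided by a conftest with credentials for a test CDF project). Assuming that setup, a typical local run would be:

pytest tests/tests_integration/test_metrics_integration.py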