Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions tests/tests_integration/test_metrics_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,47 @@
from prometheus_client.core import REGISTRY

from cognite.client import CogniteClient
from cognite.client.exceptions import CogniteNotFoundError
from cognite.extractorutils.metrics import CognitePusher

logger = logging.getLogger(__name__)


def poll_for_condition(condition: Callable[[], bool], timeout: int = 10, interval: float = 0.5) -> None:
    """
    Poll a condition function until it returns True or the timeout elapses.

    Args:
        condition: A callable that returns True when the condition is met
        timeout: Maximum time to wait in seconds
        interval: Time to wait between checks in seconds

    Raises:
        Failed: via ``pytest.fail`` if the condition is not met within ``timeout``.
    """
    # Use a monotonic clock so the deadline is immune to wall-clock adjustments
    # (NTP steps, DST) that can make time.time() jump forwards or backwards.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return
        time.sleep(interval)
    # One final check: the condition may have become true during the last
    # sleep interval, just after the deadline passed.
    if condition():
        return
    pytest.fail(f"Condition not met within {timeout} seconds.")


def timeseries_exist(client: CogniteClient, external_ids: list[str]) -> bool:
    """
    Return whether every requested timeseries is present in CDF.

    Args:
        client: CogniteClient instance
        external_ids: External IDs that must all resolve to existing timeseries

    Returns:
        True if all timeseries exist, False otherwise
    """
    # With ignore_unknown_ids=False the SDK raises CogniteNotFoundError as
    # soon as any ID is missing, which we translate into a boolean.
    try:
        client.time_series.retrieve_multiple(external_ids=external_ids, ignore_unknown_ids=False)
    except CogniteNotFoundError:
        return False
    return True


def assert_timeseries_exists(
client: CogniteClient,
external_id: str,
Expand Down Expand Up @@ -164,7 +200,9 @@ def test_cognite_pusher_with_late_registered_metrics(
# This should create timeseries for ALL metrics (early + late)
pusher._push_to_server()

time.sleep(2)
poll_for_condition(
lambda: timeseries_exist(client, [early_external_id, late_gauge_external_id, late_counter_external_id])
)

assert_timeseries_exists(
client, early_external_id, early_gauge_name, "A metric registered before CognitePusher init"
Expand Down Expand Up @@ -211,7 +249,7 @@ def test_cognite_pusher_stop_uploads_late_metrics(

pusher.stop()

time.sleep(2)
poll_for_condition(lambda: timeseries_exist(client, [late_external_id]))

assert_timeseries_exists(client, late_external_id)
assert_datapoint_value(client, late_external_id, 123.0)
Expand Down Expand Up @@ -245,7 +283,7 @@ def test_cognite_pusher_multiple_pushes_with_late_metrics(
)

pusher._push_to_server()
time.sleep(1)
poll_for_condition(lambda: timeseries_exist(client, [initial_external_id]))

assert_timeseries_exists(client, initial_external_id)

Expand All @@ -258,7 +296,7 @@ def test_cognite_pusher_multiple_pushes_with_late_metrics(

initial_metric.set(11.0)
pusher._push_to_server()
time.sleep(2)
poll_for_condition(lambda: timeseries_exist(client, [late_external_id]))

assert_timeseries_exists(client, late_external_id)
assert_datapoint_value(client, late_external_id, 20.0)
Expand Down
36 changes: 35 additions & 1 deletion tests/tests_unit/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from cognite.client import CogniteClient
from cognite.client.data_classes import Asset
from cognite.client.exceptions import CogniteDuplicatedError
from cognite.client.exceptions import CogniteDuplicatedError, CogniteNotFoundError
from cognite.extractorutils import metrics
from cognite.extractorutils.metrics import CognitePusher, safe_get

Expand Down Expand Up @@ -198,6 +198,40 @@ def test_push(MockCogniteClient: Mock) -> None:
assert ts["datapoints"][0][1] == pytest.approx(5)


@patch("cognite.client.CogniteClient")
def test_push_creates_missing_timeseries(MockCogniteClient: Mock) -> None:
    """Test that push logic creates missing time series when enabled."""
    init_gauge()
    client: CogniteClient = MockCogniteClient()

    # Build a CogniteNotFoundError describing the missing external ID, and set
    # not_found/failed explicitly so the pusher's retry path can inspect them.
    # NOTE(review): the SDK constructor may already populate not_found from its
    # first argument, making the two assignments below redundant — confirm
    # against the installed cognite-sdk version before removing them.
    not_found_error = CogniteNotFoundError([{"externalId": "pre_gauge"}])
    not_found_error.not_found = [{"externalId": "pre_gauge"}]
    not_found_error.failed = []

    # First insert raises (timeseries does not exist yet); the second call —
    # the retry after creation — succeeds.
    client.time_series.data.insert_multiple.side_effect = [
        not_found_error,
        None,  # Success on retry
    ]

    pusher = CognitePusher(client, "pre_", push_interval=1)

    GaugeSetUp.gauge.set(5)
    pusher._push_to_server()

    # The pusher should create exactly the one missing timeseries, deriving
    # external_id from the "pre_" prefix plus the Prometheus metric name.
    client.time_series.create.assert_called_once()
    created_ts_list = client.time_series.create.call_args[0][0]
    assert len(created_ts_list) == 1
    assert created_ts_list[0].external_id == "pre_gauge"
    assert created_ts_list[0].name == "gauge"
    assert created_ts_list[0].description == "Test gauge"

    # Assert that insert_multiple was called twice (initial attempt + retry)
    assert client.time_series.data.insert_multiple.call_count == 2


# MetricsUtils test
@pytest.fixture
def init_counter() -> None:
Expand Down
Loading