Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions cognite/client/data_classes/datapoints_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,34 +263,46 @@ class TimeSeriesID(CogniteResource):
A TimeSeries Identifier to uniquely identify a time series.

Args:
id (int): A server-generated ID for the object.
id (int | None): A server-generated ID for the object. May be None if the time series
reference is broken (e.g., the time series was deleted or its external_id was changed).
external_id (ExternalId | None): The external ID provided by the client. Must be unique for the resource type.
instance_id (NodeId | None): The ID of an instance in Cognite Data Models.
"""

def __init__(self, id: int, external_id: ExternalId | None = None, instance_id: NodeId | None = None) -> None:
def __init__(
self, id: int | None = None, external_id: ExternalId | None = None, instance_id: NodeId | None = None
) -> None:
self.id = id
self.external_id = external_id
self.instance_id = instance_id

@property
def is_resolved(self) -> bool:
"""Returns True if this reference points to an existing time series (i.e., has an id)."""
return self.id is not None

def __repr__(self) -> str:
identifier = f"id={self.id}"
parts = []
if self.id is not None:
parts.append(f"id={self.id}")
if self.external_id is not None:
identifier += f", external_id={self.external_id}"
elif self.instance_id is not None:
identifier += f", instance_id={self.instance_id!r}"
return f"TimeSeriesID({identifier})"
parts.append(f"external_id={self.external_id}")
if self.instance_id is not None:
parts.append(f"instance_id={self.instance_id!r}")
return f"TimeSeriesID({', '.join(parts)})"

@classmethod
def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> TimeSeriesID:
return cls(
id=resource["id"],
id=resource.get("id"),
external_id=resource.get("externalId"),
instance_id=NodeId.load(resource["instanceId"]) if "instanceId" in resource else None,
)

def dump(self, camel_case: bool = True) -> dict[str, Any]:
resource: dict[str, Any] = {"id": self.id}
resource: dict[str, Any] = {}
if self.id is not None:
resource["id"] = self.id
if self.external_id is not None:
resource["externalId" if camel_case else "external_id"] = self.external_id
if self.instance_id is not None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import pytest

from cognite.client.data_classes import filters
from cognite.client.data_classes.datapoints_subscriptions import DatapointSubscription, DataPointSubscriptionWrite
from cognite.client.data_classes.datapoints_subscriptions import (
DatapointSubscription,
DataPointSubscriptionWrite,
TimeSeriesID,
TimeSeriesIDList,
)


class TestDataPointSubscription:
Expand All @@ -27,3 +32,67 @@ def test_handles_null_timeseries_count(self) -> None:
}
)
assert sub.time_series_count is None


class TestTimeSeriesID:
def test_load_with_all_fields(self) -> None:
ts_id = TimeSeriesID._load({"id": 123, "externalId": "my_ts"})
assert ts_id.id == 123
assert ts_id.external_id == "my_ts"
assert ts_id.instance_id is None
assert ts_id.is_resolved is True

def test_load_with_missing_id_broken_reference(self) -> None:
"""Test that TimeSeriesID can be loaded when 'id' is missing (broken reference scenario).

This happens when a time series's external_id is changed or the time series is deleted,
but the subscription still references the old external_id.
"""
ts_id = TimeSeriesID._load({"externalId": "deleted_or_renamed_ts"})
assert ts_id.id is None
assert ts_id.external_id == "deleted_or_renamed_ts"
assert ts_id.instance_id is None
assert ts_id.is_resolved is False

def test_load_with_instance_id_only(self) -> None:
"""Test loading a TimeSeriesID with only an instance_id (broken reference)."""
ts_id = TimeSeriesID._load({"instanceId": {"space": "my_space", "externalId": "my_node"}})
assert ts_id.id is None
assert ts_id.external_id is None
assert ts_id.instance_id is not None
assert ts_id.instance_id.space == "my_space"
assert ts_id.instance_id.external_id == "my_node"
assert ts_id.is_resolved is False

def test_repr_with_id(self) -> None:
ts_id = TimeSeriesID(id=123, external_id="my_ts")
assert repr(ts_id) == "TimeSeriesID(id=123, external_id=my_ts)"

def test_repr_without_id(self) -> None:
ts_id = TimeSeriesID(external_id="broken_ref")
assert repr(ts_id) == "TimeSeriesID(external_id=broken_ref)"

def test_dump_with_id(self) -> None:
ts_id = TimeSeriesID(id=123, external_id="my_ts")
dumped = ts_id.dump()
assert dumped == {"id": 123, "externalId": "my_ts"}

def test_dump_without_id(self) -> None:
"""Test that dump excludes 'id' when it's None."""
ts_id = TimeSeriesID(external_id="broken_ref")
dumped = ts_id.dump()
assert dumped == {"externalId": "broken_ref"}
assert "id" not in dumped

def test_time_series_id_list_with_broken_references(self) -> None:
"""Test that TimeSeriesIDList can handle a mix of resolved and broken references."""
items = [
{"id": 123, "externalId": "ts_1"},
{"externalId": "broken_ref"}, # No id - broken reference
{"id": 456, "externalId": "ts_2"},
]
ts_list = TimeSeriesIDList._load(items)
assert len(ts_list) == 3
assert ts_list[0].is_resolved is True
assert ts_list[1].is_resolved is False
assert ts_list[2].is_resolved is True