11import pytest
22
33from cognite .client .data_classes import filters
4- from cognite .client .data_classes .datapoints_subscriptions import DatapointSubscription , DataPointSubscriptionWrite
4+ from cognite .client .data_classes .datapoints_subscriptions import (
5+ DatapointSubscription ,
6+ DataPointSubscriptionWrite ,
7+ TimeSeriesID ,
8+ TimeSeriesIDList ,
9+ )
510
611
712class TestDataPointSubscription :
@@ -27,3 +32,67 @@ def test_handles_null_timeseries_count(self) -> None:
2732 }
2833 )
2934 assert sub .time_series_count is None
35+
36+
37+ class TestTimeSeriesID :
38+ def test_load_with_all_fields (self ) -> None :
39+ ts_id = TimeSeriesID ._load ({"id" : 123 , "externalId" : "my_ts" })
40+ assert ts_id .id == 123
41+ assert ts_id .external_id == "my_ts"
42+ assert ts_id .instance_id is None
43+ assert ts_id .is_resolved is True
44+
45+ def test_load_with_missing_id_broken_reference (self ) -> None :
46+ """Test that TimeSeriesID can be loaded when 'id' is missing (broken reference scenario).
47+
48+ This happens when a time series's external_id is changed or the time series is deleted,
49+ but the subscription still references the old external_id.
50+ """
51+ ts_id = TimeSeriesID ._load ({"externalId" : "deleted_or_renamed_ts" })
52+ assert ts_id .id is None
53+ assert ts_id .external_id == "deleted_or_renamed_ts"
54+ assert ts_id .instance_id is None
55+ assert ts_id .is_resolved is False
56+
57+ def test_load_with_instance_id_only (self ) -> None :
58+ """Test loading a TimeSeriesID with only an instance_id (broken reference)."""
59+ ts_id = TimeSeriesID ._load ({"instanceId" : {"space" : "my_space" , "externalId" : "my_node" }})
60+ assert ts_id .id is None
61+ assert ts_id .external_id is None
62+ assert ts_id .instance_id is not None
63+ assert ts_id .instance_id .space == "my_space"
64+ assert ts_id .instance_id .external_id == "my_node"
65+ assert ts_id .is_resolved is False
66+
67+ def test_repr_with_id (self ) -> None :
68+ ts_id = TimeSeriesID (id = 123 , external_id = "my_ts" )
69+ assert repr (ts_id ) == "TimeSeriesID(id=123, external_id=my_ts)"
70+
71+ def test_repr_without_id (self ) -> None :
72+ ts_id = TimeSeriesID (external_id = "broken_ref" )
73+ assert repr (ts_id ) == "TimeSeriesID(external_id=broken_ref)"
74+
75+ def test_dump_with_id (self ) -> None :
76+ ts_id = TimeSeriesID (id = 123 , external_id = "my_ts" )
77+ dumped = ts_id .dump ()
78+ assert dumped == {"id" : 123 , "externalId" : "my_ts" }
79+
80+ def test_dump_without_id (self ) -> None :
81+ """Test that dump excludes 'id' when it's None."""
82+ ts_id = TimeSeriesID (external_id = "broken_ref" )
83+ dumped = ts_id .dump ()
84+ assert dumped == {"externalId" : "broken_ref" }
85+ assert "id" not in dumped
86+
87+ def test_time_series_id_list_with_broken_references (self ) -> None :
88+ """Test that TimeSeriesIDList can handle a mix of resolved and broken references."""
89+ items = [
90+ {"id" : 123 , "externalId" : "ts_1" },
91+ {"externalId" : "broken_ref" }, # No id - broken reference
92+ {"id" : 456 , "externalId" : "ts_2" },
93+ ]
94+ ts_list = TimeSeriesIDList ._load (items )
95+ assert len (ts_list ) == 3
96+ assert ts_list [0 ].is_resolved is True
97+ assert ts_list [1 ].is_resolved is False
98+ assert ts_list [2 ].is_resolved is True
0 commit comments