from collections.abc import Callable

from conftest import ETestType, ParamTest

from cognite.client import CogniteClient
from cognite.client.data_classes import StatusCode
from cognite.client.data_classes.data_modeling import NodeApply, NodeId
from cognite.client.data_classes.data_modeling.extractor_extensions.v1 import CogniteExtractorTimeSeriesApply
from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError
from cognite.extractorutils.uploader.time_series import CDMTimeSeriesUploadQueue
1920MIN_DATAPOINT_TIMESTAMP = - 2208988800000
@@ -190,8 +191,6 @@ def test_cdm_queue_discards_invalid_values(set_upload_test: tuple[CogniteClient,
190191 queue = CDMTimeSeriesUploadQueue (cdf_client = client , create_missing = True )
191192 queue .start ()
192193
193- ts_apply_numeric = _apply_node (params .space , ext_id_1 , "numeric" )
194- ts_apply_string = _apply_node (params .space , ext_id_2 , "string" )
195194 now = int (datetime .now (tz = timezone .utc ).timestamp () * 1_000 )
196195 good = (now , 123 )
197196 bad_time = (MIN_DATAPOINT_TIMESTAMP - 1 , 9 )
@@ -216,3 +215,112 @@ def test_cdm_queue_discards_invalid_values(set_upload_test: tuple[CogniteClient,
216215 assert [str (v ) for v in recv_points_2 .value ] == [valid_temp_str_dp [1 ]]
217216 assert too_long_str [1 ] not in recv_points_2 .value
218217 queue .stop ()
218+
219+
@pytest.mark.parametrize(
    "ts_type, invalid_dps_function, expected_error_msg",
    [
        ("numeric", _rand_string_points, "Expected numeric value"),
        ("string", _rand_numeric_points, "Expected string value"),
    ],
    ids=["string_into_numeric_fails", "numeric_into_string_fails"],
)
def test_cdm_queue_fails_on_data_type_mismatch(
    set_upload_test: tuple[CogniteClient, ParamTest],
    ts_type: Literal["numeric", "string"],
    # Was annotated `callable` (the builtin function); `Callable` is the correct type.
    invalid_dps_function: Callable[..., list],
    expected_error_msg: str,
) -> None:
    """
    Tests that the uploader fails with a specific error when the datapoint type
    does not match the time series type. This test is parametrized to cover:
    1. Ingesting string datapoints into a numeric time series.
    2. Ingesting numeric datapoints into a string time series.
    """
    client, params = set_upload_test
    ext_id = params.external_ids[0]
    instance_id = NodeId(space=params.space, external_id=ext_id)

    # Create the time series node up front so the queue cannot auto-create it
    # with an inferred (possibly matching) type.
    node = _apply_node(params.space, ext_id, ts_type)
    client.data_modeling.instances.apply(nodes=node)
    # Allow the newly applied instance to become visible to the datapoints API.
    time.sleep(2)

    queue = CDMTimeSeriesUploadQueue(cdf_client=client, create_missing=False)
    now = int(datetime.now(tz=timezone.utc).timestamp() * 1_000)
    invalid_dps = invalid_dps_function(n=5, start_ms=now)
    queue.add_to_upload_queue(instance_id=instance_id, datapoints=invalid_dps)

    # upload() is called directly (the queue's background thread is never
    # started), so the API error surfaces synchronously here.
    with pytest.raises(CogniteAPIError) as excinfo:
        queue.upload()

    assert excinfo.value.code == 400
    assert expected_error_msg in excinfo.value.message

    # The rejected batch must not have been partially written.
    retrieved_dps = client.time_series.data.retrieve(instance_id=instance_id)
    assert len(retrieved_dps) == 0
261+
262+
def test_cdm_queue_with_status_codes(set_upload_test: tuple[CogniteClient, ParamTest]) -> None:
    """
    Tests ingesting datapoints that include status codes into a numeric CDM time series.
    """
    client, params = set_upload_test

    queue = CDMTimeSeriesUploadQueue(cdf_client=client, create_missing=True)
    queue.start()

    start = int(datetime.now(tz=timezone.utc).timestamp() * 1000) - 5_000

    # Mix of symbolic status codes and one raw numeric code (GoodClamped).
    status_pool = [
        StatusCode.Good,
        StatusCode.Uncertain,
        StatusCode.Bad,
        3145728,  # GoodClamped
    ]

    def make_points(count: int, step_ms: int) -> list:
        # (timestamp, value, status_code) triples spread over the last few seconds.
        return [(start + i * step_ms, random.random(), random.choice(status_pool)) for i in range(count)]

    def enqueue(ext_id: str, dps: list) -> None:
        queue.add_to_upload_queue(instance_id=NodeId(space=params.space, external_id=ext_id), datapoints=dps)

    def retrieve_all(ext_id: str):
        # Fetch everything back, including bad/uncertain points and their codes.
        return client.time_series.data.retrieve(
            instance_id=NodeId(space=params.space, external_id=ext_id),
            start=start - 100,
            end="now",
            limit=None,
            include_status=True,
            treat_uncertain_as_bad=False,
            ignore_bad_datapoints=False,
        )

    points1 = make_points(30, 42)
    enqueue(params.external_ids[0], points1)

    points2 = make_points(50, 24)
    enqueue(params.external_ids[1], points2)

    queue.upload()
    # Give the ingested datapoints time to become readable.
    time.sleep(5)

    recv_points1 = retrieve_all(params.external_ids[0])
    recv_points2 = retrieve_all(params.external_ids[1])
    queue.stop()

    assert len(recv_points1) == len(points1)
    assert len(recv_points2) == len(points2)

    for sent, received in ((points1, recv_points1), (points2, recv_points2)):
        for point, recv_point in zip(sent, received):  # noqa: B905
            assert point[0] == recv_point.timestamp
            assert point[1] == recv_point.value
            assert point[2] == recv_point.status_code
0 commit comments