1010
1111from cognite .client import CogniteClient
1212from cognite .client .exceptions import CogniteNotFoundError
13- from cognite .extractorutils .metrics import PrometheusPusher
13+ from cognite .extractorutils .metrics import CognitePusher , PrometheusPusher
1414from cognite .extractorutils .statestore .watermark import LocalStateStore , RawStateStore
1515from cognite .extractorutils .unstable .configuration .loaders import ConfigFormat , load_io
1616from cognite .extractorutils .unstable .configuration .models import (
2424 RawStateStoreConfig ,
2525 StateStoreConfig ,
2626 TimeIntervalConfig ,
27+ _CogniteMetricsConfig ,
2728 _PromServerConfig ,
2829 _PushGatewayConfig ,
2930)
@@ -245,31 +246,52 @@ def test_raw_state_store_integration(
245246 assert new_extractor .state_store .get_state ("my-test-id" ) == (1 , 5 )
246247
247248
248- def test_extractor_with_metrics (
249- connection_config : ConnectionConfig ,
250- override_level : str | None = None ,
251- ) -> None :
252- app_config = TestConfig (
253- parameter_one = 1 ,
254- parameter_two = "a" ,
255- )
256- metrics_config = MetricsConfig (
257- server = _PromServerConfig (
258- host = "localhost" ,
259- port = 9090 ,
260- ),
261- cognite = None ,
262- push_gateways = [
263- _PushGatewayConfig (
264- host = "localhost" ,
265- job_name = "test-job" ,
266- username = None ,
267- password = None ,
268- clear_after = None ,
269- push_interval = TimeIntervalConfig ("30s" ),
270- )
271- ],
272- )
249+ @pytest .mark .parametrize ("metrics_type" , ["prometheus" , "cognite" ])
250+ def test_extractor_with_metrics_pushers (connection_config : ConnectionConfig , metrics_type : str ) -> None :
251+ override_level = "INFO"
252+ app_config = TestConfig (parameter_one = 1 , parameter_two = "a" )
253+ call_count = {"count" : 0 }
254+
255+ if metrics_type == "prometheus" :
256+ metrics_config = MetricsConfig (
257+ server = _PromServerConfig (host = "localhost" , port = 9090 ),
258+ cognite = None ,
259+ push_gateways = [
260+ _PushGatewayConfig (
261+ host = "localhost" ,
262+ job_name = "test-job" ,
263+ username = None ,
264+ password = None ,
265+ clear_after = None ,
266+ push_interval = TimeIntervalConfig ("30s" ),
267+ )
268+ ],
269+ )
270+ pusher_cls = PrometheusPusher
271+
272+ def counting_push (self : PrometheusPusher ) -> None :
273+ call_count ["count" ] += 1
274+ return original_push (self )
275+
276+ original_push = pusher_cls ._push_to_server
277+ else :
278+ metrics_config = MetricsConfig (
279+ server = None ,
280+ cognite = _CogniteMetricsConfig (
281+ external_id_prefix = "extractor_test" ,
282+ asset_name = "Extractor Test Metrics" ,
283+ asset_external_id = "extractor_testcognite_assets" ,
284+ data_set = None ,
285+ ),
286+ push_gateways = None ,
287+ )
288+ pusher_cls = CognitePusher
289+
290+ def counting_push (self : CognitePusher ) -> None :
291+ call_count ["count" ] += 1
292+ return None
293+
294+ original_push = pusher_cls ._push_to_server
273295
274296 full_config = FullConfig (
275297 connection_config = connection_config ,
@@ -282,27 +304,16 @@ def test_extractor_with_metrics(
282304 extractor = TestExtractor (full_config , worker , metrics = TestMetrics )
283305 assert isinstance (extractor ._metrics , TestMetrics ) or extractor ._metrics == TestMetrics
284306
285- # The metrics instance should be a singleton
286- another_extractor = TestExtractor (full_config , worker , metrics = TestMetrics )
287- assert another_extractor ._metrics is extractor ._metrics
288- assert isinstance (another_extractor ._metrics , TestMetrics ) or another_extractor ._metrics == TestMetrics
289-
290- call_count = {"count" : 0 }
291- original_push = PrometheusPusher ._push_to_server
292-
293- def counting_push (self : "PrometheusPusher" ) -> None :
294- call_count ["count" ] += 1
295- return original_push (self )
296-
297- PrometheusPusher ._push_to_server = counting_push
298-
299- try :
300- with extractor :
301- for pusher in extractor .metrics_push_manager .pushers :
302- assert pusher .thread is not None
303- assert pusher .thread .is_alive ()
304- finally :
305- PrometheusPusher ._push_to_server = original_push
307+ with contextlib .ExitStack () as stack :
308+ stack .enter_context (contextlib .suppress (Exception ))
309+ pusher_cls ._push_to_server = counting_push
310+ try :
311+ with extractor :
312+ for pusher in extractor .metrics_push_manager .pushers :
313+ assert pusher .thread is not None
314+ assert pusher .thread .is_alive ()
315+ finally :
316+ pusher_cls ._push_to_server = original_push
306317 assert call_count ["count" ] > 0
307318
308319
0 commit comments