11use anyhow:: anyhow;
22use assert_matches:: assert_matches;
3+ use std:: string:: ToString ;
34use std:: { collections:: HashMap , env, net:: SocketAddr , sync:: Arc , time:: Duration } ;
45use temporal_client:: {
56 WorkflowClientTrait , WorkflowOptions , WorkflowService , REQUEST_LATENCY_HISTOGRAM_NAME ,
@@ -15,8 +16,9 @@ use temporal_sdk_core::{
1516use temporal_sdk_core_api:: {
1617 telemetry:: {
1718 metrics:: { CoreMeter , MetricAttributes , MetricParameters } ,
18- HistogramBucketOverrides , OtelCollectorOptionsBuilder , PrometheusExporterOptions ,
19- PrometheusExporterOptionsBuilder , TelemetryOptions , TelemetryOptionsBuilder ,
19+ HistogramBucketOverrides , OtelCollectorOptionsBuilder , OtlpProtocol ,
20+ PrometheusExporterOptions , PrometheusExporterOptionsBuilder , TelemetryOptions ,
21+ TelemetryOptionsBuilder ,
2022 } ,
2123 worker:: WorkerConfigBuilder ,
2224 Worker ,
@@ -43,6 +45,7 @@ use temporal_sdk_core_protos::{
4345} ;
4446use temporal_sdk_core_test_utils:: {
4547 get_integ_server_options, get_integ_telem_options, CoreWfStarter , NAMESPACE , OTEL_URL_ENV_VAR ,
48+ PROMETHEUS_QUERY_API ,
4649} ;
4750use tokio:: { join, sync:: Barrier , task:: AbortHandle } ;
4851use url:: Url ;
@@ -651,6 +654,88 @@ async fn request_fail_codes_otel() {
651654 }
652655}
653656
657+ // Tests that rely on Prometheus running in a docker container need to start
658+ // with `docker_` and set the `DOCKER_PROMETHEUS_RUNNING` env variable to run
659+ #[ rstest:: rstest]
660+ #[ tokio:: test]
661+ async fn docker_metrics_with_prometheus (
662+ #[ values(
663+ ( "http://localhost:4318/v1/metrics" , OtlpProtocol :: Http ) ,
664+ ( "http://localhost:4317" , OtlpProtocol :: Grpc )
665+ ) ]
666+ otel_collector : ( & str , OtlpProtocol ) ,
667+ ) {
668+ if std:: env:: var ( "DOCKER_PROMETHEUS_RUNNING" ) . is_err ( ) {
669+ return ;
670+ }
671+ let ( otel_collector_addr, otel_protocol) = otel_collector;
672+ let test_uid = format ! (
673+ "test_{}_" ,
674+ uuid:: Uuid :: new_v4( ) . to_string( ) . replace( "-" , "" )
675+ ) ;
676+
677+ // Configure the OTLP exporter with HTTP
678+ let opts = OtelCollectorOptionsBuilder :: default ( )
679+ . url ( otel_collector_addr. parse ( ) . unwrap ( ) )
680+ . protocol ( otel_protocol)
681+ . global_tags ( HashMap :: from ( [ ( "test_id" . to_string ( ) , test_uid. clone ( ) ) ] ) )
682+ . build ( )
683+ . unwrap ( ) ;
684+ let exporter = Arc :: new ( build_otlp_metric_exporter ( opts) . unwrap ( ) ) ;
685+ let telemopts = TelemetryOptionsBuilder :: default ( )
686+ . metrics ( exporter as Arc < dyn CoreMeter > )
687+ . metric_prefix ( test_uid. clone ( ) )
688+ . build ( )
689+ . unwrap ( ) ;
690+ let rt = CoreRuntime :: new_assume_tokio ( telemopts) . unwrap ( ) ;
691+ let test_name = "docker_metrics_with_prometheus" ;
692+ let mut starter = CoreWfStarter :: new_with_runtime ( test_name, rt) ;
693+ let worker = starter. get_worker ( ) . await ;
694+ starter. start_wf ( ) . await ;
695+
696+ // Immediately finish the workflow
697+ let task = worker. poll_workflow_activation ( ) . await . unwrap ( ) ;
698+ worker
699+ . complete_workflow_activation ( WorkflowActivationCompletion :: from_cmd (
700+ task. run_id ,
701+ CompleteWorkflowExecution { result : None } . into ( ) ,
702+ ) )
703+ . await
704+ . unwrap ( ) ;
705+
706+ let client = starter. get_client ( ) . await ;
707+ client. list_namespaces ( ) . await . unwrap ( ) ;
708+
709+ // Give Prometheus time to scrape metrics
710+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 2 ) ) . await ;
711+
712+ // Query Prometheus API for metrics
713+ let client = reqwest:: Client :: new ( ) ;
714+ let query = format ! ( "temporal_sdk_{}num_pollers" , test_uid. clone( ) ) ;
715+ let response = client
716+ . get ( PROMETHEUS_QUERY_API )
717+ . query ( & [ ( "query" , query) ] )
718+ . send ( )
719+ . await
720+ . unwrap ( )
721+ . json :: < serde_json:: Value > ( )
722+ . await
723+ . unwrap ( ) ;
724+
725+ // Validate the Prometheus response
726+ if let Some ( data) = response[ "data" ] [ "result" ] . as_array ( ) {
727+ assert ! ( !data. is_empty( ) , "No metrics found for query: {test_uid}" ) ;
728+ assert_eq ! ( data[ 0 ] [ "metric" ] [ "exported_job" ] , "temporal-core-sdk" ) ;
729+ assert_eq ! ( data[ 0 ] [ "metric" ] [ "job" ] , "otel-collector" ) ;
730+ assert ! ( data[ 0 ] [ "metric" ] [ "task_queue" ]
731+ . as_str( )
732+ . unwrap( )
733+ . starts_with( test_name) ) ;
734+ } else {
735+ panic ! ( "Invalid Prometheus response: {:?}" , response) ;
736+ }
737+ }
738+
654739#[ tokio:: test]
655740async fn activity_metrics ( ) {
656741 let ( telemopts, addr, _aborter) = prom_metrics ( None ) ;
0 commit comments