|
1 | 1 | use anyhow::anyhow; |
2 | 2 | use assert_matches::assert_matches; |
3 | | -use std::string::ToString; |
4 | | -use std::{collections::HashMap, env, net::SocketAddr, sync::Arc, time::Duration}; |
| 3 | +use std::{ |
| 4 | + collections::HashMap, |
| 5 | + env, |
| 6 | + net::SocketAddr, |
| 7 | + string::ToString, |
| 8 | + sync::{Arc, Mutex}, |
| 9 | + time::Duration, |
| 10 | +}; |
5 | 11 | use temporal_client::{ |
6 | 12 | WorkflowClientTrait, WorkflowOptions, WorkflowService, REQUEST_LATENCY_HISTOGRAM_NAME, |
7 | 13 | }; |
@@ -48,6 +54,7 @@ use temporal_sdk_core_test_utils::{ |
48 | 54 | PROMETHEUS_QUERY_API, |
49 | 55 | }; |
50 | 56 | use tokio::{join, sync::Barrier, task::AbortHandle}; |
| 57 | +use tracing_subscriber::fmt::MakeWriter; |
51 | 58 | use url::Url; |
52 | 59 |
|
53 | 60 | static ANY_PORT: &str = "127.0.0.1:0"; |
@@ -900,3 +907,69 @@ async fn evict_on_complete_does_not_count_as_forced_eviction() { |
900 | 907 | // Metric shouldn't show up at all, since it's zero the whole time. |
901 | 908 | assert!(!body.contains("temporal_sticky_cache_total_forced_eviction")); |
902 | 909 | } |
| 910 | + |
| 911 | +struct CapturingWriter { |
| 912 | + buf: Arc<Mutex<Vec<u8>>>, |
| 913 | +} |
| 914 | + |
| 915 | +impl MakeWriter<'_> for CapturingWriter { |
| 916 | + type Writer = CapturingHandle; |
| 917 | + |
| 918 | + fn make_writer(&self) -> Self::Writer { |
| 919 | + CapturingHandle(self.buf.clone()) |
| 920 | + } |
| 921 | +} |
| 922 | + |
| 923 | +struct CapturingHandle(Arc<Mutex<Vec<u8>>>); |
| 924 | + |
| 925 | +impl std::io::Write for CapturingHandle { |
| 926 | + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { |
| 927 | + let mut b = self.0.lock().unwrap(); |
| 928 | + b.extend_from_slice(buf); |
| 929 | + Ok(buf.len()) |
| 930 | + } |
| 931 | + fn flush(&mut self) -> std::io::Result<()> { |
| 932 | + Ok(()) |
| 933 | + } |
| 934 | +} |
| 935 | + |
| 936 | +#[tokio::test] |
| 937 | +async fn otel_errors_logged_as_errors() { |
| 938 | + // Set up tracing subscriber to capture ERROR logs |
| 939 | + let logs = Arc::new(Mutex::new(Vec::new())); |
| 940 | + let writer = CapturingWriter { buf: logs.clone() }; |
| 941 | + let subscriber = tracing_subscriber::fmt().with_writer(writer).finish(); |
| 942 | + let _guard = tracing::subscriber::set_default(subscriber); |
| 943 | + |
| 944 | + let opts = OtelCollectorOptionsBuilder::default() |
| 945 | + .url("https://localhostt:9995/v1/metrics".parse().unwrap()) // Invalid endpoint |
| 946 | + .build() |
| 947 | + .unwrap(); |
| 948 | + let exporter = build_otlp_metric_exporter(opts).unwrap(); |
| 949 | + |
| 950 | + let telemopts = TelemetryOptionsBuilder::default() |
| 951 | + .metrics(Arc::new(exporter) as Arc<dyn CoreMeter>) |
| 952 | + .build() |
| 953 | + .unwrap(); |
| 954 | + |
| 955 | + let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); |
| 956 | + let mut starter = CoreWfStarter::new_with_runtime("otel_errors_logged_as_errors", rt); |
| 957 | + let _worker = starter.get_worker().await; |
| 958 | + |
| 959 | + // Wait to allow exporter to attempt sending metrics and fail. |
| 960 | + tokio::time::sleep(Duration::from_secs(2)).await; |
| 961 | + |
| 962 | + let logs = logs.lock().unwrap(); |
| 963 | + let log_str = String::from_utf8_lossy(&logs); |
| 964 | + |
| 965 | + assert!( |
| 966 | + log_str.contains("ERROR"), |
| 967 | + "Expected ERROR log not found in logs: {}", |
| 968 | + log_str |
| 969 | + ); |
| 970 | + assert!( |
| 971 | + log_str.contains("Metrics exporter otlp failed with the grpc server returns error"), |
| 972 | + "Expected an OTel exporter error message in logs: {}", |
| 973 | + log_str |
| 974 | + ); |
| 975 | +} |
0 commit comments