@@ -12,6 +12,7 @@ use ::tracing::field;
12
12
use :: tracing:: info_span;
13
13
use :: tracing:: Span ;
14
14
use axum:: headers:: HeaderName ;
15
+ use dashmap:: DashMap ;
15
16
use futures:: future:: ready;
16
17
use futures:: future:: BoxFuture ;
17
18
use futures:: stream:: once;
@@ -22,6 +23,7 @@ use http::HeaderMap;
22
23
use http:: HeaderValue ;
23
24
use http:: StatusCode ;
24
25
use multimap:: MultiMap ;
26
+ use once_cell:: sync:: OnceCell ;
25
27
use opentelemetry:: propagation:: text_map_propagator:: FieldIter ;
26
28
use opentelemetry:: propagation:: Extractor ;
27
29
use opentelemetry:: propagation:: Injector ;
@@ -1366,19 +1368,56 @@ fn convert(
1366
1368
}
1367
1369
}
1368
1370
1371
/// Coarse category of an OpenTelemetry error.
///
/// Used as the key of `OTEL_ERROR_LAST_LOGGED`, so the number of entries in that map is
/// bounded by the number of variants here rather than by the number of distinct messages.
#[derive(PartialEq, Eq, Hash)]
enum ErrorType {
    /// Selected for `opentelemetry::global::Error::Trace`.
    Trace,
    /// Selected for `opentelemetry::global::Error::Metric`.
    Metric,
    /// Selected for every other `opentelemetry::global::Error` variant.
    Other,
}
1377
+ static OTEL_ERROR_LAST_LOGGED : OnceCell < DashMap < ErrorType , Instant > > = OnceCell :: new ( ) ;
1378
+
1369
1379
fn handle_error < T : Into < opentelemetry:: global:: Error > > ( err : T ) {
1370
- match err. into ( ) {
1371
- opentelemetry:: global:: Error :: Trace ( err) => {
1372
- :: tracing:: error!( "OpenTelemetry trace error occurred: {}" , err)
1373
- }
1374
- opentelemetry:: global:: Error :: Metric ( err_msg) => {
1375
- :: tracing:: error!( "OpenTelemetry metric error occurred: {}" , err_msg)
1376
- }
1377
- opentelemetry:: global:: Error :: Other ( err_msg) => {
1378
- :: tracing:: error!( "OpenTelemetry error occurred: {}" , err_msg)
1379
- }
1380
- other => {
1381
- :: tracing:: error!( "OpenTelemetry error occurred: {:?}" , other)
1380
+ // We have to rate limit these errors because when they happen they are very frequent.
1381
+ // Use a dashmap to store the message type with the last time it was logged.
1382
+ let last_logged_map = OTEL_ERROR_LAST_LOGGED . get_or_init ( DashMap :: new) ;
1383
+ let err = err. into ( ) ;
1384
+
1385
+ // We don't want the dashmap to get big, so we key the error messages by type.
1386
+ let error_type = match err {
1387
+ opentelemetry:: global:: Error :: Trace ( _) => ErrorType :: Trace ,
1388
+ opentelemetry:: global:: Error :: Metric ( _) => ErrorType :: Metric ,
1389
+ _ => ErrorType :: Other ,
1390
+ } ;
1391
+ #[ cfg( not( test) ) ]
1392
+ let threshold = Duration :: from_secs ( 10 ) ;
1393
+ #[ cfg( test) ]
1394
+ let threshold = Duration :: from_millis ( 100 ) ;
1395
+
1396
+ // Copy here so that we don't retain a mutable reference into the dashmap and lock the shard
1397
+ let now = Instant :: now ( ) ;
1398
+ let last_logged = * last_logged_map
1399
+ . entry ( error_type)
1400
+ . and_modify ( |last_logged| {
1401
+ if last_logged. elapsed ( ) > threshold {
1402
+ * last_logged = now;
1403
+ }
1404
+ } )
1405
+ . or_insert_with ( || now) ;
1406
+
1407
+ if last_logged == now {
1408
+ match err {
1409
+ opentelemetry:: global:: Error :: Trace ( err) => {
1410
+ :: tracing:: error!( "OpenTelemetry trace error occurred: {}" , err)
1411
+ }
1412
+ opentelemetry:: global:: Error :: Metric ( err) => {
1413
+ :: tracing:: error!( "OpenTelemetry metric error occurred: {}" , err)
1414
+ }
1415
+ opentelemetry:: global:: Error :: Other ( err) => {
1416
+ :: tracing:: error!( "OpenTelemetry error occurred: {}" , err)
1417
+ }
1418
+ other => {
1419
+ :: tracing:: error!( "OpenTelemetry error occurred: {:?}" , other)
1420
+ }
1382
1421
}
1383
1422
}
1384
1423
}
@@ -1491,7 +1530,12 @@ impl TextMapPropagator for CustomTraceIdPropagator {
1491
1530
//
1492
1531
#[ cfg( test) ]
1493
1532
mod tests {
1533
+ use std:: fmt:: Debug ;
1534
+ use std:: ops:: DerefMut ;
1494
1535
use std:: str:: FromStr ;
1536
+ use std:: sync:: Arc ;
1537
+ use std:: sync:: Mutex ;
1538
+ use std:: time:: Duration ;
1495
1539
1496
1540
use axum:: headers:: HeaderName ;
1497
1541
use http:: HeaderMap ;
@@ -1505,6 +1549,14 @@ mod tests {
1505
1549
use tower:: util:: BoxService ;
1506
1550
use tower:: Service ;
1507
1551
use tower:: ServiceExt ;
1552
+ use tracing_core:: field:: Visit ;
1553
+ use tracing_core:: Event ;
1554
+ use tracing_core:: Field ;
1555
+ use tracing_core:: Subscriber ;
1556
+ use tracing_futures:: WithSubscriber ;
1557
+ use tracing_subscriber:: layer:: Context ;
1558
+ use tracing_subscriber:: layer:: SubscriberExt ;
1559
+ use tracing_subscriber:: Layer ;
1508
1560
1509
1561
use super :: apollo:: ForwardHeaders ;
1510
1562
use crate :: error:: FetchError ;
@@ -1515,6 +1567,7 @@ mod tests {
1515
1567
use crate :: plugin:: test:: MockSubgraphService ;
1516
1568
use crate :: plugin:: test:: MockSupergraphService ;
1517
1569
use crate :: plugin:: DynPlugin ;
1570
+ use crate :: plugins:: telemetry:: handle_error;
1518
1571
use crate :: services:: SubgraphRequest ;
1519
1572
use crate :: services:: SubgraphResponse ;
1520
1573
use crate :: services:: SupergraphRequest ;
@@ -1996,4 +2049,72 @@ mod tests {
1996
2049
let filtered_headers = super :: filter_headers ( & headers, & ForwardHeaders :: None ) ;
1997
2050
assert_eq ! ( filtered_headers. as_str( ) , "{}" ) ;
1998
2051
}
2052
+
2053
+ #[ tokio:: test]
2054
+ async fn test_handle_error_throttling ( ) {
2055
+ // Set up a fake subscriber so we can check log events. If this is useful then maybe it can be factored out into something reusable
2056
+ #[ derive( Default ) ]
2057
+ struct TestVisitor {
2058
+ log_entries : Vec < String > ,
2059
+ }
2060
+
2061
+ #[ derive( Default , Clone ) ]
2062
+ struct TestLayer {
2063
+ visitor : Arc < Mutex < TestVisitor > > ,
2064
+ }
2065
+ impl TestLayer {
2066
+ fn assert_log_entry_count ( & self , message : & str , expected : usize ) {
2067
+ let log_entries = self . visitor . lock ( ) . unwrap ( ) . log_entries . clone ( ) ;
2068
+ let actual = log_entries. iter ( ) . filter ( |e| e. contains ( message) ) . count ( ) ;
2069
+ assert_eq ! ( actual, expected) ;
2070
+ }
2071
+ }
2072
+ impl Visit for TestVisitor {
2073
+ fn record_debug ( & mut self , field : & Field , value : & dyn Debug ) {
2074
+ self . log_entries
2075
+ . push ( format ! ( "{}={:?}" , field. name( ) , value) ) ;
2076
+ }
2077
+ }
2078
+
2079
+ impl < S > Layer < S > for TestLayer
2080
+ where
2081
+ S : Subscriber ,
2082
+ Self : ' static ,
2083
+ {
2084
+ fn on_event ( & self , event : & Event < ' _ > , _ctx : Context < ' _ , S > ) {
2085
+ event. record ( self . visitor . lock ( ) . unwrap ( ) . deref_mut ( ) )
2086
+ }
2087
+ }
2088
+
2089
+ let test_layer = TestLayer :: default ( ) ;
2090
+
2091
+ async {
2092
+ // Log twice rapidly, they should get deduped
2093
+ handle_error ( opentelemetry:: global:: Error :: Other (
2094
+ "other error" . to_string ( ) ,
2095
+ ) ) ;
2096
+ handle_error ( opentelemetry:: global:: Error :: Other (
2097
+ "other error" . to_string ( ) ,
2098
+ ) ) ;
2099
+ handle_error ( opentelemetry:: global:: Error :: Trace (
2100
+ "trace error" . to_string ( ) . into ( ) ,
2101
+ ) ) ;
2102
+ }
2103
+ . with_subscriber ( tracing_subscriber:: registry ( ) . with ( test_layer. clone ( ) ) )
2104
+ . await ;
2105
+
2106
+ test_layer. assert_log_entry_count ( "other error" , 1 ) ;
2107
+ test_layer. assert_log_entry_count ( "trace error" , 1 ) ;
2108
+
2109
+ // Sleep a bit and then log again, it should get logged
2110
+ tokio:: time:: sleep ( Duration :: from_millis ( 200 ) ) . await ;
2111
+ async {
2112
+ handle_error ( opentelemetry:: global:: Error :: Other (
2113
+ "other error" . to_string ( ) ,
2114
+ ) ) ;
2115
+ }
2116
+ . with_subscriber ( tracing_subscriber:: registry ( ) . with ( test_layer. clone ( ) ) )
2117
+ . await ;
2118
+ test_layer. assert_log_entry_count ( "other error" , 2 ) ;
2119
+ }
1999
2120
}
0 commit comments