diff --git a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml index 47b9cf6a0..50d0ecf10 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml +++ b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml @@ -24,6 +24,7 @@ url = "2.2" md5 = "0.8.0" hex = "0.4" lz4_flex = { version = "0.11", features = ["safe-encode"], default-features = false } +tokio = { version = "1", default-features = false, features = ["time"] } # Needed for tokio::time::sleep in library code [features] self_signed_certs = [] # Empty by default for security diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs index 123b814af..16b9124f0 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs @@ -78,6 +78,12 @@ pub(crate) enum GenevaConfigClientError { Http(#[from] reqwest::Error), #[error("Request failed with status {status}: {message}")] RequestFailed { status: u16, message: String }, + #[error("Request failed with status {status}: {message} (Retry-After={retry_after_secs:?})")] + RequestFailedWithRetryAfter { + status: u16, + message: String, + retry_after_secs: Option, + }, // Data / parsing #[error("JSON error: {0}")] @@ -93,6 +99,18 @@ pub(crate) enum GenevaConfigClientError { #[allow(dead_code)] pub(crate) type Result = std::result::Result; +const MAX_GCS_RETRIES: usize = 3; +const FIXED_BACKOFF: [Duration; 3] = [ + Duration::from_millis(200), + Duration::from_millis(400), + Duration::from_millis(800), +]; + +/// Minimum acceptable retry-after value in seconds for rate limiting responses +const MIN_RETRY_AFTER_SECS: u64 = 1; +/// Maximum acceptable retry-after value in seconds for rate limiting responses +const MAX_RETRY_AFTER_SECS: u64 = 3; + /// Configuration for the Geneva Config Client. /// /// # Fields @@ -370,10 +388,36 @@ impl GenevaConfigClient { } } } - // Cache miss or expired token, fetch fresh data - // Perform actual fetch before acquiring write lock to minimize lock contention - let (fresh_ingestion_gateway_info, fresh_moniker_info) = - self.fetch_ingestion_info().await?; + // Cache miss or expired token, fetch fresh data with retry + let (fresh_ingestion_gateway_info, fresh_moniker_info) = { + let mut last_error = None; + let mut result = None; + for attempt in 0..=MAX_GCS_RETRIES { + match self.fetch_ingestion_info().await { + Ok(res) => { + result = Some(res); + break; + } + Err(e) => { + if attempt < MAX_GCS_RETRIES && is_retriable_error(&e) { + let delay = match &e { + GenevaConfigClientError::RequestFailedWithRetryAfter { + retry_after_secs: Some(s), + .. + } => Duration::from_secs(*s), + _ => { + let idx = std::cmp::min(attempt, FIXED_BACKOFF.len() - 1); + FIXED_BACKOFF[idx] + } + }; + tokio::time::sleep(delay).await; + } + last_error = Some(e); + } + } + } + result.ok_or_else(|| last_error.unwrap())? + }; let token_expiry = Self::parse_token_expiry(&fresh_ingestion_gateway_info.auth_token_expiry_time) @@ -438,6 +482,18 @@ impl GenevaConfigClient { .map_err(GenevaConfigClientError::Http)?; // Check if the response is successful let status = response.status(); + // Capture Retry-After before consuming body + let retry_after_header = if status == reqwest::StatusCode::TOO_MANY_REQUESTS + || status == reqwest::StatusCode::SERVICE_UNAVAILABLE + { + response + .headers() + .get("Retry-After") + .and_then(|hv| hv.to_str().ok()) + .map(|s| s.trim().to_string()) + } else { + None + }; let body = response.text().await?; if status.is_success() { let parsed = match serde_json::from_str::(&body) { @@ -463,6 +519,17 @@ impl GenevaConfigClient { Err(GenevaConfigClientError::MonikerNotFound( "No primary diag moniker found in storage accounts".to_string(), )) + } else if status == reqwest::StatusCode::TOO_MANY_REQUESTS + || status == reqwest::StatusCode::SERVICE_UNAVAILABLE + { + let retry_after_secs = retry_after_header + .and_then(|s| s.parse::().ok()) + .filter(|v| *v >= MIN_RETRY_AFTER_SECS && *v <= MAX_RETRY_AFTER_SECS); + Err(GenevaConfigClientError::RequestFailedWithRetryAfter { + status: status.as_u16(), + message: body, + retry_after_secs, + }) } else { Err(GenevaConfigClientError::RequestFailed { status: status.as_u16(), @@ -533,6 +600,17 @@ fn extract_endpoint_from_token(token: &str) -> Result { Ok(endpoint) } +fn is_retriable_error(err: &GenevaConfigClientError) -> bool { + match err { + GenevaConfigClientError::Http(e) => e.is_timeout() || e.is_connect(), + GenevaConfigClientError::RequestFailed { status, .. } + | GenevaConfigClientError::RequestFailedWithRetryAfter { status, .. } => { + (*status >= 500 && *status < 600) || *status == 429 + } + _ => false, + } +} + #[cfg(feature = "self_signed_certs")] fn configure_tls_connector( mut builder: native_tls::TlsConnectorBuilder, @@ -558,3 +636,46 @@ fn configure_tls_connector( .max_protocol_version(Some(Protocol::Tlsv12)); builder } + +#[cfg(test)] +mod retry_tests { + use super::*; + + #[test] + fn test_is_retriable_request_failed_statuses() { + // 500 range + for status in [500u16, 502, 503, 504, 599] { + let err = GenevaConfigClientError::RequestFailed { + status, + message: String::new(), + }; + assert!( + super::is_retriable_error(&err), + "status {status} should be retriable" + ); + } + // 429 + let err_429 = GenevaConfigClientError::RequestFailed { + status: 429, + message: String::new(), + }; + assert!( + super::is_retriable_error(&err_429), + "status 429 should be retriable" + ); + } + + #[test] + fn test_is_not_retriable_request_failed_statuses() { + for status in [400u16, 401, 403, 404, 422] { + let err = GenevaConfigClientError::RequestFailed { + status, + message: String::new(), + }; + assert!( + !super::is_retriable_error(&err), + "status {status} should NOT be retriable" + ); + } + } +}