From fd9eac76619c0a5062cc0b82c7ad9bc974aea2ed Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 2 Sep 2025 16:06:07 -0700 Subject: [PATCH 1/8] initial commit --- .../src/config_service/client.rs | 69 +++++++++++++++++-- 1 file changed, 65 insertions(+), 4 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs index 123b814af..ff3ca68e2 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs @@ -93,6 +93,8 @@ pub(crate) enum GenevaConfigClientError { #[allow(dead_code)] pub(crate) type Result = std::result::Result; +const MAX_GCS_RETRIES: usize = 5; + /// Configuration for the Geneva Config Client. /// /// # Fields @@ -370,10 +372,23 @@ impl GenevaConfigClient { } } } - // Cache miss or expired token, fetch fresh data - // Perform actual fetch before acquiring write lock to minimize lock contention - let (fresh_ingestion_gateway_info, fresh_moniker_info) = - self.fetch_ingestion_info().await?; + // Cache miss or expired token, fetch fresh data with retry logic (up to 5 retries on retriable errors) + let (fresh_ingestion_gateway_info, fresh_moniker_info) = { + let mut attempt = 0; + loop { + match self.fetch_ingestion_info().await { + Ok(v) => break v, + Err(e) => { + if attempt < MAX_GCS_RETRIES && is_retriable_error(&e) { + attempt += 1; + continue; + } else { + return Err(e); + } + } + } + } + }; let token_expiry = Self::parse_token_expiry(&fresh_ingestion_gateway_info.auth_token_expiry_time) @@ -533,6 +548,18 @@ fn extract_endpoint_from_token(token: &str) -> Result { Ok(endpoint) } +fn is_retriable_error(err: &GenevaConfigClientError) -> bool { + match err { + GenevaConfigClientError::Http(e) => { + e.is_timeout() || e.is_connect() + } + GenevaConfigClientError::RequestFailed { status, .. } => { + (*status >= 500 && *status < 600) || *status == 429 + } + _ => false, + } +} + #[cfg(feature = "self_signed_certs")] fn configure_tls_connector( mut builder: native_tls::TlsConnectorBuilder, @@ -558,3 +585,37 @@ fn configure_tls_connector( .max_protocol_version(Some(Protocol::Tlsv12)); builder } + +#[cfg(test)] +mod retry_tests { + use super::*; + + #[test] + fn test_is_retriable_request_failed_statuses() { + // 500 range + for status in [500u16, 502, 503, 504, 599] { + let err = GenevaConfigClientError::RequestFailed { + status, + message: String::new(), + }; + assert!(super::is_retriable_error(&err), "status {status} should be retriable"); + } + // 429 + let err_429 = GenevaConfigClientError::RequestFailed { + status: 429, + message: String::new(), + }; + assert!(super::is_retriable_error(&err_429), "status 429 should be retriable"); + } + + #[test] + fn test_is_not_retriable_request_failed_statuses() { + for status in [400u16, 401, 403, 404, 422] { + let err = GenevaConfigClientError::RequestFailed { + status, + message: String::new(), + }; + assert!(!super::is_retriable_error(&err), "status {status} should NOT be retriable"); + } + } +} From e1710953b599d1beba21b713185b1683bdba583e Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 2 Sep 2025 16:30:24 -0700 Subject: [PATCH 2/8] lint --- .../src/config_service/client.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs index ff3ca68e2..7ffb4370b 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs @@ -550,9 +550,7 @@ fn extract_endpoint_from_token(token: &str) -> Result { fn is_retriable_error(err: &GenevaConfigClientError) -> bool { match err { - GenevaConfigClientError::Http(e) => { - e.is_timeout() || e.is_connect() - } + GenevaConfigClientError::Http(e) => e.is_timeout() || e.is_connect(), GenevaConfigClientError::RequestFailed { status, .. } => { (*status >= 500 && *status < 600) || *status == 429 } @@ -598,14 +596,20 @@ mod retry_tests { status, message: String::new(), }; - assert!(super::is_retriable_error(&err), "status {status} should be retriable"); + assert!( + super::is_retriable_error(&err), + "status {status} should be retriable" + ); } // 429 let err_429 = GenevaConfigClientError::RequestFailed { status: 429, message: String::new(), }; - assert!(super::is_retriable_error(&err_429), "status 429 should be retriable"); + assert!( + super::is_retriable_error(&err_429), + "status 429 should be retriable" + ); } #[test] @@ -615,7 +619,10 @@ mod retry_tests { status, message: String::new(), }; - assert!(!super::is_retriable_error(&err), "status {status} should NOT be retriable"); + assert!( + !super::is_retriable_error(&err), + "status {status} should NOT be retriable" + ); } } } From 7c963e3aa2d82513ef7d051d90ea5c83bb776ba7 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sat, 13 Sep 2025 18:10:08 +0530 Subject: [PATCH 3/8] fix --- .../geneva-uploader/Cargo.toml | 1 + .../src/config_service/client.rs | 59 +++++++++++++++++-- 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml index 47b9cf6a0..50d0ecf10 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml +++ b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml @@ -24,6 +24,7 @@ url = "2.2" md5 = "0.8.0" hex = "0.4" lz4_flex = { version = "0.11", features = ["safe-encode"], default-features = false } +tokio = { version = "1", default-features = false, features = ["time"] } # Needed for tokio::time::sleep in library code [features] self_signed_certs = [] # Empty by default for security diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs index 7ffb4370b..b438fa81b 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs @@ -78,6 +78,12 @@ pub(crate) enum GenevaConfigClientError { Http(#[from] reqwest::Error), #[error("Request failed with status {status}: {message}")] RequestFailed { status: u16, message: String }, + #[error("Request failed with status {status}: {message} (Retry-After={retry_after_secs:?})")] + RequestFailedWithRetryAfter { + status: u16, + message: String, + retry_after_secs: Option, + }, // Data / parsing #[error("JSON error: {0}")] @@ -94,6 +100,11 @@ pub(crate) enum GenevaConfigClientError { pub(crate) type Result = std::result::Result; const MAX_GCS_RETRIES: usize = 5; +const FIXED_BACKOFF: [Duration; 3] = [ + Duration::from_millis(200), + Duration::from_millis(400), + Duration::from_millis(800), +]; /// Configuration for the Geneva Config Client. /// @@ -380,6 +391,18 @@ impl GenevaConfigClient { Ok(v) => break v, Err(e) => { if attempt < MAX_GCS_RETRIES && is_retriable_error(&e) { + // Determine delay + let delay = match &e { + GenevaConfigClientError::RequestFailedWithRetryAfter { + retry_after_secs: Some(s), + .. + } => Duration::from_secs(*s), + _ => { + let idx = std::cmp::min(attempt, FIXED_BACKOFF.len() - 1); + FIXED_BACKOFF[idx] + } + }; + tokio::time::sleep(delay).await; attempt += 1; continue; } else { @@ -453,6 +476,18 @@ impl GenevaConfigClient { .map_err(GenevaConfigClientError::Http)?; // Check if the response is successful let status = response.status(); + // Capture Retry-After before consuming body + let retry_after_header = if status == reqwest::StatusCode::TOO_MANY_REQUESTS + || status == reqwest::StatusCode::SERVICE_UNAVAILABLE + { + response + .headers() + .get("Retry-After") + .and_then(|hv| hv.to_str().ok()) + .map(|s| s.trim().to_string()) + } else { + None + }; let body = response.text().await?; if status.is_success() { let parsed = match serde_json::from_str::(&body) { @@ -479,10 +514,23 @@ impl GenevaConfigClient { "No primary diag moniker found in storage accounts".to_string(), )) } else { - Err(GenevaConfigClientError::RequestFailed { - status: status.as_u16(), - message: body, - }) + if status == reqwest::StatusCode::TOO_MANY_REQUESTS + || status == reqwest::StatusCode::SERVICE_UNAVAILABLE + { + let retry_after_secs = retry_after_header + .and_then(|s| s.parse::().ok()) + .filter(|v| *v >= 1 && *v <= 3); + Err(GenevaConfigClientError::RequestFailedWithRetryAfter { + status: status.as_u16(), + message: body, + retry_after_secs, + }) + } else { + Err(GenevaConfigClientError::RequestFailed { + status: status.as_u16(), + message: body, + }) + } } } } @@ -551,7 +599,8 @@ fn extract_endpoint_from_token(token: &str) -> Result { fn is_retriable_error(err: &GenevaConfigClientError) -> bool { match err { GenevaConfigClientError::Http(e) => e.is_timeout() || e.is_connect(), - GenevaConfigClientError::RequestFailed { status, .. } => { + GenevaConfigClientError::RequestFailed { status, .. } + | GenevaConfigClientError::RequestFailedWithRetryAfter { status, .. } => { (*status >= 500 && *status < 600) || *status == 429 } _ => false, From 2131aed25022d905a247947963efe92cd9ac142e Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sat, 13 Sep 2025 22:16:47 +0530 Subject: [PATCH 4/8] fix --- .../src/config_service/client.rs | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs index b438fa81b..235fdced7 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs @@ -513,24 +513,22 @@ impl GenevaConfigClient { Err(GenevaConfigClientError::MonikerNotFound( "No primary diag moniker found in storage accounts".to_string(), )) + } else if status == reqwest::StatusCode::TOO_MANY_REQUESTS + || status == reqwest::StatusCode::SERVICE_UNAVAILABLE + { + let retry_after_secs = retry_after_header + .and_then(|s| s.parse::().ok()) + .filter(|v| *v >= 1 && *v <= 3); + Err(GenevaConfigClientError::RequestFailedWithRetryAfter { + status: status.as_u16(), + message: body, + retry_after_secs, + }) } else { - if status == reqwest::StatusCode::TOO_MANY_REQUESTS - || status == reqwest::StatusCode::SERVICE_UNAVAILABLE - { - let retry_after_secs = retry_after_header - .and_then(|s| s.parse::().ok()) - .filter(|v| *v >= 1 && *v <= 3); - Err(GenevaConfigClientError::RequestFailedWithRetryAfter { - status: status.as_u16(), - message: body, - retry_after_secs, - }) - } else { - Err(GenevaConfigClientError::RequestFailed { - status: status.as_u16(), - message: body, - }) - } + Err(GenevaConfigClientError::RequestFailed { + status: status.as_u16(), + message: body, + }) } } } From b7fe253132257e7334774ded997fc5a4982948a5 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sat, 13 Sep 2025 22:40:01 +0530 Subject: [PATCH 5/8] fix --- .../src/config_service/client.rs | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs index 235fdced7..59c6d941f 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs @@ -383,16 +383,22 @@ impl GenevaConfigClient { } } } - // Cache miss or expired token, fetch fresh data with retry logic (up to 5 retries on retriable errors) + // Cache miss or expired token, fetch fresh data with retry let (fresh_ingestion_gateway_info, fresh_moniker_info) = { - let mut attempt = 0; - loop { + let mut last_error = None; + let mut result = None; + for attempt in 0..=MAX_GCS_RETRIES { match self.fetch_ingestion_info().await { - Ok(v) => break v, + Ok(res) => { + result = Some(res); + break; + } Err(e) => { - if attempt < MAX_GCS_RETRIES && is_retriable_error(&e) { - // Determine delay - let delay = match &e { + last_error = Some(e); + if attempt < MAX_GCS_RETRIES + && is_retriable_error(last_error.as_ref().unwrap()) + { + let delay = match last_error.as_ref().unwrap() { GenevaConfigClientError::RequestFailedWithRetryAfter { retry_after_secs: Some(s), .. @@ -403,14 +409,11 @@ impl GenevaConfigClient { } }; tokio::time::sleep(delay).await; - attempt += 1; - continue; - } else { - return Err(e); } } } } + result.ok_or_else(|| last_error.unwrap())? }; let token_expiry = From 92bc8da2f9627d47c832178ff9bf2b161e7668b0 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 14 Sep 2025 06:15:47 +0530 Subject: [PATCH 6/8] fix --- .../geneva-uploader/src/config_service/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs index 59c6d941f..b222e0683 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs @@ -99,7 +99,7 @@ pub(crate) enum GenevaConfigClientError { #[allow(dead_code)] pub(crate) type Result = std::result::Result; -const MAX_GCS_RETRIES: usize = 5; +const MAX_GCS_RETRIES: usize = 3; const FIXED_BACKOFF: [Duration; 3] = [ Duration::from_millis(200), Duration::from_millis(400), From 437eb36bc13cdd0c719382f5b2d796f4e106b943 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 14 Sep 2025 06:22:47 +0530 Subject: [PATCH 7/8] fix --- .../geneva-uploader/src/config_service/client.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs index b222e0683..ccdf155a9 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs @@ -394,11 +394,8 @@ impl GenevaConfigClient { break; } Err(e) => { - last_error = Some(e); - if attempt < MAX_GCS_RETRIES - && is_retriable_error(last_error.as_ref().unwrap()) - { - let delay = match last_error.as_ref().unwrap() { + if attempt < MAX_GCS_RETRIES && is_retriable_error(&e) { + let delay = match &e { GenevaConfigClientError::RequestFailedWithRetryAfter { retry_after_secs: Some(s), .. @@ -410,6 +407,7 @@ impl GenevaConfigClient { }; tokio::time::sleep(delay).await; } + last_error = Some(e); } } } From 49fbb6d9878babc31a7e02995b34f7dffb7430db Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 14 Sep 2025 06:24:44 +0530 Subject: [PATCH 8/8] fix --- .../geneva-uploader/src/config_service/client.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs index ccdf155a9..16b9124f0 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs @@ -106,6 +106,11 @@ const FIXED_BACKOFF: [Duration; 3] = [ Duration::from_millis(800), ]; +/// Minimum acceptable retry-after value in seconds for rate limiting responses +const MIN_RETRY_AFTER_SECS: u64 = 1; +/// Maximum acceptable retry-after value in seconds for rate limiting responses +const MAX_RETRY_AFTER_SECS: u64 = 3; + /// Configuration for the Geneva Config Client. /// /// # Fields @@ -519,7 +524,7 @@ impl GenevaConfigClient { { let retry_after_secs = retry_after_header .and_then(|s| s.parse::().ok()) - .filter(|v| *v >= 1 && *v <= 3); + .filter(|v| *v >= MIN_RETRY_AFTER_SECS && *v <= MAX_RETRY_AFTER_SECS); Err(GenevaConfigClientError::RequestFailedWithRetryAfter { status: status.as_u16(), message: body,