diff --git a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml index d31925b52..5c5d0ff5d 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml +++ b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml @@ -21,6 +21,8 @@ md5 = "0.8.0" hex = "0.4" lz4_flex = { version = "0.11", features = ["safe-encode"], default-features = false } futures = "0.3" +tokio = { version = "1.0", features = ["time"] } +rand = "0.8" [features] self_signed_certs = [] # Empty by default for security @@ -37,7 +39,7 @@ futures = "0.3" num_cpus = "1.16" lz4_flex = { version = "0.11" } criterion = {version = "0.6"} -rand = {version = "0.9"} +rand = {version = "0.8"} [lints] workspace = true diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs index 78e18c2b7..1b42c72cc 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs @@ -92,28 +92,28 @@ mod benchmarks { 0 => AnyValue { value: Some( opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue( - format!("string_value_{}", rng.random::()), + format!("string_value_{}", rng.r#gen::()), ), ), }, 1 => AnyValue { value: Some( opentelemetry_proto::tonic::common::v1::any_value::Value::IntValue( - rng.random_range(0..1000), + rng.gen_range(0..1000), ), ), }, 2 => AnyValue { value: Some( opentelemetry_proto::tonic::common::v1::any_value::Value::DoubleValue( - rng.random_range(0.0..100.0), + rng.gen_range(0.0..100.0), ), ), }, _ => AnyValue { value: Some( opentelemetry_proto::tonic::common::v1::any_value::Value::BoolValue( - rng.random::() < 0.5, + rng.r#gen::() < 0.5, ), ), }, diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs index 72bf5dd42..9cbf76555 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs @@ -23,6 +23,8 @@ pub struct GenevaClientConfig { pub role_instance: String, /// Maximum number of concurrent uploads. If None, defaults to number of CPU cores. pub max_concurrent_uploads: Option, + /// Retry configuration for failed uploads. If None, uses default retry settings. + pub retry_config: Option, // Add event name/version here if constant, or per-upload if you want them per call. } @@ -73,6 +75,7 @@ impl GenevaClient { source_identity, environment: cfg.environment, config_version: config_version.clone(), + retry_config: cfg.retry_config.unwrap_or_default(), }; let uploader = GenevaUploader::from_config_client(config_client, uploader_config) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs index 55eed1c9d..1570c6f24 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/mod.rs @@ -4,6 +4,381 @@ pub(crate) mod uploader; mod tests { use std::time::Instant; + // WireMock tests for retry logic + #[cfg(all(test, feature = "mock_auth"))] + mod wiremock_tests { + use crate::config_service::client::{ + AuthMethod, GenevaConfigClient, GenevaConfigClientConfig, + }; + use crate::ingestion_service::uploader::{GenevaUploader, GenevaUploaderConfig}; + use crate::payload_encoder::central_blob::BatchMetadata; + use crate::retry::RetryConfig; + use std::sync::Arc; + use std::time::Duration; + use wiremock::matchers::{method, path_regex}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + // Helper to create a mock config service response + fn mock_config_response() -> serde_json::Value { + // Create a valid JWT token with an Endpoint claim + // Header: {"alg":"HS256","typ":"JWT"} + // Payload: {"Endpoint":"https://test.endpoint"} + // This is a mock JWT token that will be replaced with the actual ingestion server URL + let jwt_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJFbmRwb2ludCI6Imh0dHBzOi8vdGVzdC5lbmRwb2ludCJ9.signature"; + + serde_json::json!({ + "IngestionGatewayInfo": { + "Endpoint": "PLACEHOLDER", // Will be replaced with mock server URL + "AuthToken": jwt_token, + "AuthTokenExpiryTime": "2030-01-01T00:00:00Z" + }, + "StorageAccountKeys": [ + { + "AccountMonikerName": "mock-diag-moniker", + "AccountGroupName": "mock-group", + "IsPrimaryMoniker": true + } + ], + "TagId": "mock-tag-id" + }) + } + + #[tokio::test] + async fn test_retry_on_transient_failure() { + let config_server = MockServer::start().await; + let ingestion_server = MockServer::start().await; + + // Create config response with ingestion server URL + let mut config_response = mock_config_response(); + config_response["IngestionGatewayInfo"]["Endpoint"] = + serde_json::Value::String(ingestion_server.uri()); + + // Mock config service endpoint + Mock::given(method("GET")) + .and(path_regex("/api/agent/v3/.*/MonitoringStorageKeys/")) + .respond_with(ResponseTemplate::new(200).set_body_json(&config_response)) + .mount(&config_server) + .await; + + // Mock ingestion endpoint - fail twice, then succeed + Mock::given(method("POST")) + .and(path_regex("/api/v1/ingestion/ingest.*")) + .respond_with(ResponseTemplate::new(500).set_body_string("Server error")) + .up_to_n_times(2) + .mount(&ingestion_server) + .await; + + Mock::given(method("POST")) + .and(path_regex("/api/v1/ingestion/ingest.*")) + .respond_with(ResponseTemplate::new(202).set_body_json(serde_json::json!({ + "ticket": "success-ticket-123" + }))) + .mount(&ingestion_server) + .await; + + // Create uploader with retry config + let config = GenevaConfigClientConfig { + endpoint: config_server.uri(), + environment: "test".into(), + account: "test".into(), + namespace: "test".into(), + region: "test".into(), + config_major_version: 1, + auth_method: AuthMethod::MockAuth, + }; + + let config_client = Arc::new(GenevaConfigClient::new(config).unwrap()); + + let uploader_config = GenevaUploaderConfig { + namespace: "test".into(), + source_identity: "test".into(), + environment: "test".into(), + config_version: "Ver1v0".into(), + retry_config: RetryConfig::new() + .with_max_retries(3) + .with_delay(Duration::from_millis(10)), // Short delay for testing + }; + + let uploader = GenevaUploader::from_config_client(config_client, uploader_config) + .await + .unwrap(); + + let metadata = BatchMetadata { + start_time: 1_700_000_000_000_000_000, + end_time: 1_700_000_300_000_000_000, + schema_ids: "test-schema".to_string(), + }; + + // Should retry twice and succeed on third attempt + let result = uploader.upload(vec![1, 2, 3], "TestEvent", &metadata).await; + assert!(result.is_ok()); + assert_eq!(result.unwrap().ticket, "success-ticket-123"); + } + + #[tokio::test] + async fn test_no_retry_on_client_error() { + let config_server = MockServer::start().await; + let ingestion_server = MockServer::start().await; + + // Create config response with ingestion server URL + let mut config_response = mock_config_response(); + config_response["IngestionGatewayInfo"]["Endpoint"] = + serde_json::Value::String(ingestion_server.uri()); + + // Mock config service endpoint + Mock::given(method("GET")) + .and(path_regex("/api/agent/v3/.*/MonitoringStorageKeys/")) + .respond_with(ResponseTemplate::new(200).set_body_json(&config_response)) + .mount(&config_server) + .await; + + // Mock ingestion endpoint - return 400 Bad Request + Mock::given(method("POST")) + .and(path_regex("/api/v1/ingestion/ingest.*")) + .respond_with(ResponseTemplate::new(400).set_body_string("Bad request")) + .expect(1) // Should only be called once (no retries) + .mount(&ingestion_server) + .await; + + // Create uploader with retry config + let config = GenevaConfigClientConfig { + endpoint: config_server.uri(), + environment: "test".into(), + account: "test".into(), + namespace: "test".into(), + region: "test".into(), + config_major_version: 1, + auth_method: AuthMethod::MockAuth, + }; + + let config_client = Arc::new(GenevaConfigClient::new(config).unwrap()); + + let uploader_config = GenevaUploaderConfig { + namespace: "test".into(), + source_identity: "test".into(), + environment: "test".into(), + config_version: "Ver1v0".into(), + retry_config: RetryConfig::new() + .with_max_retries(3) + .with_delay(Duration::from_millis(10)), + }; + + let uploader = GenevaUploader::from_config_client(config_client, uploader_config) + .await + .unwrap(); + + let metadata = BatchMetadata { + start_time: 1_700_000_000_000_000_000, + end_time: 1_700_000_300_000_000_000, + schema_ids: "test-schema".to_string(), + }; + + // Should fail immediately without retries + let result = uploader.upload(vec![1, 2, 3], "TestEvent", &metadata).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_retry_on_rate_limiting() { + let config_server = MockServer::start().await; + let ingestion_server = MockServer::start().await; + + // Create config response with ingestion server URL + let mut config_response = mock_config_response(); + config_response["IngestionGatewayInfo"]["Endpoint"] = + serde_json::Value::String(ingestion_server.uri()); + + // Mock config service endpoint + Mock::given(method("GET")) + .and(path_regex("/api/agent/v3/.*/MonitoringStorageKeys/")) + .respond_with(ResponseTemplate::new(200).set_body_json(&config_response)) + .mount(&config_server) + .await; + + // Mock ingestion endpoint - rate limit once, then succeed + Mock::given(method("POST")) + .and(path_regex("/api/v1/ingestion/ingest.*")) + .respond_with(ResponseTemplate::new(429).set_body_string("Too many requests")) + .up_to_n_times(1) + .mount(&ingestion_server) + .await; + + Mock::given(method("POST")) + .and(path_regex("/api/v1/ingestion/ingest.*")) + .respond_with(ResponseTemplate::new(202).set_body_json(serde_json::json!({ + "ticket": "success-after-rate-limit" + }))) + .mount(&ingestion_server) + .await; + + // Create uploader with retry config + let config = GenevaConfigClientConfig { + endpoint: config_server.uri(), + environment: "test".into(), + account: "test".into(), + namespace: "test".into(), + region: "test".into(), + config_major_version: 1, + auth_method: AuthMethod::MockAuth, + }; + + let config_client = Arc::new(GenevaConfigClient::new(config).unwrap()); + + let uploader_config = GenevaUploaderConfig { + namespace: "test".into(), + source_identity: "test".into(), + environment: "test".into(), + config_version: "Ver1v0".into(), + retry_config: RetryConfig::new() + .with_max_retries(2) + .with_delay(Duration::from_millis(10)), + }; + + let uploader = GenevaUploader::from_config_client(config_client, uploader_config) + .await + .unwrap(); + + let metadata = BatchMetadata { + start_time: 1_700_000_000_000_000_000, + end_time: 1_700_000_300_000_000_000, + schema_ids: "test-schema".to_string(), + }; + + // Should retry on 429 and succeed + let result = uploader.upload(vec![1, 2, 3], "TestEvent", &metadata).await; + assert!(result.is_ok()); + assert_eq!(result.unwrap().ticket, "success-after-rate-limit"); + } + + #[tokio::test] + async fn test_max_retries_exhausted() { + let config_server = MockServer::start().await; + let ingestion_server = MockServer::start().await; + + // Create config response with ingestion server URL + let mut config_response = mock_config_response(); + config_response["IngestionGatewayInfo"]["Endpoint"] = + serde_json::Value::String(ingestion_server.uri()); + + // Mock config service endpoint + Mock::given(method("GET")) + .and(path_regex("/api/agent/v3/.*/MonitoringStorageKeys/")) + .respond_with(ResponseTemplate::new(200).set_body_json(&config_response)) + .mount(&config_server) + .await; + + // Mock ingestion endpoint - always fail with 503 + Mock::given(method("POST")) + .and(path_regex("/api/v1/ingestion/ingest.*")) + .respond_with(ResponseTemplate::new(503).set_body_string("Service unavailable")) + .expect(3) // Initial attempt + 2 retries + .mount(&ingestion_server) + .await; + + // Create uploader with retry config + let config = GenevaConfigClientConfig { + endpoint: config_server.uri(), + environment: "test".into(), + account: "test".into(), + namespace: "test".into(), + region: "test".into(), + config_major_version: 1, + auth_method: AuthMethod::MockAuth, + }; + + let config_client = Arc::new(GenevaConfigClient::new(config).unwrap()); + + let uploader_config = GenevaUploaderConfig { + namespace: "test".into(), + source_identity: "test".into(), + environment: "test".into(), + config_version: "Ver1v0".into(), + retry_config: RetryConfig::new() + .with_max_retries(2) + .with_delay(Duration::from_millis(10)), + }; + + let uploader = GenevaUploader::from_config_client(config_client, uploader_config) + .await + .unwrap(); + + let metadata = BatchMetadata { + start_time: 1_700_000_000_000_000_000, + end_time: 1_700_000_300_000_000_000, + schema_ids: "test-schema".to_string(), + }; + + // Should fail after exhausting retries + let result = uploader.upload(vec![1, 2, 3], "TestEvent", &metadata).await; + assert!(result.is_err()); + let err_msg = format!("{:?}", result.unwrap_err()); + assert!(err_msg.contains("503") || err_msg.contains("Service unavailable")); + } + + #[tokio::test] + async fn test_zero_retries_configuration() { + let config_server = MockServer::start().await; + let ingestion_server = MockServer::start().await; + + // Create config response with ingestion server URL + let mut config_response = mock_config_response(); + config_response["IngestionGatewayInfo"]["Endpoint"] = + serde_json::Value::String(ingestion_server.uri()); + + // Mock config service endpoint + Mock::given(method("GET")) + .and(path_regex("/api/agent/v3/.*/MonitoringStorageKeys/")) + .respond_with(ResponseTemplate::new(200).set_body_json(&config_response)) + .mount(&config_server) + .await; + + // Mock ingestion endpoint - fail with 500 + Mock::given(method("POST")) + .and(path_regex("/api/v1/ingestion/ingest.*")) + .respond_with(ResponseTemplate::new(500).set_body_string("Server error")) + .expect(1) // Should only be called once (no retries) + .mount(&ingestion_server) + .await; + + // Create uploader with zero retries + let config = GenevaConfigClientConfig { + endpoint: config_server.uri(), + environment: "test".into(), + account: "test".into(), + namespace: "test".into(), + region: "test".into(), + config_major_version: 1, + auth_method: AuthMethod::MockAuth, + }; + + let config_client = Arc::new(GenevaConfigClient::new(config).unwrap()); + + let uploader_config = GenevaUploaderConfig { + namespace: "test".into(), + source_identity: "test".into(), + environment: "test".into(), + config_version: "Ver1v0".into(), + retry_config: RetryConfig::new() + .with_max_retries(0) // No retries + .with_delay(Duration::from_millis(10)), + }; + + let uploader = GenevaUploader::from_config_client(config_client, uploader_config) + .await + .unwrap(); + + let metadata = BatchMetadata { + start_time: 1_700_000_000_000_000_000, + end_time: 1_700_000_300_000_000_000, + schema_ids: "test-schema".to_string(), + }; + + // Should fail immediately without retries + let result = uploader.upload(vec![1, 2, 3], "TestEvent", &metadata).await; + assert!(result.is_err()); + } + } + mod test_helpers { use crate::{ AuthMethod, GenevaConfigClient, GenevaConfigClientConfig, GenevaUploader, @@ -51,6 +426,7 @@ mod tests { source_identity, environment: environment.clone(), config_version, + retry_config: crate::RetryConfig::default(), }; let config = GenevaConfigClientConfig { diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs index 5da88da59..ddc9e50fd 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs @@ -1,5 +1,6 @@ use crate::config_service::client::{GenevaConfigClient, GenevaConfigClientError}; use crate::payload_encoder::central_blob::BatchMetadata; +use crate::retry::{retry_with_config_and_check, RetryConfig}; use reqwest::{header, Client}; use serde::Deserialize; use serde_json::Value; @@ -13,12 +14,12 @@ use url::form_urlencoded::byte_serialize; use uuid::Uuid; /// Error types for the Geneva Uploader -#[derive(Debug, Error)] +#[derive(Debug, Clone, Error)] pub(crate) enum GenevaUploaderError { #[error("HTTP error: {0}")] Http(String), #[error("JSON error: {0}")] - SerdeJson(#[from] serde_json::Error), + SerdeJson(String), #[error("Config service error: {0}")] ConfigClient(String), #[allow(dead_code)] @@ -29,6 +30,12 @@ pub(crate) enum GenevaUploaderError { InternalError(String), } +impl From for GenevaUploaderError { + fn from(err: serde_json::Error) -> Self { + GenevaUploaderError::SerdeJson(err.to_string()) + } +} + impl From for GenevaUploaderError { fn from(err: GenevaConfigClientError) -> Self { // This preserves the original error message format from the code @@ -86,6 +93,28 @@ impl From for GenevaUploaderError { } } +impl GenevaUploaderError { + /// Determines if this error represents a transient condition that should be retried + pub fn is_retriable(&self) -> bool { + match self { + // Network/connection issues - always retry + GenevaUploaderError::Http(msg) => { + msg.contains("timeout") + || msg.contains("connect") + || msg.contains("io::ErrorKind::ConnectionRefused") + || msg.contains("io::ErrorKind::TimedOut") + || msg.contains("io::ErrorKind::UnexpectedEof") + } + // HTTP status codes - retry server errors and rate limiting + GenevaUploaderError::UploadFailed { status, .. } => *status >= 500 || *status == 429, + // Config service errors - may be transient (auth token refresh, etc.) + GenevaUploaderError::ConfigClient(_) => true, + // Don't retry parsing or internal errors + GenevaUploaderError::SerdeJson(_) | GenevaUploaderError::InternalError(_) => false, + } + } +} + pub(crate) type Result = std::result::Result; #[allow(dead_code)] @@ -105,6 +134,7 @@ pub(crate) struct GenevaUploaderConfig { #[allow(dead_code)] pub environment: String, pub config_version: String, + pub retry_config: RetryConfig, } /// Client for uploading data to Geneva Ingestion Gateway (GIG) @@ -190,20 +220,10 @@ impl GenevaUploader { Ok(query) } - /// Uploads data to the ingestion gateway - /// - /// # Arguments - /// * `data` - The encoded data to upload (already in the required format) - /// * `event_name` - Name of the event - /// * `event_version` - Version of the event - /// * `metadata` - Batch metadata containing timestamps and schema information - /// - /// # Returns - /// * `Result` - The response containing the ticket ID or an error - #[allow(dead_code)] - pub(crate) async fn upload( + /// Performs a single upload attempt + async fn upload_attempt( &self, - data: Vec, + data: &[u8], event_name: &str, metadata: &BatchMetadata, ) -> Result { @@ -223,6 +243,7 @@ impl GenevaUploader { auth_info.endpoint.trim_end_matches('/'), upload_uri ); + // Send the upload request let response = self .http_client @@ -231,16 +252,15 @@ impl GenevaUploader { header::AUTHORIZATION, format!("Bearer {}", auth_info.auth_token), ) - .body(data) + .body(data.to_vec()) .send() .await?; + let status = response.status(); let body = response.text().await?; if status == reqwest::StatusCode::ACCEPTED { - let ingest_response: IngestionResponse = - serde_json::from_str(&body).map_err(GenevaUploaderError::SerdeJson)?; - + let ingest_response: IngestionResponse = serde_json::from_str(&body)?; Ok(ingest_response) } else { Err(GenevaUploaderError::UploadFailed { @@ -249,4 +269,34 @@ impl GenevaUploader { }) } } + + /// Uploads data to the ingestion gateway with retry support + /// + /// # Arguments + /// * `data` - The encoded data to upload (already in the required format) + /// * `event_name` - Name of the event + /// * `metadata` - Batch metadata containing timestamps and schema information + /// + /// # Returns + /// * `Result` - The response containing the ticket ID or an error + #[allow(dead_code)] + pub(crate) async fn upload( + &self, + data: Vec, + event_name: &str, + metadata: &BatchMetadata, + ) -> Result { + let operation_name = format!("Upload {}", event_name); + + retry_with_config_and_check( + &self.config.retry_config, + &operation_name, + || { + let data_ref = &data; + async move { self.upload_attempt(data_ref, event_name, metadata).await } + }, + |error| error.is_retriable(), + ) + .await + } } diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs index e322626cc..f52aad33b 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs @@ -1,6 +1,7 @@ mod config_service; mod ingestion_service; pub mod payload_encoder; +pub mod retry; pub mod client; @@ -19,3 +20,4 @@ pub(crate) use ingestion_service::uploader::{ pub use client::{GenevaClient, GenevaClientConfig}; pub use config_service::client::AuthMethod; +pub use retry::RetryConfig; diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/retry.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/retry.rs new file mode 100644 index 000000000..6ca0ef846 --- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/retry.rs @@ -0,0 +1,390 @@ +//! Simple retry functionality that can be shared across services + +use std::time::Duration; +use tokio::time::sleep; + +/// Simple retry configuration +#[derive(Debug, Clone)] +pub struct RetryConfig { + /// Maximum number of retry attempts (0 means no retries) + pub max_retries: u32, + /// Fixed delay between retries + pub delay: Duration, + // TODO: Add support for exponential backoff + // - Add fields for initial_delay, max_delay, and backoff_multiplier + // - Implement exponential backoff logic in retry functions + + // TODO: Add support for jitter to prevent thundering herd + // - Add jitter_factor field (e.g., 0.1 for 10% jitter) + // - Apply random jitter to calculated delays +} + +impl Default for RetryConfig { + fn default() -> Self { + Self { + max_retries: 3, + delay: Duration::from_millis(1000), // 1 second + } + } +} + +impl RetryConfig { + /// Create a new retry configuration with default values + pub fn new() -> Self { + Self::default() + } + + /// Set the maximum number of retry attempts + pub fn with_max_retries(mut self, max_retries: u32) -> Self { + self.max_retries = max_retries; + self + } + + /// Set the delay between retries + pub fn with_delay(mut self, delay: Duration) -> Self { + self.delay = delay; + self + } +} + +/// Execute a function with retry logic +pub async fn retry_with_config( + config: &RetryConfig, + operation_name: &str, + mut operation: F, +) -> Result +where + F: FnMut() -> Fut, + Fut: std::future::Future>, + E: std::fmt::Display + Clone, +{ + let max_attempts = config.max_retries.saturating_add(1); // +1 for the initial attempt, with overflow protection + + for attempt in 0..max_attempts { + // Wait before retry (but not before the first attempt) + if attempt > 0 && !config.delay.is_zero() { + // TODO: Implement exponential backoff with jitter + // - Calculate delay as: initial_delay * (backoff_multiplier ^ attempt) + // - Cap delay at max_delay + // - Apply jitter: delay = delay * (1 + jitter_factor * (random - 0.5)) + sleep(config.delay).await; + } + + match operation().await { + Ok(result) => { + // Success! Log retry info if this wasn't the first attempt + if attempt > 0 { + eprintln!( + "{} succeeded on attempt {} after retries", + operation_name, + attempt + 1 + ); + } + return Ok(result); + } + Err(error) => { + let is_last_attempt = attempt >= max_attempts.saturating_sub(1); + + if is_last_attempt { + // We've exhausted all retries + eprintln!( + "{} failed after {} attempts: {}", + operation_name, max_attempts, error + ); + return Err(error); + } else { + eprintln!( + "{} attempt {} failed: {}. Retrying in {:?}...", + operation_name, + attempt + 1, + error, + config.delay + ); + // Continue to next iteration + } + } + } + } + + // This should be unreachable, but included for completeness + unreachable!("Retry loop should have returned by now") +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicU32, Ordering}; + use std::sync::Arc; + + #[derive(Debug, Clone)] + struct TestError { + message: String, + retriable: bool, + } + + impl std::fmt::Display for TestError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.message) + } + } + + impl TestError { + fn new(message: &str, retriable: bool) -> Self { + Self { + message: message.to_string(), + retriable, + } + } + + fn is_retriable(&self) -> bool { + self.retriable + } + } + + #[tokio::test] + async fn test_retry_config_defaults() { + let config = RetryConfig::default(); + assert_eq!(config.max_retries, 3); + assert_eq!(config.delay, Duration::from_millis(1000)); + } + + #[tokio::test] + async fn test_retry_config_builder() { + let config = RetryConfig::new() + .with_max_retries(5) + .with_delay(Duration::from_millis(500)); + + assert_eq!(config.max_retries, 5); + assert_eq!(config.delay, Duration::from_millis(500)); + } + + #[tokio::test] + async fn test_successful_operation_no_retry() { + let config = RetryConfig::new().with_max_retries(3); + let call_count = Arc::new(AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = retry_with_config_and_check( + &config, + "test_operation", + || { + let count = call_count_clone.clone(); + async move { + count.fetch_add(1, Ordering::SeqCst); + Ok::(42) + } + }, + |_| true, + ) + .await; + + assert_eq!(result.unwrap(), 42); + assert_eq!(call_count.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn test_retriable_error_with_eventual_success() { + let config = RetryConfig::new() + .with_max_retries(3) + .with_delay(Duration::from_millis(1)); // Very short delay for testing + + let call_count = Arc::new(AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = retry_with_config_and_check( + &config, + "test_operation", + || { + let count = call_count_clone.clone(); + async move { + let current_count = count.fetch_add(1, Ordering::SeqCst); + if current_count < 2 { + Err(TestError::new("retriable error", true)) + } else { + Ok(42) + } + } + }, + |error| error.is_retriable(), + ) + .await; + + assert_eq!(result.unwrap(), 42); + assert_eq!(call_count.load(Ordering::SeqCst), 3); // Failed twice, succeeded on third + } + + #[tokio::test] + async fn test_non_retriable_error_no_retry() { + let config = RetryConfig::new().with_max_retries(3); + let call_count = Arc::new(AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = retry_with_config_and_check( + &config, + "test_operation", + || { + let count = call_count_clone.clone(); + async move { + count.fetch_add(1, Ordering::SeqCst); + Err::(TestError::new("non-retriable error", false)) + } + }, + |error| error.is_retriable(), + ) + .await; + + assert!(result.is_err()); + assert_eq!(result.unwrap_err().message, "non-retriable error"); + assert_eq!(call_count.load(Ordering::SeqCst), 1); // Only called once + } + + #[tokio::test] + async fn test_max_retries_exhausted() { + let config = RetryConfig::new() + .with_max_retries(2) + .with_delay(Duration::from_millis(1)); // Very short delay for testing + + let call_count = Arc::new(AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = retry_with_config_and_check( + &config, + "test_operation", + || { + let count = call_count_clone.clone(); + async move { + count.fetch_add(1, Ordering::SeqCst); + Err::(TestError::new("always fails", true)) + } + }, + |error| error.is_retriable(), + ) + .await; + + assert!(result.is_err()); + assert_eq!(result.unwrap_err().message, "always fails"); + assert_eq!(call_count.load(Ordering::SeqCst), 3); // Initial attempt + 2 retries + } + + #[tokio::test] + async fn test_zero_retries() { + let config = RetryConfig::new().with_max_retries(0); + let call_count = Arc::new(AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = retry_with_config_and_check( + &config, + "test_operation", + || { + let count = call_count_clone.clone(); + async move { + count.fetch_add(1, Ordering::SeqCst); + Err::(TestError::new("error", true)) + } + }, + |error| error.is_retriable(), + ) + .await; + + assert!(result.is_err()); + assert_eq!(call_count.load(Ordering::SeqCst), 1); // Only initial attempt + } + + #[tokio::test] + async fn test_legacy_retry_function() { + let config = RetryConfig::new() + .with_max_retries(2) + .with_delay(Duration::from_millis(1)); + + let call_count = Arc::new(AtomicU32::new(0)); + let call_count_clone = call_count.clone(); + + let result = retry_with_config(&config, "test_operation", || { + let count = call_count_clone.clone(); + async move { + let current_count = count.fetch_add(1, Ordering::SeqCst); + if current_count < 1 { + Err(TestError::new("retriable error", true)) + } else { + Ok(42) + } + } + }) + .await; + + assert_eq!(result.unwrap(), 42); + assert_eq!(call_count.load(Ordering::SeqCst), 2); // Failed once, succeeded on second + } +} + +/// Execute a function with retry logic that includes retriability checking +pub async fn retry_with_config_and_check( + config: &RetryConfig, + operation_name: &str, + mut operation: F, + is_retriable: R, +) -> Result +where + F: FnMut() -> Fut, + Fut: std::future::Future>, + E: std::fmt::Display + Clone, + R: Fn(&E) -> bool, +{ + let max_attempts = config.max_retries.saturating_add(1); // +1 for the initial attempt, with overflow protection + + for attempt in 0..max_attempts { + // Wait before retry (but not before the first attempt) + if attempt > 0 && !config.delay.is_zero() { + // TODO: Implement exponential backoff with jitter (same as above) + // - This should use the same logic as retry_with_config + // - Consider extracting to a shared helper function + sleep(config.delay).await; + } + + match operation().await { + Ok(result) => { + // Success! Log retry info if this wasn't the first attempt + if attempt > 0 { + eprintln!( + "{} succeeded on attempt {} after retries", + operation_name, + attempt + 1 + ); + } + return Ok(result); + } + Err(error) => { + let is_last_attempt = attempt >= max_attempts.saturating_sub(1); + let should_retry = !is_last_attempt && is_retriable(&error); + + if should_retry { + eprintln!( + "{} attempt {} failed: {}. Retrying in {:?}...", + operation_name, + attempt + 1, + error, + config.delay + ); + // Continue to next iteration + } else { + // Either we've exhausted retries or the error is not retriable + if is_last_attempt { + eprintln!( + "{} failed after {} attempts: {}", + operation_name, max_attempts, error + ); + } else { + eprintln!( + "{} failed with non-retriable error: {}", + operation_name, error + ); + } + return Err(error); + } + } + } + } + + // This should be unreachable, but included for completeness + unreachable!("Retry loop should have returned by now") +} diff --git a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs index e655212a9..ea824743e 100644 --- a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs +++ b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs @@ -63,6 +63,7 @@ async fn main() { role_name, role_instance, max_concurrent_uploads: None, // Use default + retry_config: None, // Use default retry configuration }; let geneva_client = GenevaClient::new(config) diff --git a/stress/src/geneva_exporter.rs b/stress/src/geneva_exporter.rs index 691f077ed..40b5cad3a 100644 --- a/stress/src/geneva_exporter.rs +++ b/stress/src/geneva_exporter.rs @@ -124,6 +124,7 @@ async fn init_client() -> Result<(GenevaClient, Option), Box Result<(GenevaClient, Option), Box Result<(GenevaClient, Option), Box