diff --git a/Cargo.toml b/Cargo.toml
index ba5abd5264..0c5998f65a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,6 +43,7 @@ temp-env = "0.3.6"
 thiserror = { version = "2", default-features = false }
 tonic = { version = "0.13", default-features = false }
 tonic-build = "0.13"
+tonic-types = "0.13"
 tokio = { version = "1", default-features = false }
 tokio-stream = "0.1"
 # Using `tracing 0.1.40` because 0.1.39 (which is yanked) introduces the ability to set event names in macros,
diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md
index e1581a3f23..a7b582537a 100644
--- a/opentelemetry-otlp/CHANGELOG.md
+++ b/opentelemetry-otlp/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## vNext
 
 - Add HTTP compression support with `gzip-http` and `zstd-http` feature flags
+- Add retry with exponential backoff and throttling support for the gRPC exporters, and for the HTTP exporters behind the new `http-retry` feature flag
 
 ## 0.30.0
diff --git a/opentelemetry-otlp/Cargo.toml b/opentelemetry-otlp/Cargo.toml
index ff1d7f318d..ef1c6d0738 100644
--- a/opentelemetry-otlp/Cargo.toml
+++ b/opentelemetry-otlp/Cargo.toml
@@ -35,6 +35,7 @@ tracing = {workspace = true, optional = true}
 prost = { workspace = true, optional = true }
 tonic = { workspace = true, optional = true }
+tonic-types = { workspace = true, optional = true }
 tokio = { workspace = true, features = ["sync", "rt"], optional = true }
 reqwest = { workspace = true, optional = true }
@@ -69,7 +70,7 @@ serialize = ["serde", "serde_json"]
 default = ["http-proto", "reqwest-blocking-client", "trace", "metrics", "logs", "internal-logs"]
 
 # grpc using tonic
-grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic"]
+grpc-tonic = ["tonic", "tonic-types", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic", "opentelemetry_sdk/rt-tokio", "opentelemetry_sdk/experimental_async_runtime"]
 gzip-tonic = ["tonic/gzip"]
 zstd-tonic = ["tonic/zstd"]
@@ -82,6 +83,11 @@ tls-webpki-roots = ["tls", "tonic/tls-webpki-roots"]
 # http binary
 http-proto = ["prost", "opentelemetry-http", "opentelemetry-proto/gen-tonic-messages", "http", "trace", "metrics"]
+
+# http with retry support.
+# TODO: decide how to gate this; retry needs the SDK async runtime (the gRPC exporters already enable it).
+http-retry = ["opentelemetry_sdk/experimental_async_runtime", "opentelemetry_sdk/rt-tokio", "tokio"] + http-json = ["serde_json", "prost", "opentelemetry-http", "opentelemetry-proto/gen-tonic-messages", "opentelemetry-proto/with-serde", "http", "trace", "metrics"] reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest-blocking"] reqwest-client = ["reqwest", "opentelemetry-http/reqwest"] diff --git a/opentelemetry-otlp/allowed-external-types.toml b/opentelemetry-otlp/allowed-external-types.toml index 7995873ee8..c40e327867 100644 --- a/opentelemetry-otlp/allowed-external-types.toml +++ b/opentelemetry-otlp/allowed-external-types.toml @@ -15,4 +15,7 @@ allowed_external_types = [ "tonic::transport::tls::Identity", "tonic::transport::channel::Channel", "tonic::service::interceptor::Interceptor", + + # For retries + "tonic::status::Status" ] diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 3a66266652..cc61a10474 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -4,9 +4,124 @@ use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::logs::{LogBatch, LogExporter}; +#[cfg(feature = "http-retry")] +use std::sync::Arc; use std::time; +#[cfg(feature = "http-retry")] +use super::{classify_http_export_error, HttpExportError, HttpRetryData}; +#[cfg(feature = "http-retry")] +use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy}; +#[cfg(feature = "http-retry")] +use opentelemetry_sdk::runtime::Tokio; + impl LogExporter for OtlpHttpClient { + #[cfg(feature = "http-retry")] + async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + // Build request body once before retry loop since LogBatch contains borrowed data + let (body, content_type, content_encoding) = self + .build_logs_export_body(batch) + .map_err(OTelSdkError::InternalFailure)?; + + let retry_data = Arc::new(HttpRetryData { + body, + headers: self.headers.clone(), + endpoint: self.collector_endpoint.to_string(), + }); + + retry_with_backoff( + Tokio, + policy, + classify_http_export_error, + "HttpLogsClient.Export", + || async { + // Get client + let client = self + .client + .lock() + .map_err(|e| HttpExportError { + status_code: 500, + retry_after: None, + message: format!("Mutex lock failed: {e}"), + })? + .as_ref() + .ok_or_else(|| HttpExportError { + status_code: 500, + retry_after: None, + message: "Exporter already shutdown".to_string(), + })? 
+ .clone(); + + // Build HTTP request + let mut request_builder = http::Request::builder() + .method(Method::POST) + .uri(&retry_data.endpoint) + .header(CONTENT_TYPE, content_type); + + if let Some(encoding) = content_encoding { + request_builder = request_builder.header(CONTENT_ENCODING, encoding); + } + + let mut request = request_builder + .body(retry_data.body.clone().into()) + .map_err(|e| HttpExportError { + status_code: 400, + retry_after: None, + message: format!("Failed to build HTTP request: {e}"), + })?; + + for (k, v) in &retry_data.headers { + request.headers_mut().insert(k.clone(), v.clone()); + } + + let request_uri = request.uri().to_string(); + otel_debug!(name: "HttpLogsClient.ExportStarted"); + + // Send request + let response = client.send_bytes(request).await.map_err(|e| { + HttpExportError { + status_code: 0, // Network error + retry_after: None, + message: format!("Network error: {e:?}"), + } + })?; + + let status_code = response.status().as_u16(); + let retry_after = response + .headers() + .get("retry-after") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + + if !response.status().is_success() { + return Err(HttpExportError { + status_code, + retry_after, + message: format!( + "HTTP export failed. Url: {}, Status: {}, Response: {:?}", + request_uri, + status_code, + response.body() + ), + }); + } + + otel_debug!(name: "HttpLogsClient.ExportSucceeded"); + Ok(()) + }, + ) + .await + .map_err(|e| OTelSdkError::InternalFailure(e.message)) + } + + #[cfg(not(feature = "http-retry"))] async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { let client = self .client diff --git a/opentelemetry-otlp/src/exporter/http/metrics.rs b/opentelemetry-otlp/src/exporter/http/metrics.rs index 11a3041585..b022754486 100644 --- a/opentelemetry-otlp/src/exporter/http/metrics.rs +++ b/opentelemetry-otlp/src/exporter/http/metrics.rs @@ -8,7 +8,121 @@ use opentelemetry_sdk::metrics::data::ResourceMetrics; use super::OtlpHttpClient; +#[cfg(feature = "http-retry")] +use super::{classify_http_export_error, HttpExportError, HttpRetryData}; +#[cfg(feature = "http-retry")] +use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy}; +#[cfg(feature = "http-retry")] +use opentelemetry_sdk::runtime::Tokio; + impl MetricsClient for OtlpHttpClient { + #[cfg(feature = "http-retry")] + async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult { + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + // Build request body once before retry loop + let (body, content_type, content_encoding) = + self.build_metrics_export_body(metrics).ok_or_else(|| { + OTelSdkError::InternalFailure("Failed to serialize metrics".to_string()) + })?; + + let retry_data = Arc::new(HttpRetryData { + body, + headers: self.headers.clone(), + endpoint: self.collector_endpoint.to_string(), + }); + + retry_with_backoff( + Tokio, + policy, + classify_http_export_error, + "HttpMetricsClient.Export", + || async { + // Get client + let client = self + .client + .lock() + .map_err(|e| HttpExportError { + status_code: 500, + retry_after: None, + message: format!("Mutex lock failed: {e}"), + })? + .as_ref() + .ok_or_else(|| HttpExportError { + status_code: 500, + retry_after: None, + message: "Exporter already shutdown".to_string(), + })? 
+ .clone(); + + // Build HTTP request + let mut request_builder = http::Request::builder() + .method(Method::POST) + .uri(&retry_data.endpoint) + .header(CONTENT_TYPE, content_type); + + if let Some(encoding) = content_encoding { + request_builder = request_builder.header("Content-Encoding", encoding); + } + + let mut request = request_builder + .body(retry_data.body.clone().into()) + .map_err(|e| HttpExportError { + status_code: 400, + retry_after: None, + message: format!("Failed to build HTTP request: {e}"), + })?; + + for (k, v) in &retry_data.headers { + request.headers_mut().insert(k.clone(), v.clone()); + } + + let request_uri = request.uri().to_string(); + otel_debug!(name: "HttpMetricsClient.ExportStarted"); + + // Send request + let response = client.send_bytes(request).await.map_err(|e| { + HttpExportError { + status_code: 0, // Network error + retry_after: None, + message: format!("Network error: {e:?}"), + } + })?; + + let status_code = response.status().as_u16(); + let retry_after = response + .headers() + .get("retry-after") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + + if !response.status().is_success() { + return Err(HttpExportError { + status_code, + retry_after, + message: format!( + "HTTP export failed. Url: {}, Status: {}, Response: {:?}", + request_uri, + status_code, + response.body() + ), + }); + } + + otel_debug!(name: "HttpMetricsClient.ExportSucceeded"); + Ok(()) + }, + ) + .await + .map_err(|e| OTelSdkError::InternalFailure(e.message)) + } + + #[cfg(not(feature = "http-retry"))] async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult { let client = self .client diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 89b87f5d88..3a7cec8a4e 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -22,6 +22,37 @@ use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::Duration; +#[cfg(feature = "http-retry")] +use crate::retry_classification::http::classify_http_error; +#[cfg(feature = "http-retry")] +use opentelemetry_sdk::retry::RetryErrorType; + +// Shared HTTP retry functionality +#[cfg(feature = "http-retry")] +/// HTTP-specific error wrapper for retry classification +#[derive(Debug)] +pub(crate) struct HttpExportError { + pub status_code: u16, + pub retry_after: Option, + pub message: String, +} + +#[cfg(feature = "http-retry")] +/// Classify HTTP export errors for retry decisions +pub(crate) fn classify_http_export_error(error: &HttpExportError) -> RetryErrorType { + classify_http_error(error.status_code, error.retry_after.as_deref()) +} + +#[cfg(feature = "http-retry")] +/// Shared HTTP request data for retry attempts - optimizes Arc usage by bundling all data +/// we need to pass into the retry handler +#[derive(Debug)] +pub(crate) struct HttpRetryData { + pub body: Vec, + pub headers: HashMap, + pub endpoint: String, +} + #[cfg(feature = "metrics")] mod metrics; @@ -388,7 +419,7 @@ impl OtlpHttpClient { logs: LogBatch<'_>, ) -> Result<(Vec, &'static str, Option<&'static str>), String> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; - let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(&logs, &self.resource); let req = ExportLogsServiceRequest { resource_logs }; let (body, content_type) = match self.protocol { diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs 
b/opentelemetry-otlp/src/exporter/http/trace.rs index 28bc7fc5fb..c55d4f4609 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -8,7 +8,121 @@ use opentelemetry_sdk::{ trace::{SpanData, SpanExporter}, }; +#[cfg(feature = "http-retry")] +use super::{classify_http_export_error, HttpExportError, HttpRetryData}; +#[cfg(feature = "http-retry")] +use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy}; +#[cfg(feature = "http-retry")] +use opentelemetry_sdk::runtime::Tokio; + impl SpanExporter for OtlpHttpClient { + #[cfg(feature = "http-retry")] + async fn export(&self, batch: Vec) -> OTelSdkResult { + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + // Build request body once before retry loop + let (body, content_type, content_encoding) = + self.build_trace_export_body(batch).map_err(|e| { + OTelSdkError::InternalFailure(format!("Failed to build request body: {e}")) + })?; + + let retry_data = Arc::new(HttpRetryData { + body, + headers: self.headers.clone(), + endpoint: self.collector_endpoint.to_string(), + }); + + retry_with_backoff( + Tokio, + policy, + classify_http_export_error, + "HttpTracesClient.Export", + || async { + // Get client + let client = self + .client + .lock() + .map_err(|e| HttpExportError { + status_code: 500, + retry_after: None, + message: format!("Mutex lock failed: {e}"), + })? + .as_ref() + .ok_or_else(|| HttpExportError { + status_code: 500, + retry_after: None, + message: "Exporter already shutdown".to_string(), + })? + .clone(); + + // Build HTTP request + let mut request_builder = http::Request::builder() + .method(Method::POST) + .uri(&retry_data.endpoint) + .header(CONTENT_TYPE, content_type); + + if let Some(encoding) = content_encoding { + request_builder = request_builder.header("Content-Encoding", encoding); + } + + let mut request = request_builder + .body(retry_data.body.clone().into()) + .map_err(|e| HttpExportError { + status_code: 400, + retry_after: None, + message: format!("Failed to build HTTP request: {e}"), + })?; + + for (k, v) in &retry_data.headers { + request.headers_mut().insert(k.clone(), v.clone()); + } + + let request_uri = request.uri().to_string(); + otel_debug!(name: "HttpTracesClient.ExportStarted"); + + // Send request + let response = client.send_bytes(request).await.map_err(|e| { + HttpExportError { + status_code: 0, // Network error + retry_after: None, + message: format!("Network error: {e:?}"), + } + })?; + + let status_code = response.status().as_u16(); + let retry_after = response + .headers() + .get("retry-after") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + + if !response.status().is_success() { + return Err(HttpExportError { + status_code, + retry_after, + message: format!( + "HTTP export failed. 
Url: {}, Status: {}, Response: {:?}", + request_uri, + status_code, + response.body() + ), + }); + } + + otel_debug!(name: "HttpTracesClient.ExportSucceeded"); + Ok(()) + }, + ) + .await + .map_err(|e| OTelSdkError::InternalFailure(e.message)) + } + + #[cfg(not(feature = "http-retry"))] async fn export(&self, batch: Vec) -> OTelSdkResult { let client = match self .client diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 9f8b9d8a6d..fee1244686 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -5,6 +5,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::{ }; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::logs::{LogBatch, LogExporter}; +use std::sync::Arc; use std::time; use tokio::sync::Mutex; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; @@ -13,6 +14,10 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop use super::BoxInterceptor; +use crate::retry_classification::grpc::classify_tonic_status; +use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy}; +use opentelemetry_sdk::runtime::Tokio; + pub(crate) struct TonicLogsClient { inner: Mutex>, #[allow(dead_code)] @@ -58,40 +63,65 @@ impl TonicLogsClient { impl LogExporter for TonicLogsClient { async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { - let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() { - Some(inner) => { - let (m, e, _) = inner - .interceptor - .call(Request::new(())) - .map_err(|e| OTelSdkError::InternalFailure(format!("error: {e:?}")))? - .into_parts(); - (inner.client.clone(), m, e) - } - None => return Err(OTelSdkError::AlreadyShutdown), + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, }; - let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); - - otel_debug!(name: "TonicLogsClient.ExportStarted"); - - let result = client - .export(Request::from_parts( - metadata, - extensions, - ExportLogsServiceRequest { resource_logs }, - )) - .await; - - match result { - Ok(_) => { - otel_debug!(name: "TonicLogsClient.ExportSucceeded"); - Ok(()) - } - Err(e) => { - let error = format!("export error: {e:?}"); - otel_debug!(name: "TonicLogsClient.ExportFailed", error = &error); - Err(OTelSdkError::InternalFailure(error)) - } + let batch = Arc::new(batch); + + match retry_with_backoff( + Tokio, + policy, + classify_tonic_status, + "TonicLogsClient.Export", + || async { + let batch_clone = Arc::clone(&batch); + + // Execute the export operation + let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .call(Request::new(())) + .map_err(|e| { + // Convert interceptor errors to tonic::Status for retry classification + tonic::Status::internal(format!("interceptor error: {e:?}")) + })? 
+ .into_parts(); + (inner.client.clone(), m, e) + } + None => { + return Err(tonic::Status::failed_precondition( + "exporter already shutdown", + )) + } + }; + + let resource_logs = group_logs_by_resource_and_scope(&batch_clone, &self.resource); + + otel_debug!(name: "TonicLogsClient.ExportStarted"); + + client + .export(Request::from_parts( + metadata, + extensions, + ExportLogsServiceRequest { resource_logs }, + )) + .await + .map(|_| { + otel_debug!(name: "TonicLogsClient.ExportSucceeded"); + }) + }, + ) + .await + { + Ok(_) => Ok(()), + Err(tonic_status) => Err(OTelSdkError::InternalFailure(format!( + "export error: {tonic_status:?}" + ))), } } diff --git a/opentelemetry-otlp/src/exporter/tonic/metrics.rs b/opentelemetry-otlp/src/exporter/tonic/metrics.rs index 13813c7305..d6244b4bae 100644 --- a/opentelemetry-otlp/src/exporter/tonic/metrics.rs +++ b/opentelemetry-otlp/src/exporter/tonic/metrics.rs @@ -12,6 +12,10 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann use super::BoxInterceptor; use crate::metric::MetricsClient; +use crate::retry_classification::grpc::classify_tonic_status; +use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy}; +use opentelemetry_sdk::runtime::Tokio; + pub(crate) struct TonicMetricsClient { inner: Mutex>, } @@ -53,48 +57,62 @@ impl TonicMetricsClient { impl MetricsClient for TonicMetricsClient { async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult { - let (mut client, metadata, extensions) = self - .inner - .lock() - .map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}"))) - .and_then(|mut inner| match &mut *inner { - Some(inner) => { - let (m, e, _) = inner - .interceptor - .call(Request::new(())) - .map_err(|e| { - OTelSdkError::InternalFailure(format!( - "unexpected status while exporting {e:?}" - )) - })? - .into_parts(); - Ok((inner.client.clone(), m, e)) - } - None => Err(OTelSdkError::InternalFailure( - "exporter is already shut down".into(), - )), - })?; + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; - otel_debug!(name: "TonicMetricsClient.ExportStarted"); + match retry_with_backoff( + Tokio, + policy, + classify_tonic_status, + "TonicMetricsClient.Export", + || async { + // Execute the export operation + let (mut client, metadata, extensions) = self + .inner + .lock() + .map_err(|e| tonic::Status::internal(format!("Failed to acquire lock: {e:?}"))) + .and_then(|mut inner| match &mut *inner { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .call(Request::new(())) + .map_err(|e| { + tonic::Status::internal(format!( + "unexpected status while exporting {e:?}" + )) + })? 
+ .into_parts(); + Ok((inner.client.clone(), m, e)) + } + None => Err(tonic::Status::failed_precondition( + "exporter is already shut down", + )), + })?; - let result = client - .export(Request::from_parts( - metadata, - extensions, - ExportMetricsServiceRequest::from(metrics), - )) - .await; + otel_debug!(name: "TonicMetricsClient.ExportStarted"); - match result { - Ok(_) => { - otel_debug!(name: "TonicMetricsClient.ExportSucceeded"); - Ok(()) - } - Err(e) => { - let error = format!("{e:?}"); - otel_debug!(name: "TonicMetricsClient.ExportFailed", error = &error); - Err(OTelSdkError::InternalFailure(error)) - } + client + .export(Request::from_parts( + metadata, + extensions, + ExportMetricsServiceRequest::from(metrics), + )) + .await + .map(|_| { + otel_debug!(name: "TonicMetricsClient.ExportSucceeded"); + }) + }, + ) + .await + { + Ok(_) => Ok(()), + Err(tonic_status) => Err(OTelSdkError::InternalFailure(format!( + "export error: {tonic_status:?}" + ))), } } diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index a71a843e40..6986107602 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -564,9 +564,6 @@ mod tests { #[test] #[cfg(feature = "gzip-tonic")] fn test_with_gzip_compression() { - // metadata should merge with the current one with priority instead of just replacing it - let mut metadata = MetadataMap::new(); - metadata.insert("foo", "bar".parse().unwrap()); let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip); assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip); } diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index 4378c37a04..a5239b4602 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -1,4 +1,5 @@ use core::fmt; +use std::sync::Arc; use tokio::sync::Mutex; use opentelemetry::otel_debug; @@ -15,6 +16,10 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann use super::BoxInterceptor; +use crate::retry_classification::grpc::classify_tonic_status; +use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy}; +use opentelemetry_sdk::runtime::Tokio; + pub(crate) struct TonicTracesClient { inner: Option, #[allow(dead_code)] @@ -60,42 +65,68 @@ impl TonicTracesClient { impl SpanExporter for TonicTracesClient { async fn export(&self, batch: Vec) -> OTelSdkResult { - let (mut client, metadata, extensions) = match &self.inner { - Some(inner) => { - let (m, e, _) = inner - .interceptor - .lock() - .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here - .call(Request::new(())) - .map_err(|e| OTelSdkError::InternalFailure(format!("error: {e:?}")))? 
- .into_parts(); - (inner.client.clone(), m, e) - } - None => return Err(OTelSdkError::AlreadyShutdown), + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, }; - let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource); - - otel_debug!(name: "TonicTracesClient.ExportStarted"); - - let result = client - .export(Request::from_parts( - metadata, - extensions, - ExportTraceServiceRequest { resource_spans }, - )) - .await; - - match result { - Ok(_) => { - otel_debug!(name: "TonicTracesClient.ExportSucceeded"); - Ok(()) - } - Err(e) => { - let error = e.to_string(); - otel_debug!(name: "TonicTracesClient.ExportFailed", error = &error); - Err(OTelSdkError::InternalFailure(error)) - } + let batch = Arc::new(batch); + + match retry_with_backoff( + Tokio, + policy, + classify_tonic_status, + "TonicTracesClient.Export", + || async { + let batch_clone = Arc::clone(&batch); + + // Execute the export operation + let (mut client, metadata, extensions) = match &self.inner { + Some(inner) => { + let (m, e, _) = inner + .interceptor + .lock() + .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here + .call(Request::new(())) + .map_err(|e| { + // Convert interceptor errors to tonic::Status for retry classification + tonic::Status::internal(format!("interceptor error: {e:?}")) + })? + .into_parts(); + (inner.client.clone(), m, e) + } + None => { + return Err(tonic::Status::failed_precondition( + "exporter already shutdown", + )) + } + }; + + let resource_spans = + group_spans_by_resource_and_scope((*batch_clone).clone(), &self.resource); + + otel_debug!(name: "TonicTracesClient.ExportStarted"); + + client + .export(Request::from_parts( + metadata, + extensions, + ExportTraceServiceRequest { resource_spans }, + )) + .await + .map(|_| { + otel_debug!(name: "TonicTracesClient.ExportSucceeded"); + }) + }, + ) + .await + { + Ok(_) => Ok(()), + Err(tonic_status) => Err(OTelSdkError::InternalFailure(format!( + "export error: {tonic_status:?}" + ))), } } diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index 3474b2b590..e8de66d273 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -366,6 +366,9 @@ mod metric; #[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))] mod span; +#[cfg(any(feature = "grpc-tonic", feature = "http-retry"))] +pub mod retry_classification; + pub use crate::exporter::Compression; pub use crate::exporter::ExportConfig; pub use crate::exporter::ExporterBuildError; diff --git a/opentelemetry-otlp/src/retry_classification.rs b/opentelemetry-otlp/src/retry_classification.rs new file mode 100644 index 0000000000..06a8d914ad --- /dev/null +++ b/opentelemetry-otlp/src/retry_classification.rs @@ -0,0 +1,481 @@ +//! Error classification for OTLP exporters with protocol-specific throttling support. +//! +//! This module provides error classification functions for HTTP and gRPC protocols, +//! supporting server-provided throttling hints like HTTP Retry-After headers and +//! gRPC RetryInfo metadata. + +use opentelemetry_sdk::retry::RetryErrorType; +use std::time::Duration; + +#[cfg(feature = "grpc-tonic")] +use tonic; + +#[cfg(feature = "grpc-tonic")] +use tonic_types::StatusExt; + +/// HTTP-specific error classification with Retry-After header support. +pub mod http { + use super::*; + + /// Classifies HTTP errors based on status code and headers. 
+ /// + /// # Arguments + /// * `status_code` - HTTP status code + /// * `retry_after_header` - Value of the Retry-After header, if present + /// + /// # Retry-After Header Formats + /// * Seconds: "120" + /// * HTTP Date: "Fri, 31 Dec 1999 23:59:59 GMT" + pub fn classify_http_error( + status_code: u16, + retry_after_header: Option<&str>, + ) -> RetryErrorType { + match status_code { + // 429 Too Many Requests - check for Retry-After + 429 => { + if let Some(retry_after) = retry_after_header { + if let Some(duration) = parse_retry_after(retry_after) { + return RetryErrorType::Throttled(duration); + } + } + // Fallback to retryable if no valid Retry-After + RetryErrorType::Retryable + } + // 5xx Server errors - retryable + 500..=599 => RetryErrorType::Retryable, + // 4xx Client errors (except 429) - not retryable + 400..=499 => RetryErrorType::NonRetryable, + // Other codes - retryable (network issues, etc.) + _ => RetryErrorType::Retryable, + } + } + + /// Parses the Retry-After header value. + /// + /// Supports both formats: + /// - Delay seconds: "120" + /// - HTTP date: "Fri, 31 Dec 1999 23:59:59 GMT" + /// + /// Returns None if parsing fails or delay is unreasonable. + fn parse_retry_after(retry_after: &str) -> Option { + // Try parsing as seconds first + if let Ok(seconds) = retry_after.trim().parse::() { + // Cap at 10 minutes. TODO - what's sensible here? + let capped_seconds = seconds.min(600); + return Some(Duration::from_secs(capped_seconds)); + } + + // Try parsing as HTTP date + if let Ok(delay_seconds) = parse_http_date_to_delay(retry_after) { + // Cap at 10 minutes. TODO - what's sensible here? + let capped_seconds = delay_seconds.min(600); + return Some(Duration::from_secs(capped_seconds)); + } + + None + } + + /// Parses HTTP date format and returns delay in seconds from now. + /// + /// This is a simplified parser for the most common HTTP date format. + /// TODO - should we use a library here? + fn parse_http_date_to_delay(date_str: &str) -> Result { + // For now, return error - would need proper HTTP date parsing + // This could be implemented with chrono or similar + let _ = date_str; + Err(()) + } +} + +/// gRPC-specific error classification with RetryInfo support. +#[cfg(feature = "grpc-tonic")] +pub mod grpc { + use super::*; + + /// Classifies a tonic::Status error + #[cfg(feature = "grpc-tonic")] + pub fn classify_tonic_status(status: &tonic::Status) -> RetryErrorType { + // Use tonic-types to extract RetryInfo - this is the proper way! + let retry_info_seconds = status + .get_details_retry_info() + .and_then(|retry_info| retry_info.retry_delay) + .map(|duration| duration.as_secs()); + + classify_grpc_error(status.code(), retry_info_seconds) + } + + /// Classifies gRPC errors based on status code and metadata. 
+ /// + /// Implements the OpenTelemetry OTLP specification for error handling: + /// https://opentelemetry.io/docs/specs/otlp/ + /// https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures + /// + /// # Arguments + /// * `grpc_code` - gRPC status code as tonic::Code enum + /// * `retry_info_seconds` - Parsed retry delay from RetryInfo metadata, if present + fn classify_grpc_error( + grpc_code: tonic::Code, + retry_info_seconds: Option, + ) -> RetryErrorType { + match grpc_code { + // RESOURCE_EXHAUSTED: Special case per OTLP spec + // Retryable only if server provides RetryInfo indicating recovery is possible + tonic::Code::ResourceExhausted => { + if let Some(seconds) = retry_info_seconds { + // Server signals recovery is possible - use throttled retry + let capped_seconds = seconds.min(600); // Cap at 10 minutes. TODO - what's sensible here? + return RetryErrorType::Throttled(std::time::Duration::from_secs( + capped_seconds, + )); + } + // No RetryInfo - treat as non-retryable per OTLP spec + RetryErrorType::NonRetryable + } + + // Retryable errors per OTLP specification + tonic::Code::Cancelled + | tonic::Code::DeadlineExceeded + | tonic::Code::Aborted + | tonic::Code::OutOfRange + | tonic::Code::Unavailable + | tonic::Code::DataLoss => RetryErrorType::Retryable, + + // Non-retryable errors per OTLP specification + tonic::Code::Unknown + | tonic::Code::InvalidArgument + | tonic::Code::NotFound + | tonic::Code::AlreadyExists + | tonic::Code::PermissionDenied + | tonic::Code::FailedPrecondition + | tonic::Code::Unimplemented + | tonic::Code::Internal + | tonic::Code::Unauthenticated => RetryErrorType::NonRetryable, + + // OK should never reach here in error scenarios, but handle gracefully + tonic::Code::Ok => RetryErrorType::NonRetryable, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // Tests for HTTP error classification + mod http_tests { + use super::*; + use crate::retry_classification::http::*; + + #[test] + fn test_http_429_with_retry_after_seconds() { + let result = classify_http_error(429, Some("30")); + assert_eq!(result, RetryErrorType::Throttled(Duration::from_secs(30))); + } + + #[test] + fn test_http_429_with_large_retry_after_capped() { + let result = classify_http_error(429, Some("900")); // 15 minutes + assert_eq!( + result, + RetryErrorType::Throttled(std::time::Duration::from_secs(600)) + ); // Capped at 10 minutes + } + + #[test] + fn test_http_429_with_invalid_retry_after() { + let result = classify_http_error(429, Some("invalid")); + assert_eq!(result, RetryErrorType::Retryable); // Fallback + } + + #[test] + fn test_http_429_without_retry_after() { + let result = classify_http_error(429, None); + assert_eq!(result, RetryErrorType::Retryable); // Fallback + } + + #[test] + fn test_http_5xx_errors() { + assert_eq!(classify_http_error(500, None), RetryErrorType::Retryable); + assert_eq!(classify_http_error(502, None), RetryErrorType::Retryable); + assert_eq!(classify_http_error(503, None), RetryErrorType::Retryable); + assert_eq!(classify_http_error(599, None), RetryErrorType::Retryable); + } + + #[test] + fn test_http_4xx_errors() { + assert_eq!(classify_http_error(400, None), RetryErrorType::NonRetryable); + assert_eq!(classify_http_error(401, None), RetryErrorType::NonRetryable); + assert_eq!(classify_http_error(403, None), RetryErrorType::NonRetryable); + assert_eq!(classify_http_error(404, None), RetryErrorType::NonRetryable); + assert_eq!(classify_http_error(499, None), RetryErrorType::NonRetryable); + } + + 
#[test] + fn test_http_other_errors() { + assert_eq!(classify_http_error(100, None), RetryErrorType::Retryable); + assert_eq!(classify_http_error(200, None), RetryErrorType::Retryable); + assert_eq!(classify_http_error(300, None), RetryErrorType::Retryable); + } + } + + // Tests for gRPC error classification using public interface + #[cfg(feature = "grpc-tonic")] + mod grpc_tests { + use crate::retry_classification::grpc::classify_tonic_status; + use opentelemetry_sdk::retry::RetryErrorType; + use tonic_types::{ErrorDetails, StatusExt}; + + #[test] + fn test_grpc_resource_exhausted_with_retry_info() { + let error_details = + ErrorDetails::with_retry_info(Some(std::time::Duration::from_secs(45))); + let status = tonic::Status::with_error_details( + tonic::Code::ResourceExhausted, + "rate limited", + error_details, + ); + let result = classify_tonic_status(&status); + assert_eq!( + result, + RetryErrorType::Throttled(std::time::Duration::from_secs(45)) + ); + } + + #[test] + fn test_grpc_resource_exhausted_with_large_retry_info_capped() { + let error_details = + ErrorDetails::with_retry_info(Some(std::time::Duration::from_secs(900))); // 15 minutes + let status = tonic::Status::with_error_details( + tonic::Code::ResourceExhausted, + "rate limited", + error_details, + ); + let result = classify_tonic_status(&status); + assert_eq!( + result, + RetryErrorType::Throttled(std::time::Duration::from_secs(600)) + ); // Capped at 10 minutes + } + + #[test] + fn test_grpc_resource_exhausted_without_retry_info() { + let status = tonic::Status::new(tonic::Code::ResourceExhausted, "rate limited"); + let result = classify_tonic_status(&status); + // Per OTLP spec: RESOURCE_EXHAUSTED without RetryInfo is non-retryable + assert_eq!(result, RetryErrorType::NonRetryable); + } + + #[test] + fn test_grpc_retryable_errors() { + // Test all retryable errors per OTLP specification + let cancelled = tonic::Status::new(tonic::Code::Cancelled, "cancelled"); + assert_eq!(classify_tonic_status(&cancelled), RetryErrorType::Retryable); + + let deadline_exceeded = + tonic::Status::new(tonic::Code::DeadlineExceeded, "deadline exceeded"); + assert_eq!( + classify_tonic_status(&deadline_exceeded), + RetryErrorType::Retryable + ); + + let aborted = tonic::Status::new(tonic::Code::Aborted, "aborted"); + assert_eq!(classify_tonic_status(&aborted), RetryErrorType::Retryable); + + let out_of_range = tonic::Status::new(tonic::Code::OutOfRange, "out of range"); + assert_eq!( + classify_tonic_status(&out_of_range), + RetryErrorType::Retryable + ); + + let unavailable = tonic::Status::new(tonic::Code::Unavailable, "unavailable"); + assert_eq!( + classify_tonic_status(&unavailable), + RetryErrorType::Retryable + ); + + let data_loss = tonic::Status::new(tonic::Code::DataLoss, "data loss"); + assert_eq!(classify_tonic_status(&data_loss), RetryErrorType::Retryable); + } + + #[test] + fn test_grpc_non_retryable_errors() { + // Test all non-retryable errors per OTLP specification + let unknown = tonic::Status::new(tonic::Code::Unknown, "unknown"); + assert_eq!( + classify_tonic_status(&unknown), + RetryErrorType::NonRetryable + ); + + let invalid_argument = + tonic::Status::new(tonic::Code::InvalidArgument, "invalid argument"); + assert_eq!( + classify_tonic_status(&invalid_argument), + RetryErrorType::NonRetryable + ); + + let not_found = tonic::Status::new(tonic::Code::NotFound, "not found"); + assert_eq!( + classify_tonic_status(¬_found), + RetryErrorType::NonRetryable + ); + + let already_exists = 
tonic::Status::new(tonic::Code::AlreadyExists, "already exists"); + assert_eq!( + classify_tonic_status(&already_exists), + RetryErrorType::NonRetryable + ); + + let permission_denied = + tonic::Status::new(tonic::Code::PermissionDenied, "permission denied"); + assert_eq!( + classify_tonic_status(&permission_denied), + RetryErrorType::NonRetryable + ); + + let failed_precondition = + tonic::Status::new(tonic::Code::FailedPrecondition, "failed precondition"); + assert_eq!( + classify_tonic_status(&failed_precondition), + RetryErrorType::NonRetryable + ); + + let unimplemented = tonic::Status::new(tonic::Code::Unimplemented, "unimplemented"); + assert_eq!( + classify_tonic_status(&unimplemented), + RetryErrorType::NonRetryable + ); + + let internal = tonic::Status::new(tonic::Code::Internal, "internal error"); + assert_eq!( + classify_tonic_status(&internal), + RetryErrorType::NonRetryable + ); + + let unauthenticated = + tonic::Status::new(tonic::Code::Unauthenticated, "unauthenticated"); + assert_eq!( + classify_tonic_status(&unauthenticated), + RetryErrorType::NonRetryable + ); + } + + #[test] + fn test_grpc_ok_code_handled() { + // OK status should be handled gracefully (though unlikely in error scenarios) + let ok = tonic::Status::new(tonic::Code::Ok, "success"); + assert_eq!(classify_tonic_status(&ok), RetryErrorType::NonRetryable); + } + + // Tests for tonic-types RetryInfo integration + #[cfg(feature = "grpc-tonic")] + mod retry_info_tests { + use super::*; + use crate::retry_classification::grpc::classify_tonic_status; + use tonic_types::{ErrorDetails, StatusExt}; + + #[test] + fn test_classify_status_with_retry_info() { + // Create a tonic::Status with RetryInfo using proper StatusExt API + let error_details = + ErrorDetails::with_retry_info(Some(std::time::Duration::from_secs(30))); + let status = tonic::Status::with_error_details( + tonic::Code::ResourceExhausted, + "rate limited", + error_details, + ); + + // Test classification + let result = classify_tonic_status(&status); + assert_eq!( + result, + RetryErrorType::Throttled(std::time::Duration::from_secs(30)) + ); + } + + #[test] + fn test_classify_status_with_fractional_retry_info() { + // Create a tonic::Status with fractional seconds RetryInfo + let error_details = + ErrorDetails::with_retry_info(Some(std::time::Duration::from_millis(5500))); // 5.5 seconds + let status = tonic::Status::with_error_details( + tonic::Code::ResourceExhausted, + "rate limited", + error_details, + ); + + // Should use exact duration (5.5s = 5s) + let result = classify_tonic_status(&status); + assert_eq!( + result, + RetryErrorType::Throttled(std::time::Duration::from_secs(5)) + ); + } + + #[test] + fn test_classify_status_without_retry_info() { + // Status with resource_exhausted but no RetryInfo + let status = tonic::Status::new(tonic::Code::ResourceExhausted, "rate limited"); + + // Per OTLP spec: should be non-retryable without RetryInfo + let result = classify_tonic_status(&status); + assert_eq!(result, RetryErrorType::NonRetryable); + } + + #[test] + fn test_classify_status_non_retryable_error() { + // Status with non-retryable error code + let status = tonic::Status::new(tonic::Code::InvalidArgument, "bad request"); + + let result = classify_tonic_status(&status); + assert_eq!(result, RetryErrorType::NonRetryable); + } + + #[test] + fn test_classify_status_retryable_error() { + // Status with retryable error code + let status = tonic::Status::new(tonic::Code::Unavailable, "service unavailable"); + + let result = 
classify_tonic_status(&status); + assert_eq!(result, RetryErrorType::Retryable); + } + + #[test] + fn test_classify_status_large_retry_delay() { + // Test with large retry delay - should be capped at 10 minutes + let error_details = + ErrorDetails::with_retry_info(Some(std::time::Duration::from_secs(3600))); // 1 hour + let status = tonic::Status::with_error_details( + tonic::Code::ResourceExhausted, + "rate limited", + error_details, + ); + + let result = classify_tonic_status(&status); + // Should be capped at 10 minutes (600 seconds) + assert_eq!( + result, + RetryErrorType::Throttled(std::time::Duration::from_secs(600)) + ); + } + + #[test] + fn test_status_ext_get_details() { + // Test that StatusExt works correctly + let error_details = + ErrorDetails::with_retry_info(Some(std::time::Duration::from_secs(45))); + let status = tonic::Status::with_error_details( + tonic::Code::ResourceExhausted, + "rate limited", + error_details, + ); + + // Direct extraction should work + let extracted = status.get_details_retry_info(); + assert!(extracted.is_some()); + + let retry_delay = extracted.unwrap().retry_delay; + assert_eq!(retry_delay, Some(std::time::Duration::from_secs(45))); + } + } + } +} diff --git a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs index 50570d888a..9037dec865 100644 --- a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs +++ b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs @@ -13,7 +13,7 @@ //! Only a single test suite can run at once, as each container has statically mapped ports, but //! this works nicely with the way cargo executes the suite. //! -//! To skip integration tests with cargo, you can run `cargo test --mod`, which will run unit tests +//! To skip integration tests with cargo, you can run `cargo test --lib`, which will run unit tests //! only. //! 
#![cfg(unix)] diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index 65d0598216..7d68decf82 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -165,8 +165,8 @@ pub mod tonic { } } - pub fn group_logs_by_resource_and_scope( - logs: LogBatch<'_>, + pub fn group_logs_by_resource_and_scope<'a>( + logs: &'a LogBatch<'a>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -271,7 +271,7 @@ mod tests { let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; @@ -291,7 +291,7 @@ mod tests { let log_batch = LogBatch::new(&logs); let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index 8f8e5a8653..1143e333f8 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -167,3 +167,6 @@ impl From> for InMemoryExporterError { InMemoryExporterError::InternalFailure(format!("Mutex poison error: {err}")) } } + +/// Retry logic for exporting telemetry data. +pub mod retry; diff --git a/opentelemetry-sdk/src/retry.rs b/opentelemetry-sdk/src/retry.rs new file mode 100644 index 0000000000..48f4cf49a7 --- /dev/null +++ b/opentelemetry-sdk/src/retry.rs @@ -0,0 +1,461 @@ +//! This module provides functionality for retrying operations with exponential backoff and jitter. +//! +//! The `RetryPolicy` struct defines the configuration for the retry behavior, including the maximum +//! number of retries, initial delay, maximum delay, and jitter. +//! +//! The `retry_with_backoff` function retries the given operation according to the +//! specified retry policy, using exponential backoff and jitter to determine the delay between +//! retries. The function uses error classification to determine retry behavior and can honor +//! server-provided throttling hints. + +#[cfg(feature = "experimental_async_runtime")] +use opentelemetry::otel_warn; +#[cfg(feature = "experimental_async_runtime")] +use std::future::Future; +use std::time::Duration; +#[cfg(feature = "experimental_async_runtime")] +use std::time::SystemTime; + +#[cfg(feature = "experimental_async_runtime")] +use crate::runtime::Runtime; + +/// Classification of errors for retry purposes. +#[derive(Debug, Clone, PartialEq)] +pub enum RetryErrorType { + /// Error is not retryable (e.g., authentication failure, bad request). + NonRetryable, + /// Error is retryable with exponential backoff (e.g., server error, network timeout). + Retryable, + /// Error indicates throttling - wait for the specified duration before retrying. + /// This overrides exponential backoff timing. + Throttled(Duration), +} + +/// Configuration for retry policy. +#[derive(Debug)] +pub struct RetryPolicy { + /// Maximum number of retry attempts. 
+    pub max_retries: usize,
+    /// Initial delay in milliseconds before the first retry.
+    pub initial_delay_ms: u64,
+    /// Maximum delay in milliseconds between retries.
+    pub max_delay_ms: u64,
+    /// Maximum jitter in milliseconds to add to the delay.
+    pub jitter_ms: u64,
+}
+
+/// A runtime stub for when experimental_async_runtime is not enabled.
+/// This allows retry policy to be configured but no actual retries occur.
+#[cfg(not(feature = "experimental_async_runtime"))]
+#[derive(Debug, Clone, Default)]
+pub struct NoOpRuntime;
+
+#[cfg(not(feature = "experimental_async_runtime"))]
+impl NoOpRuntime {
+    /// Creates a new no-op runtime.
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+// Generates a random jitter value up to max_jitter
+#[cfg(feature = "experimental_async_runtime")]
+fn generate_jitter(max_jitter: u64) -> u64 {
+    let now = SystemTime::now();
+    let nanos = now
+        .duration_since(SystemTime::UNIX_EPOCH)
+        .unwrap()
+        .subsec_nanos();
+    nanos as u64 % (max_jitter + 1)
+}
+
+/// Retries the given operation with exponential backoff, jitter, and error classification.
+///
+/// This function provides sophisticated retry behavior by classifying errors
+/// and honoring server-provided throttling hints (e.g., HTTP Retry-After, gRPC RetryInfo).
+///
+/// # Arguments
+///
+/// * `runtime` - The async runtime to use for delays.
+/// * `policy` - The retry policy configuration.
+/// * `error_classifier` - Function to classify errors for retry decisions.
+/// * `operation_name` - The name of the operation being retried.
+/// * `operation` - The operation to be retried.
+///
+/// # Returns
+///
+/// A `Result` containing the operation's result or an error if max retries are reached
+/// or a non-retryable error occurs.
+#[cfg(feature = "experimental_async_runtime")]
+pub async fn retry_with_backoff<R, F, Fut, T, E, C>(
+    runtime: R,
+    policy: RetryPolicy,
+    error_classifier: C,
+    operation_name: &str,
+    mut operation: F,
+) -> Result<T, E>
+where
+    R: Runtime,
+    F: FnMut() -> Fut,
+    E: std::fmt::Debug,
+    Fut: Future<Output = Result<T, E>>,
+    C: Fn(&E) -> RetryErrorType,
+{
+    let mut attempt = 0;
+    let mut delay = policy.initial_delay_ms;
+
+    loop {
+        match operation().await {
+            Ok(result) => return Ok(result), // Return the result if the operation succeeds
+            Err(err) => {
+                // Classify the error
+                let error_type = error_classifier(&err);
+
+                match error_type {
+                    RetryErrorType::NonRetryable => {
+                        otel_warn!(name: "OtlpRetry", message = format!("Operation {:?} failed with non-retryable error: {:?}", operation_name, err));
+                        return Err(err);
+                    }
+                    RetryErrorType::Retryable if attempt < policy.max_retries => {
+                        attempt += 1;
+                        // Use exponential backoff with jitter
+                        otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to retryable error: {:?}", operation_name, err));
+                        let jitter = generate_jitter(policy.jitter_ms);
+                        let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms);
+                        runtime
+                            .delay(Duration::from_millis(delay_with_jitter))
+                            .await;
+                        delay = std::cmp::min(delay * 2, policy.max_delay_ms); // Exponential backoff
+                    }
+                    RetryErrorType::Throttled(server_delay) if attempt < policy.max_retries => {
+                        attempt += 1;
+                        // Use server-specified delay (overrides exponential backoff)
+                        otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} after server-specified throttling delay: {:?}", operation_name, server_delay));
+                        runtime.delay(server_delay).await;
+                        // Don't update exponential backoff delay for next attempt since server provided specific timing
+                    }
+                    _ => {
+                        // Max retries
reached + otel_warn!(name: "OtlpRetry", message = format!("Operation {:?} failed after {} attempts: {:?}", operation_name, attempt, err)); + return Err(err); + } + } + } + } + } +} + +/// No-op retry function for when experimental_async_runtime is not enabled. +/// This function will execute the operation exactly once without any retries. +#[cfg(not(feature = "experimental_async_runtime"))] +pub async fn retry_with_backoff( + _runtime: R, + _policy: RetryPolicy, + _error_classifier: C, + _operation_name: &str, + mut operation: F, +) -> Result +where + F: FnMut() -> Fut, + Fut: std::future::Future>, +{ + // Without experimental_async_runtime, we just execute once without retries + operation().await +} + +#[cfg(all(test, feature = "experimental_async_runtime", feature = "rt-tokio"))] +mod tests { + use super::*; + use crate::runtime::Tokio; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + use tokio::time::timeout; + + // Test to ensure generate_jitter returns a value within the expected range + #[tokio::test] + async fn test_generate_jitter() { + let max_jitter = 100; + let jitter = generate_jitter(max_jitter); + assert!(jitter <= max_jitter); + } + + // Test to ensure retry_with_exponential_backoff succeeds on the first attempt + #[tokio::test] + async fn test_retry_with_exponential_backoff_success() { + let runtime = Tokio; + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let result = retry_with_backoff( + runtime, + policy, + |_: &()| RetryErrorType::Retryable, + "test_operation", + || Box::pin(async { Ok::<_, ()>("success") }), + ) + .await; + + assert_eq!(result, Ok("success")); + } + + // Test to ensure retry_with_exponential_backoff retries the operation and eventually succeeds + #[tokio::test] + async fn test_retry_with_exponential_backoff_retries() { + let runtime = Tokio; + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let attempts = AtomicUsize::new(0); + + let result = retry_with_backoff( + runtime, + policy, + |_: &&str| RetryErrorType::Retryable, + "test_operation", + || { + let attempt = attempts.fetch_add(1, Ordering::SeqCst); + Box::pin(async move { + if attempt < 2 { + Err::<&str, &str>("error") // Fail the first two attempts + } else { + Ok::<&str, &str>("success") // Succeed on the third attempt + } + }) + }, + ) + .await; + + assert_eq!(result, Ok("success")); + assert_eq!(attempts.load(Ordering::SeqCst), 3); // Ensure there were 3 attempts + } + + // Test to ensure retry_with_exponential_backoff fails after max retries + #[tokio::test] + async fn test_retry_with_exponential_backoff_failure() { + let runtime = Tokio; + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let attempts = AtomicUsize::new(0); + + let result = retry_with_backoff( + runtime, + policy, + |_: &&str| RetryErrorType::Retryable, + "test_operation", + || { + attempts.fetch_add(1, Ordering::SeqCst); + Box::pin(async { Err::<(), _>("error") }) // Always fail + }, + ) + .await; + + assert_eq!(result, Err("error")); + assert_eq!(attempts.load(Ordering::SeqCst), 4); // Ensure there were 4 attempts (initial + 3 retries) + } + + // Test to ensure retry_with_exponential_backoff respects the timeout + #[tokio::test] + async fn test_retry_with_exponential_backoff_timeout() { + let runtime = Tokio; + let policy = RetryPolicy { + max_retries: 12, // Increase the number of 
retries + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let result = timeout( + Duration::from_secs(1), + retry_with_backoff( + runtime, + policy, + |_: &&str| RetryErrorType::Retryable, + "test_operation", + || { + Box::pin(async { Err::<(), _>("error") }) // Always fail + }, + ), + ) + .await; + + assert!(result.is_err()); // Ensure the operation times out + } + + // Tests for error classification (Phase 1) + #[test] + fn test_retry_error_type_equality() { + assert_eq!(RetryErrorType::NonRetryable, RetryErrorType::NonRetryable); + assert_eq!(RetryErrorType::Retryable, RetryErrorType::Retryable); + assert_eq!( + RetryErrorType::Throttled(Duration::from_secs(30)), + RetryErrorType::Throttled(Duration::from_secs(30)) + ); + assert_ne!(RetryErrorType::Retryable, RetryErrorType::NonRetryable); + } + + // Tests for enhanced retry function (Phase 3) + #[tokio::test] + async fn test_retry_with_throttling_non_retryable_error() { + let runtime = Tokio; + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let attempts = AtomicUsize::new(0); + + // Classifier that returns non-retryable + let classifier = |_: &()| RetryErrorType::NonRetryable; + + let result = retry_with_backoff(runtime, policy, classifier, "test_operation", || { + attempts.fetch_add(1, Ordering::SeqCst); + Box::pin(async { Err::<(), _>(()) }) // Always fail + }) + .await; + + assert!(result.is_err()); + assert_eq!(attempts.load(Ordering::SeqCst), 1); // Should only try once + } + + #[tokio::test] + async fn test_retry_with_throttling_retryable_error() { + let runtime = Tokio; + let policy = RetryPolicy { + max_retries: 2, + initial_delay_ms: 10, // Short delay for test + max_delay_ms: 100, + jitter_ms: 5, + }; + + let attempts = AtomicUsize::new(0); + + // Classifier that returns retryable + let classifier = |_: &()| RetryErrorType::Retryable; + + let result = retry_with_backoff(runtime, policy, classifier, "test_operation", || { + let attempt = attempts.fetch_add(1, Ordering::SeqCst); + Box::pin(async move { + if attempt < 1 { + Err::<&str, ()>(()) // Fail first attempt + } else { + Ok("success") // Succeed on retry + } + }) + }) + .await; + + assert_eq!(result, Ok("success")); + assert_eq!(attempts.load(Ordering::SeqCst), 2); // Should try twice + } + + #[tokio::test] + async fn test_retry_with_throttling_throttled_error() { + let runtime = Tokio; + let policy = RetryPolicy { + max_retries: 2, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let attempts = AtomicUsize::new(0); + + // Classifier that returns throttled with short delay + let classifier = |_: &()| RetryErrorType::Throttled(Duration::from_millis(10)); + + let start_time = std::time::Instant::now(); + + let result = retry_with_backoff(runtime, policy, classifier, "test_operation", || { + let attempt = attempts.fetch_add(1, Ordering::SeqCst); + Box::pin(async move { + if attempt < 1 { + Err::<&str, ()>(()) // Fail first attempt (will be throttled) + } else { + Ok("success") // Succeed on retry + } + }) + }) + .await; + + let elapsed = start_time.elapsed(); + + assert_eq!(result, Ok("success")); + assert_eq!(attempts.load(Ordering::SeqCst), 2); // Should try twice + assert!(elapsed >= Duration::from_millis(10)); // Should have waited for throttle delay + } + + #[tokio::test] + async fn test_retry_with_throttling_max_attempts_exceeded() { + let runtime = Tokio; + let policy = RetryPolicy { + max_retries: 1, // Only 1 retry + initial_delay_ms: 10, + 
max_delay_ms: 100, + jitter_ms: 5, + }; + + let attempts = AtomicUsize::new(0); + + // Classifier that returns retryable + let classifier = |_: &()| RetryErrorType::Retryable; + + let result = retry_with_backoff(runtime, policy, classifier, "test_operation", || { + attempts.fetch_add(1, Ordering::SeqCst); + Box::pin(async { Err::<(), _>(()) }) // Always fail + }) + .await; + + assert!(result.is_err()); + assert_eq!(attempts.load(Ordering::SeqCst), 2); // Initial attempt + 1 retry + } + + #[tokio::test] + async fn test_retry_with_throttling_mixed_error_types() { + let runtime = Tokio; + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 10, + max_delay_ms: 100, + jitter_ms: 5, + }; + + let attempts = AtomicUsize::new(0); + + // Classifier that returns different types based on attempt number + let classifier = |err: &usize| match *err { + 0 => RetryErrorType::Retryable, + 1 => RetryErrorType::Throttled(Duration::from_millis(10)), + _ => RetryErrorType::Retryable, + }; + + let result = retry_with_backoff(runtime, policy, classifier, "test_operation", || { + let attempt = attempts.fetch_add(1, Ordering::SeqCst); + Box::pin(async move { + if attempt < 2 { + Err(attempt) // Return attempt number as error + } else { + Ok("success") // Succeed on third attempt + } + }) + }) + .await; + + assert_eq!(result, Ok("success")); + assert_eq!(attempts.load(Ordering::SeqCst), 3); // Should try three times + } +}
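
A note on the `parse_http_date_to_delay` TODO in `retry_classification.rs`: the HTTP-date form of `Retry-After` is currently left unimplemented and always returns `Err(())`. Below is a minimal sketch of how it could be filled in, assuming the small `httpdate` crate were added as a dependency (it is not part of this patch); dates in the past collapse to an immediate retry, and the existing 10-minute cap in `parse_retry_after` would still apply.

```rust
// Sketch only: assumes `httpdate` is added to opentelemetry-otlp's dependencies.
use std::time::{Duration, SystemTime};

/// Parses an HTTP-date `Retry-After` value (e.g. "Fri, 31 Dec 1999 23:59:59 GMT")
/// and returns the delay from now in whole seconds.
fn parse_http_date_to_delay(date_str: &str) -> Result<u64, ()> {
    let when = httpdate::parse_http_date(date_str.trim()).map_err(|_| ())?;
    // A date in the past means the client may retry immediately.
    let delay = when
        .duration_since(SystemTime::now())
        .unwrap_or(Duration::ZERO);
    Ok(delay.as_secs())
}
```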