Skip to content

feat: support backoff/retry in OTLP #3126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ temp-env = "0.3.6"
thiserror = { version = "2", default-features = false }
tonic = { version = "0.13", default-features = false }
tonic-build = "0.13"
tonic-types = "0.13"
tokio = { version = "1", default-features = false }
tokio-stream = "0.1"
# Using `tracing 0.1.40` because 0.1.39 (which is yanked) introduces the ability to set event names in macros,
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## vNext

- Add HTTP compression support with `gzip-http` and `zstd-http` feature flags
- Add retry with exponential backoff and throttling support for HTTP and gRPC exporters

## 0.30.0

Expand Down
8 changes: 7 additions & 1 deletion opentelemetry-otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ tracing = {workspace = true, optional = true}

prost = { workspace = true, optional = true }
tonic = { workspace = true, optional = true }
tonic-types = { workspace = true, optional = true }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed for gRPC error type

tokio = { workspace = true, features = ["sync", "rt"], optional = true }

reqwest = { workspace = true, optional = true }
Expand Down Expand Up @@ -69,7 +70,7 @@ serialize = ["serde", "serde_json"]
default = ["http-proto", "reqwest-blocking-client", "trace", "metrics", "logs", "internal-logs"]

# grpc using tonic
grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic"]
grpc-tonic = ["tonic", "tonic-types", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic", "opentelemetry_sdk/rt-tokio", "opentelemetry_sdk/experimental_async_runtime"]
gzip-tonic = ["tonic/gzip"]
zstd-tonic = ["tonic/zstd"]

Expand All @@ -82,6 +83,11 @@ tls-webpki-roots = ["tls", "tonic/tls-webpki-roots"]

# http binary
http-proto = ["prost", "opentelemetry-http", "opentelemetry-proto/gen-tonic-messages", "http", "trace", "metrics"]

# http with retry support.
# TODO: decide whether this should stay a separate feature. Retry needs the experimental async runtime, which the gRPC exporters already enable.
http-retry = ["opentelemetry_sdk/experimental_async_runtime", "opentelemetry_sdk/rt-tokio", "tokio"]

http-json = ["serde_json", "prost", "opentelemetry-http", "opentelemetry-proto/gen-tonic-messages", "opentelemetry-proto/with-serde", "http", "trace", "metrics"]
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest-blocking"]
reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]
Expand Down
3 changes: 3 additions & 0 deletions opentelemetry-otlp/allowed-external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ allowed_external_types = [
"tonic::transport::tls::Identity",
"tonic::transport::channel::Channel",
"tonic::service::interceptor::Interceptor",

# For retries
"tonic::status::Status"
]
115 changes: 115 additions & 0 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,124 @@ use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
#[cfg(feature = "http-retry")]
use std::sync::Arc;
use std::time;

#[cfg(feature = "http-retry")]
use super::{classify_http_export_error, HttpExportError, HttpRetryData};
#[cfg(feature = "http-retry")]
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
#[cfg(feature = "http-retry")]
use opentelemetry_sdk::runtime::Tokio;

impl LogExporter for OtlpHttpClient {
/// Exports a batch of logs over HTTP, retrying failed attempts through
/// `retry_with_backoff`.
///
/// The request body is built once before the first attempt; each attempt
/// clones it into a fresh request together with the configured headers and
/// the collector endpoint. A serialization failure, or the error returned by
/// `retry_with_backoff`, is surfaced as `OTelSdkError::InternalFailure`
/// carrying the corresponding message.
#[cfg(feature = "http-retry")]
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
// NOTE(review): field names suggest up to 3 retries, delays starting at
// 100 ms and capped at 1600 ms, with up to 100 ms of jitter — confirm
// against the `RetryPolicy` definition.
let policy = RetryPolicy {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we create a default retry policy and share it between all exporters?

max_retries: 3,
initial_delay_ms: 100,
max_delay_ms: 1600,
jitter_ms: 100,
};

// Build request body once before retry loop since LogBatch contains borrowed data
let (body, content_type, content_encoding) = self
.build_logs_export_body(batch)
.map_err(OTelSdkError::InternalFailure)?;

// Bundle everything an attempt needs so the retry closure can borrow it.
let retry_data = Arc::new(HttpRetryData {
body,
headers: self.headers.clone(),
endpoint: self.collector_endpoint.to_string(),
});

retry_with_backoff(
Tokio,
policy,
classify_http_export_error,
"HttpLogsClient.Export",
|| async {
// Get client
// The lock guard is a temporary of this statement; the client is cloned
// out, so the mutex is not held while the request is awaited below.
// NOTE(review): lock failure and shutdown are reported with status 500.
// Whether the classifier treats 500 as retryable is not visible here; if
// it does, an already-shut-down exporter would be retried — confirm.
let client = self
.client
.lock()
.map_err(|e| HttpExportError {
status_code: 500,
retry_after: None,
message: format!("Mutex lock failed: {e}"),
})?
.as_ref()
.ok_or_else(|| HttpExportError {
status_code: 500,
retry_after: None,
message: "Exporter already shutdown".to_string(),
})?
.clone();

// Build HTTP request
let mut request_builder = http::Request::builder()
.method(Method::POST)
.uri(&retry_data.endpoint)
.header(CONTENT_TYPE, content_type);

// Only set a content-encoding header when the body builder reported one.
if let Some(encoding) = content_encoding {
request_builder = request_builder.header(CONTENT_ENCODING, encoding);
}

// The body is cloned so the shared bytes remain available for later attempts.
let mut request = request_builder
.body(retry_data.body.clone().into())
.map_err(|e| HttpExportError {
status_code: 400,
retry_after: None,
message: format!("Failed to build HTTP request: {e}"),
})?;

// Copy the exporter's configured headers onto the request.
for (k, v) in &retry_data.headers {
request.headers_mut().insert(k.clone(), v.clone());
}

// Captured now because `request` is moved into `send_bytes` below.
let request_uri = request.uri().to_string();
otel_debug!(name: "HttpLogsClient.ExportStarted");

// Send request
let response = client.send_bytes(request).await.map_err(|e| {
HttpExportError {
status_code: 0, // Network error
retry_after: None,
message: format!("Network error: {e:?}"),
}
})?;

// Keep the status and any `retry-after` header value (when it converts to a
// string) so the classifier can inspect them.
let status_code = response.status().as_u16();
let retry_after = response
.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());

// Any non-success status becomes an error for the retry classifier.
if !response.status().is_success() {
return Err(HttpExportError {
status_code,
retry_after,
message: format!(
"HTTP export failed. Url: {}, Status: {}, Response: {:?}",
request_uri,
status_code,
response.body()
),
});
}

otel_debug!(name: "HttpLogsClient.ExportSucceeded");
Ok(())
},
)
.await
// Only the message of the final error is kept; status and retry-after are dropped.
.map_err(|e| OTelSdkError::InternalFailure(e.message))
}

#[cfg(not(feature = "http-retry"))]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a massive duplication of code.

If we decide that http export always has retry behaviour, which means we include the unstable runtime feature, then we can remove all of this.

Alternatively we can provide export once, with all the extra pomp and fanfare to support retry, and then just use the stub "don't actually retry" impl. There would be some slight runtime overhead to this, but the codebase would be much simpler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment above applies to the other HTTP exporters also.

async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
let client = self
.client
Expand Down
114 changes: 114 additions & 0 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,121 @@ use opentelemetry_sdk::metrics::data::ResourceMetrics;

use super::OtlpHttpClient;

#[cfg(feature = "http-retry")]
use super::{classify_http_export_error, HttpExportError, HttpRetryData};
#[cfg(feature = "http-retry")]
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
#[cfg(feature = "http-retry")]
use opentelemetry_sdk::runtime::Tokio;

impl MetricsClient for OtlpHttpClient {
/// Exports `metrics` over HTTP, retrying failed attempts through
/// `retry_with_backoff`.
///
/// The payload is serialized once before the first attempt; every attempt
/// builds a fresh request from that payload, the configured headers and the
/// collector endpoint. A serialization failure, or the error returned by
/// `retry_with_backoff`, is surfaced as `OTelSdkError::InternalFailure`
/// carrying the corresponding message.
#[cfg(feature = "http-retry")]
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
    // Serialize up front so that every attempt reuses the same bytes.
    let Some((body, content_type, content_encoding)) = self.build_metrics_export_body(metrics)
    else {
        return Err(OTelSdkError::InternalFailure(
            "Failed to serialize metrics".to_string(),
        ));
    };

    // Everything an individual attempt needs, bundled for the retry closure.
    let shared = Arc::new(HttpRetryData {
        body,
        headers: self.headers.clone(),
        endpoint: self.collector_endpoint.to_string(),
    });

    let backoff = RetryPolicy {
        max_retries: 3,
        initial_delay_ms: 100,
        max_delay_ms: 1600,
        jitter_ms: 100,
    };

    let outcome = retry_with_backoff(
        Tokio,
        backoff,
        classify_http_export_error,
        "HttpMetricsClient.Export",
        || async {
            // Clone the client out within a single statement so the mutex
            // guard is released before anything is awaited.
            let http_client = self
                .client
                .lock()
                .map_err(|e| HttpExportError {
                    status_code: 500,
                    retry_after: None,
                    message: format!("Mutex lock failed: {e}"),
                })?
                .as_ref()
                .ok_or_else(|| HttpExportError {
                    status_code: 500,
                    retry_after: None,
                    message: "Exporter already shutdown".to_string(),
                })?
                .clone();

            // Assemble the request; the encoding header is added only when
            // the body builder reported one.
            let base = http::Request::builder()
                .method(Method::POST)
                .uri(&shared.endpoint)
                .header(CONTENT_TYPE, content_type);
            let builder = match content_encoding {
                Some(encoding) => base.header("Content-Encoding", encoding),
                None => base,
            };

            let payload = shared.body.clone();
            let mut request = builder.body(payload.into()).map_err(|e| HttpExportError {
                status_code: 400,
                retry_after: None,
                message: format!("Failed to build HTTP request: {e}"),
            })?;

            // Copy the exporter's configured headers onto the request.
            let header_map = request.headers_mut();
            for (name, value) in shared.headers.iter() {
                header_map.insert(name.clone(), value.clone());
            }

            // Captured now because `request` is moved into `send_bytes`.
            let request_uri = request.uri().to_string();
            otel_debug!(name: "HttpMetricsClient.ExportStarted");

            let response = match http_client.send_bytes(request).await {
                Ok(resp) => resp,
                Err(e) => {
                    return Err(HttpExportError {
                        status_code: 0, // Network error
                        retry_after: None,
                        message: format!("Network error: {e:?}"),
                    })
                }
            };

            if response.status().is_success() {
                otel_debug!(name: "HttpMetricsClient.ExportSucceeded");
                return Ok(());
            }

            // Failure path: keep the status and any `retry-after` value so
            // the classifier can inspect them.
            let status_code = response.status().as_u16();
            let retry_after = response
                .headers()
                .get("retry-after")
                .and_then(|raw| raw.to_str().ok())
                .map(str::to_string);

            Err(HttpExportError {
                status_code,
                retry_after,
                message: format!(
                    "HTTP export failed. Url: {}, Status: {}, Response: {:?}",
                    request_uri,
                    status_code,
                    response.body()
                ),
            })
        },
    )
    .await;

    // Only the message of the final error is kept.
    outcome.map_err(|e| OTelSdkError::InternalFailure(e.message))
}

#[cfg(not(feature = "http-retry"))]
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
let client = self
.client
Expand Down
33 changes: 32 additions & 1 deletion opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,37 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[cfg(feature = "http-retry")]
use crate::retry_classification::http::classify_http_error;
#[cfg(feature = "http-retry")]
use opentelemetry_sdk::retry::RetryErrorType;

// Shared HTTP retry functionality
/// Error produced by a single HTTP export attempt, carrying the values the
/// retry classifier inspects.
#[cfg(feature = "http-retry")]
#[derive(Debug)]
pub(crate) struct HttpExportError {
    /// Status code handed to the retry classifier.
    pub status_code: u16,
    /// Value of the response's `retry-after` header, when one was captured.
    pub retry_after: Option<String>,
    /// Human-readable description of the failure.
    pub message: String,
}

/// Turns an [`HttpExportError`] into a retry decision by forwarding its
/// status code and optional `retry-after` value to `classify_http_error`.
#[cfg(feature = "http-retry")]
pub(crate) fn classify_http_export_error(error: &HttpExportError) -> RetryErrorType {
    let HttpExportError {
        status_code,
        retry_after,
        ..
    } = error;
    classify_http_error(*status_code, retry_after.as_deref())
}

/// Request data shared by every attempt of one export call.
///
/// Bundling the pieces in one struct lets an exporter wrap them in a single
/// `Arc` and hand that to the retry handler.
#[cfg(feature = "http-retry")]
#[derive(Debug)]
pub(crate) struct HttpRetryData {
    /// Serialized request payload; cloned into the request of each attempt.
    pub body: Vec<u8>,
    /// Headers inserted into every request.
    pub headers: HashMap<HeaderName, HeaderValue>,
    /// Request URI, stored as a string.
    pub endpoint: String,
}

#[cfg(feature = "metrics")]
mod metrics;

Expand Down Expand Up @@ -388,7 +419,7 @@ impl OtlpHttpClient {
logs: LogBatch<'_>,
) -> Result<(Vec<u8>, &'static str, Option<&'static str>), String> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
let resource_logs = group_logs_by_resource_and_scope(&logs, &self.resource);
let req = ExportLogsServiceRequest { resource_logs };

let (body, content_type) = match self.protocol {
Expand Down
Loading
Loading