Skip to content

Commit fa1e093

Browse files
committed
Implement retry for http trace
1 parent 75f0d71 commit fa1e093

File tree

4 files changed

+142
-5
lines changed

4 files changed

+142
-5
lines changed

opentelemetry-otlp/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ tls-webpki-roots = ["tls", "tonic/tls-webpki-roots"]
8383

8484
# http binary
8585
http-proto = ["prost", "opentelemetry-http", "opentelemetry-proto/gen-tonic-messages", "http", "trace", "metrics"]
86+
87+
# http with retry support.
88+
# TODO: decide how this feature should be exposed. Retry needs an async runtime (the SDK's experimental_async_runtime); the gRPC exporters already require one.
89+
http-retry = ["opentelemetry_sdk/experimental_async_runtime", "opentelemetry_sdk/rt-tokio", "tokio"]
90+
8691
http-json = ["serde_json", "prost", "opentelemetry-http", "opentelemetry-proto/gen-tonic-messages", "opentelemetry-proto/with-serde", "http", "trace", "metrics"]
8792
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest-blocking"]
8893
reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]

opentelemetry-otlp/src/exporter/http/trace.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,138 @@ use opentelemetry_sdk::{
88
trace::{SpanData, SpanExporter},
99
};
1010

11+
#[cfg(feature = "http-retry")]
12+
use crate::retry_classification::http::classify_http_error;
13+
#[cfg(feature = "http-retry")]
14+
use opentelemetry_sdk::retry::{retry_with_backoff, RetryErrorType, RetryPolicy};
15+
#[cfg(feature = "http-retry")]
16+
use opentelemetry_sdk::runtime::Tokio;
17+
18+
#[cfg(feature = "http-retry")]
19+
/// HTTP-specific error wrapper for retry classification
20+
#[derive(Debug)]
21+
struct HttpExportError {
22+
status_code: u16,
23+
retry_after: Option<String>,
24+
message: String,
25+
}
26+
27+
#[cfg(feature = "http-retry")]
28+
/// Classify HTTP export errors for retry decisions
29+
fn classify_http_export_error(error: &HttpExportError) -> RetryErrorType {
30+
classify_http_error(error.status_code, error.retry_after.as_deref())
31+
}
32+
1133
impl SpanExporter for OtlpHttpClient {
34+
#[cfg(feature = "http-retry")]
35+
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
36+
let policy = RetryPolicy {
37+
max_retries: 3,
38+
initial_delay_ms: 100,
39+
max_delay_ms: 1600,
40+
jitter_ms: 100,
41+
};
42+
43+
let batch = Arc::new(batch);
44+
45+
retry_with_backoff(
46+
Tokio,
47+
policy,
48+
classify_http_export_error,
49+
"HttpTracesClient.Export",
50+
|| async {
51+
let batch_clone = Arc::clone(&batch);
52+
53+
// Get client
54+
let client = self
55+
.client
56+
.lock()
57+
.map_err(|e| HttpExportError {
58+
status_code: 500,
59+
retry_after: None,
60+
message: format!("Mutex lock failed: {e}"),
61+
})?
62+
.as_ref()
63+
.ok_or_else(|| HttpExportError {
64+
status_code: 500,
65+
retry_after: None,
66+
message: "Exporter already shutdown".to_string(),
67+
})?
68+
.clone();
69+
70+
// Build request body
71+
let (body, content_type, content_encoding) = self
72+
.build_trace_export_body((*batch_clone).clone())
73+
.map_err(|e| HttpExportError {
74+
status_code: 400,
75+
retry_after: None,
76+
message: format!("Failed to build request body: {e}"),
77+
})?;
78+
79+
// Build HTTP request
80+
let mut request_builder = http::Request::builder()
81+
.method(Method::POST)
82+
.uri(&self.collector_endpoint)
83+
.header(CONTENT_TYPE, content_type);
84+
85+
if let Some(encoding) = content_encoding {
86+
request_builder = request_builder.header("Content-Encoding", encoding);
87+
}
88+
89+
let mut request =
90+
request_builder
91+
.body(body.into())
92+
.map_err(|e| HttpExportError {
93+
status_code: 400,
94+
retry_after: None,
95+
message: format!("Failed to build HTTP request: {e}"),
96+
})?;
97+
98+
for (k, v) in &self.headers {
99+
request.headers_mut().insert(k.clone(), v.clone());
100+
}
101+
102+
let request_uri = request.uri().to_string();
103+
otel_debug!(name: "HttpTracesClient.ExportStarted");
104+
105+
// Send request
106+
let response = client.send_bytes(request).await.map_err(|e| {
107+
HttpExportError {
108+
status_code: 0, // Network error
109+
retry_after: None,
110+
message: format!("Network error: {e:?}"),
111+
}
112+
})?;
113+
114+
let status_code = response.status().as_u16();
115+
let retry_after = response
116+
.headers()
117+
.get("retry-after")
118+
.and_then(|v| v.to_str().ok())
119+
.map(|s| s.to_string());
120+
121+
if !response.status().is_success() {
122+
return Err(HttpExportError {
123+
status_code,
124+
retry_after,
125+
message: format!(
126+
"HTTP export failed. Url: {}, Status: {}, Response: {:?}",
127+
request_uri,
128+
status_code,
129+
response.body()
130+
),
131+
});
132+
}
133+
134+
otel_debug!(name: "HttpTracesClient.ExportSucceeded");
135+
Ok(())
136+
},
137+
)
138+
.await
139+
.map_err(|e| OTelSdkError::InternalFailure(e.message))
140+
}
141+
142+
#[cfg(not(feature = "http-retry"))]
12143
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
13144
let client = match self
14145
.client

opentelemetry-otlp/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ mod metric;
366366
#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))]
367367
mod span;
368368

369-
#[cfg(feature = "grpc-tonic")]
369+
#[cfg(any(feature = "grpc-tonic", feature = "http-retry"))]
370370
pub mod retry_classification;
371371

372372
pub use crate::exporter::Compression;

opentelemetry-otlp/src/retry_classification.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,14 @@ pub mod http {
6060
fn parse_retry_after(retry_after: &str) -> Option<Duration> {
6161
// Try parsing as seconds first
6262
if let Ok(seconds) = retry_after.trim().parse::<u64>() {
63-
// Cap at 10 minutes for safety
63+
// Cap at 10 minutes. TODO: decide on a sensible upper bound for a server-supplied delay.
6464
let capped_seconds = seconds.min(600);
6565
return Some(Duration::from_secs(capped_seconds));
6666
}
6767

6868
// Try parsing as HTTP date
6969
if let Ok(delay_seconds) = parse_http_date_to_delay(retry_after) {
70-
// Cap at 10 minutes for safety
70+
// Cap at 10 minutes. TODO: decide on a sensible upper bound for a server-supplied delay.
7171
let capped_seconds = delay_seconds.min(600);
7272
return Some(Duration::from_secs(capped_seconds));
7373
}
@@ -78,7 +78,7 @@ pub mod http {
7878
/// Parses HTTP date format and returns delay in seconds from now.
7979
///
8080
/// This is a simplified parser for the most common HTTP date format.
81-
/// In production, you might want to use a proper HTTP date parsing library.
81+
/// TODO: consider replacing this with a proper HTTP-date parsing library.
8282
fn parse_http_date_to_delay(date_str: &str) -> Result<u64, ()> {
8383
// For now, return error - would need proper HTTP date parsing
8484
// This could be implemented with chrono or similar
@@ -88,6 +88,7 @@ pub mod http {
8888
}
8989

9090
/// gRPC-specific error classification with RetryInfo support.
91+
#[cfg(feature = "grpc-tonic")]
9192
pub mod grpc {
9293
use super::*;
9394

@@ -122,7 +123,7 @@ pub mod grpc {
122123
tonic::Code::ResourceExhausted => {
123124
if let Some(seconds) = retry_info_seconds {
124125
// Server signals recovery is possible - use throttled retry
125-
let capped_seconds = seconds.min(600); // Cap at 10 minutes for safety
126+
let capped_seconds = seconds.min(600); // Cap at 10 minutes. TODO - what's sensible here?
126127
return RetryErrorType::Throttled(std::time::Duration::from_secs(
127128
capped_seconds,
128129
));

0 commit comments

Comments
 (0)