Skip to content

Commit 15e9148

Browse files
committed
Implement retry for http metrics and logs
1 parent fa1e093 commit 15e9148

File tree

4 files changed

+282
-39
lines changed

4 files changed

+282
-39
lines changed

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,124 @@ use http::{header::CONTENT_TYPE, Method};
44
use opentelemetry::otel_debug;
55
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
66
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
7+
#[cfg(feature = "http-retry")]
8+
use std::sync::Arc;
79
use std::time;
810

11+
#[cfg(feature = "http-retry")]
12+
use super::{classify_http_export_error, HttpExportError, HttpRetryData};
13+
#[cfg(feature = "http-retry")]
14+
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
15+
#[cfg(feature = "http-retry")]
16+
use opentelemetry_sdk::runtime::Tokio;
17+
918
impl LogExporter for OtlpHttpClient {
19+
#[cfg(feature = "http-retry")]
20+
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
21+
let policy = RetryPolicy {
22+
max_retries: 3,
23+
initial_delay_ms: 100,
24+
max_delay_ms: 1600,
25+
jitter_ms: 100,
26+
};
27+
28+
// Build request body once before retry loop since LogBatch contains borrowed data
29+
let (body, content_type, content_encoding) = self
30+
.build_logs_export_body(batch)
31+
.map_err(OTelSdkError::InternalFailure)?;
32+
33+
let retry_data = Arc::new(HttpRetryData {
34+
body,
35+
headers: self.headers.clone(),
36+
endpoint: self.collector_endpoint.to_string(),
37+
});
38+
39+
retry_with_backoff(
40+
Tokio,
41+
policy,
42+
classify_http_export_error,
43+
"HttpLogsClient.Export",
44+
|| async {
45+
// Get client
46+
let client = self
47+
.client
48+
.lock()
49+
.map_err(|e| HttpExportError {
50+
status_code: 500,
51+
retry_after: None,
52+
message: format!("Mutex lock failed: {e}"),
53+
})?
54+
.as_ref()
55+
.ok_or_else(|| HttpExportError {
56+
status_code: 500,
57+
retry_after: None,
58+
message: "Exporter already shutdown".to_string(),
59+
})?
60+
.clone();
61+
62+
// Build HTTP request
63+
let mut request_builder = http::Request::builder()
64+
.method(Method::POST)
65+
.uri(&retry_data.endpoint)
66+
.header(CONTENT_TYPE, content_type);
67+
68+
if let Some(encoding) = content_encoding {
69+
request_builder = request_builder.header(CONTENT_ENCODING, encoding);
70+
}
71+
72+
let mut request = request_builder
73+
.body(retry_data.body.clone().into())
74+
.map_err(|e| HttpExportError {
75+
status_code: 400,
76+
retry_after: None,
77+
message: format!("Failed to build HTTP request: {e}"),
78+
})?;
79+
80+
for (k, v) in &retry_data.headers {
81+
request.headers_mut().insert(k.clone(), v.clone());
82+
}
83+
84+
let request_uri = request.uri().to_string();
85+
otel_debug!(name: "HttpLogsClient.ExportStarted");
86+
87+
// Send request
88+
let response = client.send_bytes(request).await.map_err(|e| {
89+
HttpExportError {
90+
status_code: 0, // Network error
91+
retry_after: None,
92+
message: format!("Network error: {e:?}"),
93+
}
94+
})?;
95+
96+
let status_code = response.status().as_u16();
97+
let retry_after = response
98+
.headers()
99+
.get("retry-after")
100+
.and_then(|v| v.to_str().ok())
101+
.map(|s| s.to_string());
102+
103+
if !response.status().is_success() {
104+
return Err(HttpExportError {
105+
status_code,
106+
retry_after,
107+
message: format!(
108+
"HTTP export failed. Url: {}, Status: {}, Response: {:?}",
109+
request_uri,
110+
status_code,
111+
response.body()
112+
),
113+
});
114+
}
115+
116+
otel_debug!(name: "HttpLogsClient.ExportSucceeded");
117+
Ok(())
118+
},
119+
)
120+
.await
121+
.map_err(|e| OTelSdkError::InternalFailure(e.message))
122+
}
123+
124+
#[cfg(not(feature = "http-retry"))]
10125
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
11126
let client = self
12127
.client

opentelemetry-otlp/src/exporter/http/metrics.rs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,121 @@ use opentelemetry_sdk::metrics::data::ResourceMetrics;
88

99
use super::OtlpHttpClient;
1010

11+
#[cfg(feature = "http-retry")]
12+
use super::{classify_http_export_error, HttpExportError, HttpRetryData};
13+
#[cfg(feature = "http-retry")]
14+
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
15+
#[cfg(feature = "http-retry")]
16+
use opentelemetry_sdk::runtime::Tokio;
17+
1118
impl MetricsClient for OtlpHttpClient {
19+
#[cfg(feature = "http-retry")]
20+
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
21+
let policy = RetryPolicy {
22+
max_retries: 3,
23+
initial_delay_ms: 100,
24+
max_delay_ms: 1600,
25+
jitter_ms: 100,
26+
};
27+
28+
// Build request body once before retry loop
29+
let (body, content_type, content_encoding) =
30+
self.build_metrics_export_body(metrics).ok_or_else(|| {
31+
OTelSdkError::InternalFailure("Failed to serialize metrics".to_string())
32+
})?;
33+
34+
let retry_data = Arc::new(HttpRetryData {
35+
body,
36+
headers: self.headers.clone(),
37+
endpoint: self.collector_endpoint.to_string(),
38+
});
39+
40+
retry_with_backoff(
41+
Tokio,
42+
policy,
43+
classify_http_export_error,
44+
"HttpMetricsClient.Export",
45+
|| async {
46+
// Get client
47+
let client = self
48+
.client
49+
.lock()
50+
.map_err(|e| HttpExportError {
51+
status_code: 500,
52+
retry_after: None,
53+
message: format!("Mutex lock failed: {e}"),
54+
})?
55+
.as_ref()
56+
.ok_or_else(|| HttpExportError {
57+
status_code: 500,
58+
retry_after: None,
59+
message: "Exporter already shutdown".to_string(),
60+
})?
61+
.clone();
62+
63+
// Build HTTP request
64+
let mut request_builder = http::Request::builder()
65+
.method(Method::POST)
66+
.uri(&retry_data.endpoint)
67+
.header(CONTENT_TYPE, content_type);
68+
69+
if let Some(encoding) = content_encoding {
70+
request_builder = request_builder.header("Content-Encoding", encoding);
71+
}
72+
73+
let mut request = request_builder
74+
.body(retry_data.body.clone().into())
75+
.map_err(|e| HttpExportError {
76+
status_code: 400,
77+
retry_after: None,
78+
message: format!("Failed to build HTTP request: {e}"),
79+
})?;
80+
81+
for (k, v) in &retry_data.headers {
82+
request.headers_mut().insert(k.clone(), v.clone());
83+
}
84+
85+
let request_uri = request.uri().to_string();
86+
otel_debug!(name: "HttpMetricsClient.ExportStarted");
87+
88+
// Send request
89+
let response = client.send_bytes(request).await.map_err(|e| {
90+
HttpExportError {
91+
status_code: 0, // Network error
92+
retry_after: None,
93+
message: format!("Network error: {e:?}"),
94+
}
95+
})?;
96+
97+
let status_code = response.status().as_u16();
98+
let retry_after = response
99+
.headers()
100+
.get("retry-after")
101+
.and_then(|v| v.to_str().ok())
102+
.map(|s| s.to_string());
103+
104+
if !response.status().is_success() {
105+
return Err(HttpExportError {
106+
status_code,
107+
retry_after,
108+
message: format!(
109+
"HTTP export failed. Url: {}, Status: {}, Response: {:?}",
110+
request_uri,
111+
status_code,
112+
response.body()
113+
),
114+
});
115+
}
116+
117+
otel_debug!(name: "HttpMetricsClient.ExportSucceeded");
118+
Ok(())
119+
},
120+
)
121+
.await
122+
.map_err(|e| OTelSdkError::InternalFailure(e.message))
123+
}
124+
125+
#[cfg(not(feature = "http-retry"))]
12126
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
13127
let client = self
14128
.client

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,37 @@ use std::str::FromStr;
2222
use std::sync::{Arc, Mutex};
2323
use std::time::Duration;
2424

25+
#[cfg(feature = "http-retry")]
26+
use crate::retry_classification::http::classify_http_error;
27+
#[cfg(feature = "http-retry")]
28+
use opentelemetry_sdk::retry::RetryErrorType;
29+
30+
// Shared HTTP retry functionality
31+
#[cfg(feature = "http-retry")]
32+
/// HTTP-specific error wrapper for retry classification
33+
#[derive(Debug)]
34+
pub(crate) struct HttpExportError {
35+
pub status_code: u16,
36+
pub retry_after: Option<String>,
37+
pub message: String,
38+
}
39+
40+
#[cfg(feature = "http-retry")]
41+
/// Classify HTTP export errors for retry decisions
42+
pub(crate) fn classify_http_export_error(error: &HttpExportError) -> RetryErrorType {
43+
classify_http_error(error.status_code, error.retry_after.as_deref())
44+
}
45+
46+
#[cfg(feature = "http-retry")]
47+
/// Shared HTTP request data for retry attempts - optimizes Arc usage by bundling all data
48+
/// we need to pass into the retry handler
49+
#[derive(Debug)]
50+
pub(crate) struct HttpRetryData {
51+
pub body: Vec<u8>,
52+
pub headers: HashMap<HeaderName, HeaderValue>,
53+
pub endpoint: String,
54+
}
55+
2556
#[cfg(feature = "metrics")]
2657
mod metrics;
2758

opentelemetry-otlp/src/exporter/http/trace.rs

Lines changed: 22 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,12 @@ use opentelemetry_sdk::{
99
};
1010

1111
#[cfg(feature = "http-retry")]
12-
use crate::retry_classification::http::classify_http_error;
12+
use super::{classify_http_export_error, HttpExportError, HttpRetryData};
1313
#[cfg(feature = "http-retry")]
14-
use opentelemetry_sdk::retry::{retry_with_backoff, RetryErrorType, RetryPolicy};
14+
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
1515
#[cfg(feature = "http-retry")]
1616
use opentelemetry_sdk::runtime::Tokio;
1717

18-
#[cfg(feature = "http-retry")]
19-
/// HTTP-specific error wrapper for retry classification
20-
#[derive(Debug)]
21-
struct HttpExportError {
22-
status_code: u16,
23-
retry_after: Option<String>,
24-
message: String,
25-
}
26-
27-
#[cfg(feature = "http-retry")]
28-
/// Classify HTTP export errors for retry decisions
29-
fn classify_http_export_error(error: &HttpExportError) -> RetryErrorType {
30-
classify_http_error(error.status_code, error.retry_after.as_deref())
31-
}
32-
3318
impl SpanExporter for OtlpHttpClient {
3419
#[cfg(feature = "http-retry")]
3520
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
@@ -40,16 +25,24 @@ impl SpanExporter for OtlpHttpClient {
4025
jitter_ms: 100,
4126
};
4227

43-
let batch = Arc::new(batch);
28+
// Build request body once before retry loop
29+
let (body, content_type, content_encoding) =
30+
self.build_trace_export_body(batch).map_err(|e| {
31+
OTelSdkError::InternalFailure(format!("Failed to build request body: {e}"))
32+
})?;
33+
34+
let retry_data = Arc::new(HttpRetryData {
35+
body,
36+
headers: self.headers.clone(),
37+
endpoint: self.collector_endpoint.to_string(),
38+
});
4439

4540
retry_with_backoff(
4641
Tokio,
4742
policy,
4843
classify_http_export_error,
4944
"HttpTracesClient.Export",
5045
|| async {
51-
let batch_clone = Arc::clone(&batch);
52-
5346
// Get client
5447
let client = self
5548
.client
@@ -67,35 +60,25 @@ impl SpanExporter for OtlpHttpClient {
6760
})?
6861
.clone();
6962

70-
// Build request body
71-
let (body, content_type, content_encoding) = self
72-
.build_trace_export_body((*batch_clone).clone())
73-
.map_err(|e| HttpExportError {
74-
status_code: 400,
75-
retry_after: None,
76-
message: format!("Failed to build request body: {e}"),
77-
})?;
78-
7963
// Build HTTP request
8064
let mut request_builder = http::Request::builder()
8165
.method(Method::POST)
82-
.uri(&self.collector_endpoint)
66+
.uri(&retry_data.endpoint)
8367
.header(CONTENT_TYPE, content_type);
8468

8569
if let Some(encoding) = content_encoding {
8670
request_builder = request_builder.header("Content-Encoding", encoding);
8771
}
8872

89-
let mut request =
90-
request_builder
91-
.body(body.into())
92-
.map_err(|e| HttpExportError {
93-
status_code: 400,
94-
retry_after: None,
95-
message: format!("Failed to build HTTP request: {e}"),
96-
})?;
73+
let mut request = request_builder
74+
.body(retry_data.body.clone().into())
75+
.map_err(|e| HttpExportError {
76+
status_code: 400,
77+
retry_after: None,
78+
message: format!("Failed to build HTTP request: {e}"),
79+
})?;
9780

98-
for (k, v) in &self.headers {
81+
for (k, v) in &retry_data.headers {
9982
request.headers_mut().insert(k.clone(), v.clone());
10083
}
10184

0 commit comments

Comments
 (0)