Skip to content

Commit ff33723

Browse files
committed
chore: factor retry logic out
1 parent a2f13d5 commit ff33723

File tree

4 files changed

+164
-482
lines changed

4 files changed

+164
-482
lines changed

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 3 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -1,166 +1,16 @@
11
use super::OtlpHttpClient;
2-
use http::header::CONTENT_ENCODING;
3-
use http::{header::CONTENT_TYPE, Method};
4-
use opentelemetry::otel_debug;
52
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
63
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
7-
#[cfg(feature = "http-retry")]
8-
use std::sync::Arc;
94
use std::time;
105

11-
#[cfg(feature = "http-retry")]
12-
use super::{classify_http_export_error, HttpExportError, HttpRetryData};
13-
#[cfg(feature = "http-retry")]
14-
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
15-
#[cfg(feature = "http-retry")]
16-
use opentelemetry_sdk::runtime::Tokio;
17-
186
impl LogExporter for OtlpHttpClient {
19-
#[cfg(feature = "http-retry")]
207
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
21-
let policy = RetryPolicy {
22-
max_retries: 3,
23-
initial_delay_ms: 100,
24-
max_delay_ms: 1600,
25-
jitter_ms: 100,
26-
};
27-
28-
// Build request body once before retry loop since LogBatch contains borrowed data
29-
let (body, content_type, content_encoding) = self
30-
.build_logs_export_body(batch)
31-
.map_err(OTelSdkError::InternalFailure)?;
32-
33-
let retry_data = Arc::new(HttpRetryData {
34-
body,
35-
headers: self.headers.clone(),
36-
endpoint: self.collector_endpoint.to_string(),
37-
});
38-
39-
retry_with_backoff(
40-
Tokio,
41-
policy,
42-
classify_http_export_error,
8+
self.export_http_with_retry(
9+
batch,
10+
OtlpHttpClient::build_logs_export_body,
4311
"HttpLogsClient.Export",
44-
|| async {
45-
// Get client
46-
let client = self
47-
.client
48-
.lock()
49-
.map_err(|e| HttpExportError::new(500, format!("Mutex lock failed: {e}")))?
50-
.as_ref()
51-
.ok_or_else(|| {
52-
HttpExportError::new(500, "Exporter already shutdown".to_string())
53-
})?
54-
.clone();
55-
56-
// Build HTTP request
57-
let mut request_builder = http::Request::builder()
58-
.method(Method::POST)
59-
.uri(&retry_data.endpoint)
60-
.header(CONTENT_TYPE, content_type);
61-
62-
if let Some(encoding) = content_encoding {
63-
request_builder = request_builder.header(CONTENT_ENCODING, encoding);
64-
}
65-
66-
let mut request = request_builder
67-
.body(retry_data.body.clone().into())
68-
.map_err(|e| {
69-
HttpExportError::new(400, format!("Failed to build HTTP request: {e}"))
70-
})?;
71-
72-
for (k, v) in retry_data.headers.iter() {
73-
request.headers_mut().insert(k.clone(), v.clone());
74-
}
75-
76-
let request_uri = request.uri().to_string();
77-
otel_debug!(name: "HttpLogsClient.ExportStarted");
78-
79-
// Send request
80-
let response = client.send_bytes(request).await.map_err(|e| {
81-
HttpExportError::new(0, format!("Network error: {e:?}")) // Network error
82-
})?;
83-
84-
let status_code = response.status().as_u16();
85-
let retry_after = response
86-
.headers()
87-
.get("retry-after")
88-
.and_then(|v| v.to_str().ok())
89-
.map(|s| s.to_string());
90-
91-
if !response.status().is_success() {
92-
let message = format!(
93-
"HTTP export failed. Url: {}, Status: {}, Response: {:?}",
94-
request_uri,
95-
status_code,
96-
response.body()
97-
);
98-
return Err(match retry_after {
99-
Some(retry_after) => {
100-
HttpExportError::with_retry_after(status_code, retry_after, message)
101-
}
102-
None => HttpExportError::new(status_code, message),
103-
});
104-
}
105-
106-
otel_debug!(name: "HttpLogsClient.ExportSucceeded");
107-
Ok(())
108-
},
10912
)
11013
.await
111-
.map_err(|e| OTelSdkError::InternalFailure(e.message))
112-
}
113-
114-
#[cfg(not(feature = "http-retry"))]
115-
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
116-
let client = self
117-
.client
118-
.lock()
119-
.map_err(|e| OTelSdkError::InternalFailure(format!("Mutex lock failed: {e}")))?
120-
.clone()
121-
.ok_or(OTelSdkError::AlreadyShutdown)?;
122-
123-
let (body, content_type, content_encoding) = self
124-
.build_logs_export_body(batch)
125-
.map_err(OTelSdkError::InternalFailure)?;
126-
127-
let mut request_builder = http::Request::builder()
128-
.method(Method::POST)
129-
.uri(&self.collector_endpoint)
130-
.header(CONTENT_TYPE, content_type);
131-
132-
if let Some(encoding) = content_encoding {
133-
request_builder = request_builder.header(CONTENT_ENCODING, encoding);
134-
}
135-
136-
let mut request = request_builder
137-
.body(body.into())
138-
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
139-
140-
for (k, v) in self.headers.iter() {
141-
request.headers_mut().insert(k.clone(), v.clone());
142-
}
143-
144-
let request_uri = request.uri().to_string();
145-
otel_debug!(name: "HttpLogsClient.ExportStarted");
146-
let response = client
147-
.send_bytes(request)
148-
.await
149-
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
150-
151-
if !response.status().is_success() {
152-
let error = format!(
153-
"OpenTelemetry logs export failed. Url: {}, Status Code: {}, Response: {:?}",
154-
request_uri,
155-
response.status().as_u16(),
156-
response.body()
157-
);
158-
otel_debug!(name: "HttpLogsClient.ExportFailed", error = &error);
159-
return Err(OTelSdkError::InternalFailure(error));
160-
}
161-
162-
otel_debug!(name: "HttpLogsClient.ExportSucceeded");
163-
Ok(())
16414
}
16515

16616
fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {

opentelemetry-otlp/src/exporter/http/metrics.rs

Lines changed: 6 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -1,174 +1,19 @@
1-
use std::sync::Arc;
2-
31
use crate::metric::MetricsClient;
4-
use http::{header::CONTENT_TYPE, Method};
5-
use opentelemetry::otel_debug;
62
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
73
use opentelemetry_sdk::metrics::data::ResourceMetrics;
84

95
use super::OtlpHttpClient;
106

11-
#[cfg(feature = "http-retry")]
12-
use super::{classify_http_export_error, HttpExportError, HttpRetryData};
13-
#[cfg(feature = "http-retry")]
14-
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
15-
#[cfg(feature = "http-retry")]
16-
use opentelemetry_sdk::runtime::Tokio;
17-
187
impl MetricsClient for OtlpHttpClient {
19-
#[cfg(feature = "http-retry")]
208
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
21-
let policy = RetryPolicy {
22-
max_retries: 3,
23-
initial_delay_ms: 100,
24-
max_delay_ms: 1600,
25-
jitter_ms: 100,
9+
let build_body_wrapper = |client: &OtlpHttpClient, metrics: &ResourceMetrics| {
10+
client
11+
.build_metrics_export_body(metrics)
12+
.ok_or_else(|| "Failed to serialize metrics".to_string())
2613
};
2714

28-
// Build request body once before retry loop
29-
let (body, content_type, content_encoding) =
30-
self.build_metrics_export_body(metrics).ok_or_else(|| {
31-
OTelSdkError::InternalFailure("Failed to serialize metrics".to_string())
32-
})?;
33-
34-
let retry_data = Arc::new(HttpRetryData {
35-
body,
36-
headers: self.headers.clone(),
37-
endpoint: self.collector_endpoint.to_string(),
38-
});
39-
40-
retry_with_backoff(
41-
Tokio,
42-
policy,
43-
classify_http_export_error,
44-
"HttpMetricsClient.Export",
45-
|| async {
46-
// Get client
47-
let client = self
48-
.client
49-
.lock()
50-
.map_err(|e| HttpExportError::new(500, format!("Mutex lock failed: {e}")))?
51-
.as_ref()
52-
.ok_or_else(|| {
53-
HttpExportError::new(500, "Exporter already shutdown".to_string())
54-
})?
55-
.clone();
56-
57-
// Build HTTP request
58-
let mut request_builder = http::Request::builder()
59-
.method(Method::POST)
60-
.uri(&retry_data.endpoint)
61-
.header(CONTENT_TYPE, content_type);
62-
63-
if let Some(encoding) = content_encoding {
64-
request_builder = request_builder.header("Content-Encoding", encoding);
65-
}
66-
67-
let mut request = request_builder
68-
.body(retry_data.body.clone().into())
69-
.map_err(|e| {
70-
HttpExportError::new(400, format!("Failed to build HTTP request: {e}"))
71-
})?;
72-
73-
for (k, v) in retry_data.headers.iter() {
74-
request.headers_mut().insert(k.clone(), v.clone());
75-
}
76-
77-
let request_uri = request.uri().to_string();
78-
otel_debug!(name: "HttpMetricsClient.ExportStarted");
79-
80-
// Send request
81-
let response = client.send_bytes(request).await.map_err(|e| {
82-
HttpExportError::new(0, format!("Network error: {e:?}")) // Network error
83-
})?;
84-
85-
let status_code = response.status().as_u16();
86-
let retry_after = response
87-
.headers()
88-
.get("retry-after")
89-
.and_then(|v| v.to_str().ok())
90-
.map(|s| s.to_string());
91-
92-
if !response.status().is_success() {
93-
let message = format!(
94-
"HTTP export failed. Url: {}, Status: {}, Response: {:?}",
95-
request_uri,
96-
status_code,
97-
response.body()
98-
);
99-
return Err(match retry_after {
100-
Some(retry_after) => {
101-
HttpExportError::with_retry_after(status_code, retry_after, message)
102-
}
103-
None => HttpExportError::new(status_code, message),
104-
});
105-
}
106-
107-
otel_debug!(name: "HttpMetricsClient.ExportSucceeded");
108-
Ok(())
109-
},
110-
)
111-
.await
112-
.map_err(|e| OTelSdkError::InternalFailure(e.message))
113-
}
114-
115-
#[cfg(not(feature = "http-retry"))]
116-
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
117-
let client = self
118-
.client
119-
.lock()
120-
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
121-
.and_then(|g| match &*g {
122-
Some(client) => Ok(Arc::clone(client)),
123-
_ => Err(OTelSdkError::AlreadyShutdown),
124-
})?;
125-
126-
let (body, content_type, content_encoding) =
127-
self.build_metrics_export_body(metrics).ok_or_else(|| {
128-
OTelSdkError::InternalFailure("Failed to serialize metrics".to_string())
129-
})?;
130-
131-
let mut request_builder = http::Request::builder()
132-
.method(Method::POST)
133-
.uri(&self.collector_endpoint)
134-
.header(CONTENT_TYPE, content_type);
135-
136-
if let Some(encoding) = content_encoding {
137-
request_builder = request_builder.header("Content-Encoding", encoding);
138-
}
139-
140-
let mut request = request_builder
141-
.body(body.into())
142-
.map_err(|e| OTelSdkError::InternalFailure(format!("{e:?}")))?;
143-
144-
for (k, v) in self.headers.iter() {
145-
request.headers_mut().insert(k.clone(), v.clone());
146-
}
147-
148-
otel_debug!(name: "HttpMetricsClient.ExportStarted");
149-
let result = client.send_bytes(request).await;
150-
151-
match result {
152-
Ok(response) => {
153-
if response.status().is_success() {
154-
otel_debug!(name: "HttpMetricsClient.ExportSucceeded");
155-
Ok(())
156-
} else {
157-
let error = format!(
158-
"OpenTelemetry metrics export failed. Status Code: {}, Response: {:?}",
159-
response.status().as_u16(),
160-
response.body()
161-
);
162-
otel_debug!(name: "HttpMetricsClient.ExportFailed", error = &error);
163-
Err(OTelSdkError::InternalFailure(error))
164-
}
165-
}
166-
Err(e) => {
167-
let error = format!("{e:?}");
168-
otel_debug!(name: "HttpMetricsClient.ExportFailed", error = &error);
169-
Err(OTelSdkError::InternalFailure(error))
170-
}
171-
}
15+
self.export_http_with_retry(metrics, build_body_wrapper, "HttpMetricsClient.Export")
16+
.await
17217
}
17318

17419
fn shutdown(&self) -> OTelSdkResult {

0 commit comments

Comments
 (0)