-
Notifications
You must be signed in to change notification settings - Fork 557
feat: support backoff/retry in OTLP #3126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
03b4d35
919d4a3
53fd83b
bb07fe2
d9b985b
26f9a4a
91c1b9d
3b29f8e
6fe4546
9714d15
c376e07
18676e9
fb141db
75f0d71
fa1e093
15e9148
1a1ac61
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,9 +4,124 @@ use http::{header::CONTENT_TYPE, Method}; | |
use opentelemetry::otel_debug; | ||
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; | ||
use opentelemetry_sdk::logs::{LogBatch, LogExporter}; | ||
#[cfg(feature = "http-retry")] | ||
use std::sync::Arc; | ||
use std::time; | ||
|
||
#[cfg(feature = "http-retry")] | ||
use super::{classify_http_export_error, HttpExportError, HttpRetryData}; | ||
#[cfg(feature = "http-retry")] | ||
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy}; | ||
#[cfg(feature = "http-retry")] | ||
use opentelemetry_sdk::runtime::Tokio; | ||
|
||
impl LogExporter for OtlpHttpClient { | ||
#[cfg(feature = "http-retry")] | ||
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { | ||
let policy = RetryPolicy { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we create a default retry policy and share it between all exporters? |
||
max_retries: 3, | ||
initial_delay_ms: 100, | ||
max_delay_ms: 1600, | ||
jitter_ms: 100, | ||
}; | ||
|
||
// Build request body once before retry loop since LogBatch contains borrowed data | ||
let (body, content_type, content_encoding) = self | ||
.build_logs_export_body(batch) | ||
.map_err(OTelSdkError::InternalFailure)?; | ||
|
||
let retry_data = Arc::new(HttpRetryData { | ||
body, | ||
headers: self.headers.clone(), | ||
endpoint: self.collector_endpoint.to_string(), | ||
}); | ||
|
||
retry_with_backoff( | ||
Tokio, | ||
policy, | ||
classify_http_export_error, | ||
"HttpLogsClient.Export", | ||
|| async { | ||
// Get client | ||
let client = self | ||
.client | ||
.lock() | ||
.map_err(|e| HttpExportError { | ||
status_code: 500, | ||
retry_after: None, | ||
message: format!("Mutex lock failed: {e}"), | ||
})? | ||
.as_ref() | ||
.ok_or_else(|| HttpExportError { | ||
status_code: 500, | ||
retry_after: None, | ||
message: "Exporter already shutdown".to_string(), | ||
})? | ||
.clone(); | ||
|
||
// Build HTTP request | ||
let mut request_builder = http::Request::builder() | ||
.method(Method::POST) | ||
.uri(&retry_data.endpoint) | ||
.header(CONTENT_TYPE, content_type); | ||
|
||
if let Some(encoding) = content_encoding { | ||
request_builder = request_builder.header(CONTENT_ENCODING, encoding); | ||
} | ||
|
||
let mut request = request_builder | ||
.body(retry_data.body.clone().into()) | ||
.map_err(|e| HttpExportError { | ||
status_code: 400, | ||
retry_after: None, | ||
message: format!("Failed to build HTTP request: {e}"), | ||
})?; | ||
|
||
for (k, v) in &retry_data.headers { | ||
request.headers_mut().insert(k.clone(), v.clone()); | ||
} | ||
|
||
let request_uri = request.uri().to_string(); | ||
otel_debug!(name: "HttpLogsClient.ExportStarted"); | ||
|
||
// Send request | ||
let response = client.send_bytes(request).await.map_err(|e| { | ||
HttpExportError { | ||
status_code: 0, // Network error | ||
retry_after: None, | ||
message: format!("Network error: {e:?}"), | ||
} | ||
})?; | ||
|
||
let status_code = response.status().as_u16(); | ||
let retry_after = response | ||
.headers() | ||
.get("retry-after") | ||
.and_then(|v| v.to_str().ok()) | ||
.map(|s| s.to_string()); | ||
|
||
if !response.status().is_success() { | ||
return Err(HttpExportError { | ||
status_code, | ||
retry_after, | ||
message: format!( | ||
"HTTP export failed. Url: {}, Status: {}, Response: {:?}", | ||
request_uri, | ||
status_code, | ||
response.body() | ||
), | ||
}); | ||
} | ||
|
||
otel_debug!(name: "HttpLogsClient.ExportSucceeded"); | ||
Ok(()) | ||
}, | ||
) | ||
.await | ||
.map_err(|e| OTelSdkError::InternalFailure(e.message)) | ||
} | ||
|
||
#[cfg(not(feature = "http-retry"))] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a massive duplication of code. If we decide that http export always has retry behaviour, which means we include the unstable runtime feature, then we can remove all of this. Alternatively we can provide There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment above applies to the other HTTP exporters also. |
||
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { | ||
let client = self | ||
.client | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed for gRPC error type