Skip to content

Commit 53fd83b

Browse files
AaronRM authored and scottgerring committed
Add retry_with_exponential_backoff method; Use in tonic logs client
1 parent 919d4a3 commit 53fd83b

File tree

5 files changed

+105
-34
lines changed

5 files changed

+105
-34
lines changed

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ impl OtlpHttpClient {
388388
logs: LogBatch<'_>,
389389
) -> Result<(Vec<u8>, &'static str, Option<&'static str>), String> {
390390
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
391-
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
391+
let resource_logs = group_logs_by_resource_and_scope(&logs, &self.resource);
392392
let req = ExportLogsServiceRequest { resource_logs };
393393

394394
let (body, content_type) = match self.protocol {

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::Arc;
12
use core::fmt;
23
use opentelemetry::otel_debug;
34
use opentelemetry_proto::tonic::collector::logs::v1::{
@@ -13,6 +14,8 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop
1314

1415
use super::BoxInterceptor;
1516

17+
use crate::retry::{retry_with_exponential_backoff, RetryPolicy};
18+
1619
pub(crate) struct TonicLogsClient {
1720
inner: Mutex<Option<ClientInner>>,
1821
#[allow(dead_code)]
@@ -58,41 +61,61 @@ impl TonicLogsClient {
5861

5962
impl LogExporter for TonicLogsClient {
6063
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
61-
let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {
62-
Some(inner) => {
63-
let (m, e, _) = inner
64-
.interceptor
65-
.call(Request::new(()))
66-
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {e:?}")))?
67-
.into_parts();
68-
(inner.client.clone(), m, e)
69-
}
70-
None => return Err(OTelSdkError::AlreadyShutdown),
64+
let policy = RetryPolicy {
65+
max_retries: 3,
66+
initial_delay_ms: 100,
67+
max_delay_ms: 1600,
68+
jitter_ms: 100,
7169
};
7270

73-
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
71+
let batch = Arc::new(batch);
7472

75-
otel_debug!(name: "TonicLogsClient.ExportStarted");
73+
retry_with_exponential_backoff(policy, "TonicLogsClient.Export", {
74+
let batch = Arc::clone(&batch);
75+
let inner = &self.inner;
76+
let resource = &self.resource;
77+
move || {
78+
let batch = Arc::clone(&batch);
79+
Box::pin(async move {
80+
let (mut client, metadata, extensions) = match inner.lock().await.as_mut() {
81+
Some(inner) => {
82+
let (m, e, _) = inner
83+
.interceptor
84+
.call(Request::new(()))
85+
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {e:?}")))?
86+
.into_parts();
87+
(inner.client.clone(), m, e)
88+
}
89+
None => return Err(OTelSdkError::AlreadyShutdown),
90+
};
7691

77-
let result = client
78-
.export(Request::from_parts(
79-
metadata,
80-
extensions,
81-
ExportLogsServiceRequest { resource_logs },
82-
))
83-
.await;
92+
let resource_logs = group_logs_by_resource_and_scope(&*batch, resource);
8493

85-
match result {
86-
Ok(_) => {
87-
otel_debug!(name: "TonicLogsClient.ExportSucceeded");
88-
Ok(())
89-
}
90-
Err(e) => {
91-
let error = format!("export error: {e:?}");
92-
otel_debug!(name: "TonicLogsClient.ExportFailed", error = &error);
93-
Err(OTelSdkError::InternalFailure(error))
94+
otel_debug!(name: "TonicsLogsClient.ExportStarted");
95+
96+
let result = client
97+
.export(Request::from_parts(
98+
metadata,
99+
extensions,
100+
ExportLogsServiceRequest { resource_logs },
101+
))
102+
.await;
103+
104+
match result {
105+
Ok(_) => {
106+
otel_debug!(name: "TonicLogsClient.ExportSucceeded");
107+
Ok(())
108+
}
109+
Err(e) => {
110+
let error = format!("export error: {e:?}");
111+
otel_debug!(name: "TonicLogsClient.ExportFailed", error = &error);
112+
Err(OTelSdkError::InternalFailure(error))
113+
}
114+
}
115+
})
94116
}
95-
}
117+
})
118+
.await
96119
}
97120

98121
fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {

opentelemetry-otlp/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ mod metric;
365365
#[cfg(feature = "trace")]
366366
#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))]
367367
mod span;
368+
mod retry;
368369

369370
pub use crate::exporter::Compression;
370371
pub use crate::exporter::ExportConfig;

opentelemetry-otlp/src/retry.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::future::Future;
2+
use std::time::{Duration, SystemTime};
3+
use opentelemetry::otel_warn;
4+
use tokio::time::sleep;
5+
6+
/// Tuning knobs for `retry_with_exponential_backoff`.
///
/// All durations are expressed in milliseconds.
#[derive(Debug, Clone)]
pub(crate) struct RetryPolicy {
    /// Maximum number of retries performed after the initial attempt fails.
    pub max_retries: usize,
    /// Delay used before the first retry; doubled after every retry.
    pub initial_delay_ms: u64,
    /// Upper bound applied to the delay, both before and after jitter is added.
    pub max_delay_ms: u64,
    /// Inclusive upper bound of the jitter added to each delay.
    pub jitter_ms: u64,
}
12+
13+
/// Returns a pseudo-random jitter in the inclusive range `0..=max_jitter`.
///
/// The value is derived from the sub-second nanoseconds of the current
/// system time, so it is cheap but not cryptographically random.
///
/// Never panics: a system clock set before the Unix epoch yields a jitter of
/// `0`, and `max_jitter == u64::MAX` is handled without overflowing.
fn generate_jitter(max_jitter: u64) -> u64 {
    let nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|elapsed| u64::from(elapsed.subsec_nanos()))
        // A clock before the epoch is not worth failing an export over.
        .unwrap_or(0);

    match max_jitter.checked_add(1) {
        Some(modulus) => nanos % modulus,
        // `max_jitter + 1` would overflow; every `nanos` value is already in range.
        None => nanos,
    }
}
18+
19+
pub(crate) async fn retry_with_exponential_backoff<F, Fut, T, E>(
20+
policy: RetryPolicy,
21+
operation_name: &str,
22+
mut operation: F,
23+
) -> Result<T, E>
24+
where
25+
F: FnMut() -> Fut,
26+
E: std::fmt::Debug,
27+
Fut: Future<Output = Result<T, E>>,
28+
{
29+
let mut attempt = 0;
30+
let mut delay = policy.initial_delay_ms;
31+
32+
loop {
33+
match operation().await {
34+
Ok(result) => return Ok(result),
35+
Err(err) if attempt < policy.max_retries => {
36+
attempt += 1;
37+
// Log the error and retry after a delay with jitter
38+
otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to error: {:?}", operation_name, err));
39+
let jitter = generate_jitter(policy.jitter_ms);
40+
let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms);
41+
sleep(Duration::from_millis(delay_with_jitter)).await;
42+
delay = std::cmp::min(delay * 2, policy.max_delay_ms);
43+
}
44+
Err(err) => return Err(err),
45+
}
46+
}
47+
}

opentelemetry-proto/src/transform/logs.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ pub mod tonic {
165165
}
166166
}
167167

168-
pub fn group_logs_by_resource_and_scope(
169-
logs: LogBatch<'_>,
168+
pub fn group_logs_by_resource_and_scope<'a>(
169+
logs: &'a LogBatch<'a>,
170170
resource: &ResourceAttributesWithSchema,
171171
) -> Vec<ResourceLogs> {
172172
// Group logs by target or instrumentation name
@@ -271,7 +271,7 @@ mod tests {
271271
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
272272

273273
let grouped_logs =
274-
crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
274+
crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);
275275

276276
assert_eq!(grouped_logs.len(), 1);
277277
let resource_logs = &grouped_logs[0];
@@ -291,7 +291,7 @@ mod tests {
291291
let log_batch = LogBatch::new(&logs);
292292
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
293293
let grouped_logs =
294-
crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
294+
crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);
295295

296296
assert_eq!(grouped_logs.len(), 1);
297297
let resource_logs = &grouped_logs[0];

0 commit comments

Comments
 (0)