Commit 12f6f6d

Add retry_with_exponential_backoff method; Use in tonic logs client

1 parent: e1c406a

5 files changed: +96 -28 lines

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -322,7 +322,7 @@ impl OtlpHttpClient {
         logs: LogBatch<'_>,
     ) -> opentelemetry_sdk::logs::LogResult<(Vec<u8>, &'static str)> {
         use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
-        let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
+        let resource_logs = group_logs_by_resource_and_scope(&logs, &self.resource);
         let req = ExportLogsServiceRequest { resource_logs };

         match self.protocol {

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 43 additions & 23 deletions
@@ -1,3 +1,4 @@
+use std::sync::Arc;
 use core::fmt;
 use opentelemetry::otel_debug;
 use opentelemetry_proto::tonic::collector::logs::v1::{
@@ -12,6 +13,8 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
 use super::BoxInterceptor;
 use tokio::sync::Mutex;

+use crate::retry::{retry_with_exponential_backoff, RetryPolicy};
+
 pub(crate) struct TonicLogsClient {
     inner: Option<ClientInner>,
     #[allow(dead_code)]
@@ -57,33 +60,50 @@ impl TonicLogsClient {

 impl LogExporter for TonicLogsClient {
     async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
-        let (mut client, metadata, extensions) = match &self.inner {
-            Some(inner) => {
-                let (m, e, _) = inner
-                    .interceptor
-                    .lock()
-                    .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
-                    .call(Request::new(()))
-                    .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
-                    .into_parts();
-                (inner.client.clone(), m, e)
-            }
-            None => return Err(OTelSdkError::AlreadyShutdown),
+        let policy = RetryPolicy {
+            max_retries: 3,
+            initial_delay_ms: 100,
+            max_delay_ms: 1600,
+            jitter_ms: 100,
         };

-        let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
+        let batch = Arc::new(batch); // Wrap the batch in an Arc so every retry attempt can share it
+
+        retry_with_exponential_backoff(policy, "TonicLogsClient.Export", {
+            let batch = Arc::clone(&batch);
+            move || {
+                let batch = Arc::clone(&batch); // Clone the Arc inside the closure
+                Box::pin(async move {
+                    let (mut client, metadata, extensions) = match &self.inner {
+                        Some(inner) => {
+                            let (m, e, _) = inner
+                                .interceptor
+                                .lock()
+                                .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
+                                .call(Request::new(()))
+                                .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
+                                .into_parts();
+                            (inner.client.clone(), m, e)
+                        }
+                        None => return Err(OTelSdkError::AlreadyShutdown),
+                    };

-        otel_debug!(name: "TonicsLogsClient.CallingExport");
+                    let resource_logs = group_logs_by_resource_and_scope(&*batch, &self.resource);

-        client
-            .export(Request::from_parts(
-                metadata,
-                extensions,
-                ExportLogsServiceRequest { resource_logs },
-            ))
-            .await
-            .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?;
-        Ok(())
+                    otel_debug!(name: "TonicsLogsClient.CallingExport");
+
+                    client
+                        .export(Request::from_parts(
+                            metadata,
+                            extensions,
+                            ExportLogsServiceRequest { resource_logs },
+                        ))
+                        .await
+                        .map(|_| ()) // Map the successful result to Ok(())
+                        .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))
+                })
+            }
+        }).await
     }

     fn shutdown(&mut self) -> OTelSdkResult {
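
The notable design choice in this hunk is that the retry helper may call the operation more than once, so the closure cannot take ownership of batch; instead the batch is wrapped in an Arc and a fresh handle is cloned into every attempt. Below is a minimal, hypothetical sketch of that shape outside the exporter, using a plain Vec<u8> in place of LogBatch (which borrows log records and cannot simply be cloned per attempt); the names payload and attempt_sketch are illustrative only.

use std::sync::Arc;

async fn attempt_sketch() -> Result<usize, String> {
    // Hypothetical payload standing in for the shared LogBatch.
    let payload = Arc::new(vec![1u8, 2, 3]);

    let operation = {
        let payload = Arc::clone(&payload);
        move || {
            // Each invocation clones the Arc and builds a brand-new future,
            // which is what lets a retry loop re-run the operation.
            let payload = Arc::clone(&payload);
            Box::pin(async move { Ok::<usize, String>(payload.len()) })
        }
    };

    // First and second "attempts" both see the same shared payload.
    let first = operation().await?;
    let second = operation().await?;
    Ok(first + second)
}

Each call of the closure produces a new future over shared data, which is exactly what retry_with_exponential_backoff needs from its FnMut argument.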

opentelemetry-otlp/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -220,6 +220,7 @@ mod metric;
 #[cfg(feature = "trace")]
 #[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))]
 mod span;
+mod retry;

 pub use crate::exporter::Compression;
 pub use crate::exporter::ExportConfig;

opentelemetry-otlp/src/retry.rs

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+use std::future::Future;
+use std::time::{Duration, SystemTime};
+use opentelemetry::otel_warn;
+use tokio::time::sleep;
+
+pub(crate) struct RetryPolicy {
+    pub max_retries: usize,
+    pub initial_delay_ms: u64,
+    pub max_delay_ms: u64,
+    pub jitter_ms: u64,
+}
+
+fn generate_jitter(max_jitter: u64) -> u64 {
+    let now = SystemTime::now();
+    let nanos = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().subsec_nanos();
+    nanos as u64 % (max_jitter + 1)
+}
+
+pub(crate) async fn retry_with_exponential_backoff<F, Fut, T, E>(
+    policy: RetryPolicy,
+    operation_name: &str,
+    mut operation: F,
+) -> Result<T, E>
+where
+    F: FnMut() -> Fut,
+    E: std::fmt::Debug,
+    Fut: Future<Output = Result<T, E>>,
+{
+    let mut attempt = 0;
+    let mut delay = policy.initial_delay_ms;
+
+    loop {
+        match operation().await {
+            Ok(result) => return Ok(result),
+            Err(err) if attempt < policy.max_retries => {
+                attempt += 1;
+                // Log the error and retry after a delay with jitter
+                otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to error: {:?}", operation_name, err));
+                let jitter = generate_jitter(policy.jitter_ms);
+                let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms);
+                sleep(Duration::from_millis(delay_with_jitter)).await;
+                delay = std::cmp::min(delay * 2, policy.max_delay_ms);
+            }
+            Err(err) => return Err(err),
+        }
+    }
+}
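
For context on how the new helper behaves at runtime, here is a rough, hypothetical usage sketch (not part of the commit; call_with_retries and the attempt counter are illustrative). Since RetryPolicy and retry_with_exponential_backoff are pub(crate), such code would only compile inside the opentelemetry-otlp crate.

use std::sync::atomic::{AtomicUsize, Ordering};

use crate::retry::{retry_with_exponential_backoff, RetryPolicy};

// Hypothetical in-crate usage: an operation that fails twice, then succeeds.
async fn call_with_retries() -> Result<&'static str, String> {
    static ATTEMPTS: AtomicUsize = AtomicUsize::new(0);

    let policy = RetryPolicy {
        max_retries: 3,      // give up after the fourth failed attempt
        initial_delay_ms: 100,
        max_delay_ms: 1600,  // exponential growth of the delay is capped here
        jitter_ms: 100,      // up to 100 ms of jitter is added before each retry
    };

    retry_with_exponential_backoff(policy, "ExampleOperation", || async {
        // Fails on the first two calls, succeeds on the third.
        if ATTEMPTS.fetch_add(1, Ordering::SeqCst) < 2 {
            Err("transient failure".to_string())
        } else {
            Ok("exported")
        }
    })
    .await
}

With this policy the helper sleeps roughly 100 ms and then 200 ms (each plus up to 100 ms of jitter) between the failed attempts, doubling the delay until it reaches the 1600 ms cap or max_retries is exhausted.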

opentelemetry-proto/src/transform/logs.rs

Lines changed: 4 additions & 4 deletions
@@ -164,8 +164,8 @@ pub mod tonic {
         }
     }

-    pub fn group_logs_by_resource_and_scope(
-        logs: LogBatch<'_>,
+    pub fn group_logs_by_resource_and_scope<'a>(
+        logs: &'a LogBatch<'a>,
         resource: &ResourceAttributesWithSchema,
     ) -> Vec<ResourceLogs> {
         // Group logs by target or instrumentation name
@@ -273,7 +273,7 @@ mod tests {
         let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema

         let grouped_logs =
-            crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
+            crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);

         assert_eq!(grouped_logs.len(), 1);
         let resource_logs = &grouped_logs[0];
@@ -293,7 +293,7 @@ mod tests {
         let log_batch = LogBatch::new(&logs);
         let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
         let grouped_logs =
-            crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
+            crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);

         assert_eq!(grouped_logs.len(), 1);
         let resource_logs = &grouped_logs[0];
