Skip to content

Commit 3b29f8e

Browse files
AaronRMscottgerring
authored andcommitted
Scope retry to just logs+tonic
1 parent 91c1b9d commit 3b29f8e

File tree

3 files changed

+242
-2
lines changed

3 files changed

+242
-2
lines changed

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl LogExporter for TonicLogsClient {
9090

9191
let resource_logs = group_logs_by_resource_and_scope(&*batch, resource);
9292

93-
otel_debug!(name: "TonicsLogsClient.ExportStarted");
93+
otel_debug!(name: "TonicLogsClient.ExportStarted");
9494

9595
let result = client
9696
.export(Request::from_parts(
@@ -118,7 +118,13 @@ impl LogExporter for TonicLogsClient {
118118
}
119119

120120
fn shutdown(&self) -> OTelSdkResult {
121-
// TODO: We broke this rebasing. fix it!
121+
// TODO: Implement actual shutdown
122+
// Due to the use of tokio::sync::Mutex to guard
123+
// the inner client, we need to await the call to lock the mutex
124+
// and that requires async runtime.
125+
// It is possible to fix this by using
126+
// a dedicated thread just to handle shutdown.
127+
// But for now, we just return Ok.
122128
Ok(())
123129
}
124130

opentelemetry-otlp/src/exporter/tonic/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ pub(crate) mod metrics;
2525
#[cfg(feature = "trace")]
2626
pub(crate) mod trace;
2727

28+
// For now, we are not exposing the retry policy. Only work with grpc-tonic since retry takes a hard dependency on tokio
29+
// while we sort out an abstraction for the async runtime which can be used by all exporters.
30+
#[cfg(feature = "grpc-tonic")]
31+
mod retry;
32+
2833
/// Configuration for [tonic]
2934
///
3035
/// [tonic]: https://github.com/hyperium/tonic
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
//! This module provides functionality for retrying operations with exponential backoff and jitter.
2+
//!
3+
//! The `RetryPolicy` struct defines the configuration for the retry behavior, including the maximum
4+
//! number of retries, initial delay, maximum delay, and jitter.
5+
//!
6+
//! The `Sleep` trait abstracts the sleep functionality, allowing different implementations for
7+
//! various async runtimes such as Tokio and async-std, as well as a synchronous implementation.
8+
//!
9+
//! The `retry_with_exponential_backoff` function retries the given operation according to the
10+
//! specified retry policy, using exponential backoff and jitter to determine the delay between
11+
//! retries. The function logs errors and retries the operation until it succeeds or the maximum
12+
//! number of retries is reached.
13+
14+
use std::future::Future;
15+
use std::time::{Duration, SystemTime};
16+
use opentelemetry::otel_warn;
17+
18+
/// Configuration for retry policy.
19+
#[derive(Debug)]
20+
pub(super) struct RetryPolicy {
21+
/// Maximum number of retry attempts.
22+
pub max_retries: usize,
23+
/// Initial delay in milliseconds before the first retry.
24+
pub initial_delay_ms: u64,
25+
/// Maximum delay in milliseconds between retries.
26+
pub max_delay_ms: u64,
27+
/// Maximum jitter in milliseconds to add to the delay.
28+
pub jitter_ms: u64,
29+
}
30+
31+
// Generates a random jitter value up to max_jitter
32+
fn generate_jitter(max_jitter: u64) -> u64 {
33+
let now = SystemTime::now();
34+
let nanos = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().subsec_nanos();
35+
nanos as u64 % (max_jitter + 1)
36+
}
37+
38+
// /// Trait to abstract the sleep functionality.
39+
// pub trait Sleep {
40+
// /// The future returned by the sleep function.
41+
// type SleepFuture: Future<Output = ()>;
42+
43+
// /// Sleeps for the specified duration.
44+
// fn sleep(duration: Duration) -> Self::SleepFuture;
45+
// }
46+
47+
// /// Implementation of the Sleep trait for tokio::time::Sleep
48+
// #[cfg(feature = "rt-tokio")]
49+
// impl Sleep for tokio::time::Sleep {
50+
// type SleepFuture = tokio::time::Sleep;
51+
52+
// fn sleep(duration: Duration) -> Self::SleepFuture {
53+
// }
54+
// }
55+
56+
// #[cfg(feature = "rt-async-std")]
57+
// /// There is no direct equivalent to `tokio::time::Sleep` in `async-std`.
58+
// /// Instead, we create a new struct `AsyncStdSleep` and implement the `Sleep`
59+
// /// trait for it, boxing the future returned by `async_std::task::sleep` to fit
60+
// /// the trait's associated type requirements.
61+
// #[derive(Debug)]
62+
// pub struct AsyncStdSleep;
63+
64+
// /// Implementation of the Sleep trait for async-std
65+
// #[cfg(feature = "rt-async-std")]
66+
// impl Sleep for AsyncStdSleep {
67+
// type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
68+
69+
// fn sleep(duration: Duration) -> Self::SleepFuture {
70+
// Box::pin(async_std::task::sleep(duration))
71+
// }
72+
// }
73+
74+
// /// Implement the Sleep trait for synchronous sleep
75+
// #[derive(Debug)]
76+
// pub struct StdSleep;
77+
78+
// impl Sleep for StdSleep {
79+
// type SleepFuture = std::future::Ready<()>;
80+
81+
// fn sleep(duration: Duration) -> Self::SleepFuture {
82+
// std::thread::sleep(duration);
83+
// std::future::ready(())
84+
// }
85+
// }
86+
87+
/// Retries the given operation with exponential backoff and jitter.
88+
///
89+
/// # Arguments
90+
///
91+
/// * `policy` - The retry policy configuration.
92+
/// * `operation_name` - The name of the operation being retried.
93+
/// * `operation` - The operation to be retried.
94+
///
95+
/// # Returns
96+
///
97+
/// A `Result` containing the operation's result or an error if the maximum retries are reached.
98+
pub(super) async fn retry_with_exponential_backoff<F, Fut, T, E>(
99+
policy: RetryPolicy,
100+
operation_name: &str,
101+
mut operation: F,
102+
) -> Result<T, E>
103+
where
104+
F: FnMut() -> Fut,
105+
E: std::fmt::Debug,
106+
Fut: Future<Output = Result<T, E>>,
107+
{
108+
let mut attempt = 0;
109+
let mut delay = policy.initial_delay_ms;
110+
111+
loop {
112+
match operation().await {
113+
Ok(result) => return Ok(result), // Return the result if the operation succeeds
114+
Err(err) if attempt < policy.max_retries => {
115+
attempt += 1;
116+
// Log the error and retry after a delay with jitter
117+
otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to error: {:?}", operation_name, err));
118+
let jitter = generate_jitter(policy.jitter_ms);
119+
let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms);
120+
121+
// Retry currently only supports tokio::time::sleep (for use with gRPC/tonic). This
122+
// should be replaced with a more generic sleep function that works with async-std
123+
// and a synchronous runtime in the future.
124+
tokio::time::sleep(Duration::from_millis(delay_with_jitter)).await;
125+
126+
delay = std::cmp::min(delay * 2, policy.max_delay_ms); // Exponential backoff
127+
}
128+
Err(err) => return Err(err), // Return the error if max retries are reached
129+
}
130+
}
131+
}
132+
133+
#[cfg(test)]
134+
mod tests {
135+
use super::*;
136+
use tokio::time::timeout;
137+
use std::sync::atomic::{AtomicUsize, Ordering};
138+
use std::time::Duration;
139+
140+
// Test to ensure generate_jitter returns a value within the expected range
141+
#[tokio::test]
142+
async fn test_generate_jitter() {
143+
let max_jitter = 100;
144+
let jitter = generate_jitter(max_jitter);
145+
assert!(jitter <= max_jitter);
146+
}
147+
148+
// Test to ensure retry_with_exponential_backoff succeeds on the first attempt
149+
#[tokio::test]
150+
async fn test_retry_with_exponential_backoff_success() {
151+
let policy = RetryPolicy {
152+
max_retries: 3,
153+
initial_delay_ms: 100,
154+
max_delay_ms: 1600,
155+
jitter_ms: 100,
156+
};
157+
158+
let result = retry_with_exponential_backoff(policy, "test_operation", || {
159+
Box::pin(async { Ok::<_, ()>("success") })
160+
}).await;
161+
162+
assert_eq!(result, Ok("success"));
163+
}
164+
165+
// Test to ensure retry_with_exponential_backoff retries the operation and eventually succeeds
166+
#[tokio::test]
167+
async fn test_retry_with_exponential_backoff_retries() {
168+
let policy = RetryPolicy {
169+
max_retries: 3,
170+
initial_delay_ms: 100,
171+
max_delay_ms: 1600,
172+
jitter_ms: 100,
173+
};
174+
175+
let attempts = AtomicUsize::new(0);
176+
177+
let result = retry_with_exponential_backoff(policy, "test_operation", || {
178+
let attempt = attempts.fetch_add(1, Ordering::SeqCst);
179+
Box::pin(async move {
180+
if attempt < 2 {
181+
Err::<&str, &str>("error") // Fail the first two attempts
182+
} else {
183+
Ok::<&str, &str>("success") // Succeed on the third attempt
184+
}
185+
})
186+
}).await;
187+
188+
assert_eq!(result, Ok("success"));
189+
assert_eq!(attempts.load(Ordering::SeqCst), 3); // Ensure there were 3 attempts
190+
}
191+
192+
// Test to ensure retry_with_exponential_backoff fails after max retries
193+
#[tokio::test]
194+
async fn test_retry_with_exponential_backoff_failure() {
195+
let policy = RetryPolicy {
196+
max_retries: 3,
197+
initial_delay_ms: 100,
198+
max_delay_ms: 1600,
199+
jitter_ms: 100,
200+
};
201+
202+
let attempts = AtomicUsize::new(0);
203+
204+
let result = retry_with_exponential_backoff(policy, "test_operation", || {
205+
attempts.fetch_add(1, Ordering::SeqCst);
206+
Box::pin(async { Err::<(), _>("error") }) // Always fail
207+
}).await;
208+
209+
assert_eq!(result, Err("error"));
210+
assert_eq!(attempts.load(Ordering::SeqCst), 4); // Ensure there were 4 attempts (initial + 3 retries)
211+
}
212+
213+
// Test to ensure retry_with_exponential_backoff respects the timeout
214+
#[tokio::test]
215+
async fn test_retry_with_exponential_backoff_timeout() {
216+
let policy = RetryPolicy {
217+
max_retries: 12, // Increase the number of retries
218+
initial_delay_ms: 100,
219+
max_delay_ms: 1600,
220+
jitter_ms: 100,
221+
};
222+
223+
let result = timeout(Duration::from_secs(1), retry_with_exponential_backoff(policy, "test_operation", || {
224+
Box::pin(async { Err::<(), _>("error") }) // Always fail
225+
})).await;
226+
227+
assert!(result.is_err()); // Ensure the operation times out
228+
}
229+
}

0 commit comments

Comments
 (0)