2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/mod.rs
@@ -322,7 +322,7 @@
logs: LogBatch<'_>,
) -> opentelemetry_sdk::logs::LogResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
let resource_logs = group_logs_by_resource_and_scope(&logs, &self.resource);

let req = ExportLogsServiceRequest { resource_logs };

match self.protocol {
66 changes: 43 additions & 23 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
@@ -1,3 +1,4 @@
use std::sync::Arc;
use core::fmt;
use opentelemetry::otel_debug;
use opentelemetry_proto::tonic::collector::logs::v1::{
@@ -12,6 +13,8 @@
use super::BoxInterceptor;
use tokio::sync::Mutex;

use crate::retry::{retry_with_exponential_backoff, RetryPolicy};

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
@@ -57,33 +60,50 @@

impl LogExporter for TonicLogsClient {
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
.into_parts();
(inner.client.clone(), m, e)
}
None => return Err(OTelSdkError::AlreadyShutdown),
let policy = RetryPolicy {
max_retries: 3,
initial_delay_ms: 100,
max_delay_ms: 1600,
jitter_ms: 100,

};

let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
let batch = Arc::new(batch); // Wrap the batch in an Arc so the retry closure can clone and reuse it across attempts

retry_with_exponential_backoff(policy, "TonicLogsClient.Export", {
let batch = Arc::clone(&batch);
move || {
let batch = Arc::clone(&batch); // Clone the Arc inside the closure
Box::pin(async move {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
.into_parts();
(inner.client.clone(), m, e)

}
None => return Err(OTelSdkError::AlreadyShutdown),

};

otel_debug!(name: "TonicsLogsClient.CallingExport");
let resource_logs = group_logs_by_resource_and_scope(&*batch, &self.resource);

client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest { resource_logs },
))
.await
.map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?;
Ok(())
otel_debug!(name: "TonicsLogsClient.CallingExport");

client
.export(Request::from_parts(
metadata,
extensions,
ExportLogsServiceRequest { resource_logs },
))
.await
.map(|_| ()) // Map the successful result to Ok(())
.map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))
})
}
}).await

}

fn shutdown(&mut self) -> OTelSdkResult {
3 changes: 0 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/mod.rs
@@ -498,9 +498,6 @@ mod tests {
#[test]
#[cfg(feature = "gzip-tonic")]
fn test_with_gzip_compression() {
// metadata should merge with the current one with priority instead of just replacing it
let mut metadata = MetadataMap::new();
metadata.insert("foo", "bar".parse().unwrap());
let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
}
1 change: 1 addition & 0 deletions opentelemetry-otlp/src/lib.rs
@@ -220,6 +220,7 @@ mod metric;
#[cfg(feature = "trace")]
#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))]
mod span;
mod retry;

pub use crate::exporter::Compression;
pub use crate::exporter::ExportConfig;
147 changes: 147 additions & 0 deletions opentelemetry-otlp/src/retry.rs
@@ -0,0 +1,147 @@
use std::future::Future;
use std::time::{Duration, SystemTime};
use opentelemetry::otel_warn;
use tokio::time::sleep;
@lalitb (Member) commented on Feb 28, 2025:

The tokio runtime won't be available here. Should we move this code under exporter/tonic, or, if we want to keep it generic, one option could be to make the delay function configurable as an argument to retry_with_exponential_backoff, so that the retry function is callable from both async and blocking code.

The PR author (Contributor) replied:

The idea is for the retry logic to be generic and shared across all the OTLP exporters. Going to try that approach first and avoid the direct tokio dependency.
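To make the reviewer's suggestion concrete, here is a minimal editorial sketch (not code from this PR) of a runtime-agnostic variant in which the caller injects the sleep behaviour; the name retry_with_backoff, its signature, and the omission of jitter are all assumptions made for illustration:

use std::future::Future;
use std::time::Duration;

// Hypothetical sketch: the caller supplies the sleep future (e.g. tokio::time::sleep
// from async code), so this helper carries no direct tokio dependency.
// Jitter is left out to keep the sketch short.
pub async fn retry_with_backoff<Op, OpFut, Sleep, SleepFut, T, E>(
    max_retries: usize,
    initial_delay_ms: u64,
    max_delay_ms: u64,
    mut sleep: Sleep,
    mut operation: Op,
) -> Result<T, E>
where
    Op: FnMut() -> OpFut,
    OpFut: Future<Output = Result<T, E>>,
    Sleep: FnMut(Duration) -> SleepFut,
    SleepFut: Future<Output = ()>,
{
    let mut delay = initial_delay_ms;
    for _ in 0..max_retries {
        match operation().await {
            Ok(value) => return Ok(value),
            Err(_) => {
                // Wait before the next attempt, then double the delay up to the cap.
                sleep(Duration::from_millis(delay)).await;
                delay = (delay * 2).min(max_delay_ms);
            }
        }
    }
    // Final attempt; its result (success or error) goes back to the caller unchanged.
    operation().await
}

A tokio-based exporter could call this as retry_with_backoff(3, 100, 1600, |d| tokio::time::sleep(d), || async { do_export().await }).await, where do_export stands in for the actual export call, while a blocking exporter could pass a sleep closure backed by std::thread::sleep and drive the returned future with whatever minimal executor it already uses.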


pub(crate) struct RetryPolicy {
pub max_retries: usize,
pub initial_delay_ms: u64,
pub max_delay_ms: u64,
pub jitter_ms: u64,
}

// Generates a random jitter value up to max_jitter
fn generate_jitter(max_jitter: u64) -> u64 {
let now = SystemTime::now();
let nanos = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().subsec_nanos();
nanos as u64 % (max_jitter + 1)
}

// Retries the given operation with exponential backoff and jitter
pub(crate) async fn retry_with_exponential_backoff<F, Fut, T, E>(
policy: RetryPolicy,
operation_name: &str,
mut operation: F,
) -> Result<T, E>
where
F: FnMut() -> Fut,
E: std::fmt::Debug,
Fut: Future<Output = Result<T, E>>,
{
let mut attempt = 0;
let mut delay = policy.initial_delay_ms;

loop {
match operation().await {
Ok(result) => return Ok(result), // Return the result if the operation succeeds
Err(err) if attempt < policy.max_retries => {
attempt += 1;
// Log the error and retry after a delay with jitter
otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to error: {:?}", operation_name, err));
Reviewer suggestion (Member): log the retry at debug rather than warn level.

Suggested change
otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to error: {:?}", operation_name, err));
otel_debug!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to error: {:?}", operation_name, err));

let jitter = generate_jitter(policy.jitter_ms);
let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms);
sleep(Duration::from_millis(delay_with_jitter)).await;
delay = std::cmp::min(delay * 2, policy.max_delay_ms); // Exponential backoff
}
Err(err) => return Err(err), // Return the error if max retries are reached
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use tokio::time::timeout;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

// Test to ensure generate_jitter returns a value within the expected range
#[tokio::test]
async fn test_generate_jitter() {
let max_jitter = 100;
let jitter = generate_jitter(max_jitter);
assert!(jitter <= max_jitter);
}

// Test to ensure retry_with_exponential_backoff succeeds on the first attempt
#[tokio::test]
async fn test_retry_with_exponential_backoff_success() {
let policy = RetryPolicy {
max_retries: 3,
initial_delay_ms: 100,
max_delay_ms: 1600,
jitter_ms: 100,
};

let result = retry_with_exponential_backoff(policy, "test_operation", || {
Box::pin(async { Ok::<_, ()>("success") })
}).await;

assert_eq!(result, Ok("success"));
}

// Test to ensure retry_with_exponential_backoff retries the operation and eventually succeeds
#[tokio::test]
async fn test_retry_with_exponential_backoff_retries() {
let policy = RetryPolicy {
max_retries: 3,
initial_delay_ms: 100,
max_delay_ms: 1600,
jitter_ms: 100,
};

let attempts = AtomicUsize::new(0);

let result = retry_with_exponential_backoff(policy, "test_operation", || {
let attempt = attempts.fetch_add(1, Ordering::SeqCst);
Box::pin(async move {
if attempt < 2 {
Err::<&str, &str>("error") // Fail the first two attempts
} else {
Ok::<&str, &str>("success") // Succeed on the third attempt
}
})
}).await;

assert_eq!(result, Ok("success"));
assert_eq!(attempts.load(Ordering::SeqCst), 3); // Ensure there were 3 attempts
}

// Test to ensure retry_with_exponential_backoff fails after max retries
#[tokio::test]
async fn test_retry_with_exponential_backoff_failure() {
let policy = RetryPolicy {
max_retries: 3,
initial_delay_ms: 100,
max_delay_ms: 1600,
jitter_ms: 100,
};

let attempts = AtomicUsize::new(0);

let result = retry_with_exponential_backoff(policy, "test_operation", || {
attempts.fetch_add(1, Ordering::SeqCst);
Box::pin(async { Err::<(), _>("error") }) // Always fail
}).await;

assert_eq!(result, Err("error"));
assert_eq!(attempts.load(Ordering::SeqCst), 4); // Ensure there were 4 attempts (initial + 3 retries)
}

// Test to ensure retry_with_exponential_backoff respects the timeout
#[tokio::test]
async fn test_retry_with_exponential_backoff_timeout() {
let policy = RetryPolicy {
max_retries: 12, // Increase the number of retries
initial_delay_ms: 100,
max_delay_ms: 1600,
jitter_ms: 100,
};

let result = timeout(Duration::from_secs(1), retry_with_exponential_backoff(policy, "test_operation", || {
Box::pin(async { Err::<(), _>("error") }) // Always fail
})).await;

assert!(result.is_err()); // Ensure the operation times out
}
}
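As a quick editorial illustration (not part of the diff), the policy used above in TonicLogsClient::export (max_retries: 3, initial_delay_ms: 100, max_delay_ms: 1600, jitter_ms: 100) produces a doubling wait between attempts, each wait extended by up to 100 ms of jitter and capped at 1600 ms:

fn main() {
    let (initial_ms, max_ms, max_retries) = (100u64, 1600u64, 3);
    let mut delay = initial_ms;
    for retry in 1..=max_retries {
        // Each retry sleeps `delay` ms plus up to 100 ms of jitter before re-exporting.
        println!("retry {retry}: ~{delay} ms (+ jitter)");
        delay = (delay * 2).min(max_ms);
    }
    // Prints: retry 1: ~100 ms, retry 2: ~200 ms, retry 3: ~400 ms.
    // After the third failed retry (fourth attempt overall) the last error is returned.
}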
@@ -13,7 +13,7 @@
//! Only a single test suite can run at once, as each container has statically mapped ports, but
//! this works nicely with the way cargo executes the suite.
//!
//! To skip integration tests with cargo, you can run `cargo test --mod`, which will run unit tests
//! To skip integration tests with cargo, you can run `cargo test --lib`, which will run unit tests
//! only.
//!
#![cfg(unix)]
8 changes: 4 additions & 4 deletions opentelemetry-proto/src/transform/logs.rs
@@ -164,8 +164,8 @@ pub mod tonic {
}
}

pub fn group_logs_by_resource_and_scope(
logs: LogBatch<'_>,
pub fn group_logs_by_resource_and_scope<'a>(
logs: &'a LogBatch<'a>,
resource: &ResourceAttributesWithSchema,
) -> Vec<ResourceLogs> {
// Group logs by target or instrumentation name
@@ -273,7 +273,7 @@ mod tests {
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema

let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];
@@ -293,7 +293,7 @@
let log_batch = LogBatch::new(&logs);
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];