Skip to content

Commit 4ae557e

Browse files
committed
chore: configurable retry policy with default
1 parent 07fd132 commit 4ae557e

File tree

7 files changed

+206
-44
lines changed

7 files changed

+206
-44
lines changed

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 108 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::time::Duration;
2525
#[cfg(feature = "http-retry")]
2626
use crate::retry_classification::http::classify_http_error;
2727
#[cfg(feature = "http-retry")]
28-
use opentelemetry_sdk::retry::RetryErrorType;
28+
use opentelemetry_sdk::retry::{RetryErrorType, RetryPolicy};
2929

3030
// Shared HTTP retry functionality
3131
/// HTTP-specific error wrapper for retry classification
@@ -111,6 +111,10 @@ pub struct HttpConfig {
111111

112112
/// The compression algorithm to use when communicating with the OTLP endpoint.
113113
compression: Option<crate::Compression>,
114+
115+
/// The retry policy to use for HTTP requests.
116+
#[cfg(feature = "http-retry")]
117+
retry_policy: Option<RetryPolicy>,
114118
}
115119

116120
/// Configuration for the OTLP HTTP exporter.
@@ -282,6 +286,8 @@ impl HttpExporterBuilder {
282286
self.exporter_config.protocol,
283287
timeout,
284288
compression,
289+
#[cfg(feature = "http-retry")]
290+
self.http_config.retry_policy.take(),
285291
))
286292
}
287293

@@ -361,6 +367,8 @@ pub(crate) struct OtlpHttpClient {
361367
protocol: Protocol,
362368
_timeout: Duration,
363369
compression: Option<crate::Compression>,
370+
#[cfg(feature = "http-retry")]
371+
retry_policy: RetryPolicy,
364372
#[allow(dead_code)]
365373
// <allow dead> would be removed once we support set_resource for metrics and traces.
366374
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
@@ -379,16 +387,9 @@ impl OtlpHttpClient {
379387
{
380388
#[cfg(feature = "http-retry")]
381389
{
382-
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
390+
use opentelemetry_sdk::retry::retry_with_backoff;
383391
use opentelemetry_sdk::runtime::Tokio;
384392

385-
let policy = RetryPolicy {
386-
max_retries: 3,
387-
initial_delay_ms: 100,
388-
max_delay_ms: 1600,
389-
jitter_ms: 100,
390-
};
391-
392393
// Build request body once before retry loop
393394
let (body, content_type, content_encoding) = build_body_fn(self, data)
394395
.map_err(opentelemetry_sdk::error::OTelSdkError::InternalFailure)?;
@@ -401,7 +402,7 @@ impl OtlpHttpClient {
401402

402403
retry_with_backoff(
403404
Tokio,
404-
policy,
405+
self.retry_policy.clone(),
405406
classify_http_export_error,
406407
operation_name,
407408
|| async {
@@ -544,6 +545,7 @@ impl OtlpHttpClient {
544545
protocol: Protocol,
545546
timeout: Duration,
546547
compression: Option<crate::Compression>,
548+
#[cfg(feature = "http-retry")] retry_policy: Option<RetryPolicy>,
547549
) -> Self {
548550
OtlpHttpClient {
549551
client: Mutex::new(Some(client)),
@@ -552,6 +554,13 @@ impl OtlpHttpClient {
552554
protocol,
553555
_timeout: timeout,
554556
compression,
557+
#[cfg(feature = "http-retry")]
558+
retry_policy: retry_policy.unwrap_or(RetryPolicy {
559+
max_retries: 3,
560+
initial_delay_ms: 100,
561+
max_delay_ms: 1600,
562+
jitter_ms: 100,
563+
}),
555564
resource: ResourceAttributesWithSchema::default(),
556565
}
557566
}
@@ -722,6 +731,10 @@ pub trait WithHttpConfig {
722731

723732
/// Set the compression algorithm to use when communicating with the collector.
724733
fn with_compression(self, compression: crate::Compression) -> Self;
734+
735+
/// Set the retry policy for HTTP requests.
736+
#[cfg(feature = "http-retry")]
737+
fn with_retry_policy(self, policy: RetryPolicy) -> Self;
725738
}
726739

727740
impl<B: HasHttpConfig> WithHttpConfig for B {
@@ -746,6 +759,12 @@ impl<B: HasHttpConfig> WithHttpConfig for B {
746759
self.http_client_config().compression = Some(compression);
747760
self
748761
}
762+
763+
#[cfg(feature = "http-retry")]
764+
fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
765+
self.http_client_config().retry_policy = Some(policy);
766+
self
767+
}
749768
}
750769

751770
#[cfg(test)]
@@ -991,6 +1010,8 @@ mod tests {
9911010
client: None,
9921011
headers: Some(initial_headers),
9931012
compression: None,
1013+
#[cfg(feature = "http-retry")]
1014+
retry_policy: None,
9941015
},
9951016
exporter_config: crate::ExportConfig::default(),
9961017
};
@@ -1057,6 +1078,8 @@ mod tests {
10571078
crate::Protocol::HttpBinary,
10581079
std::time::Duration::from_secs(10),
10591080
Some(crate::Compression::Gzip),
1081+
#[cfg(feature = "http-retry")]
1082+
None,
10601083
);
10611084

10621085
// Test with some sample data
@@ -1088,6 +1111,8 @@ mod tests {
10881111
crate::Protocol::HttpBinary,
10891112
std::time::Duration::from_secs(10),
10901113
Some(crate::Compression::Zstd),
1114+
#[cfg(feature = "http-retry")]
1115+
None,
10911116
);
10921117

10931118
// Test with some sample data
@@ -1116,6 +1141,8 @@ mod tests {
11161141
crate::Protocol::HttpBinary,
11171142
std::time::Duration::from_secs(10),
11181143
None, // No compression
1144+
#[cfg(feature = "http-retry")]
1145+
None,
11191146
);
11201147

11211148
let body = vec![1, 2, 3, 4];
@@ -1137,6 +1164,8 @@ mod tests {
11371164
crate::Protocol::HttpBinary,
11381165
std::time::Duration::from_secs(10),
11391166
Some(crate::Compression::Gzip),
1167+
#[cfg(feature = "http-retry")]
1168+
None,
11401169
);
11411170

11421171
let body = vec![1, 2, 3, 4];
@@ -1159,6 +1188,8 @@ mod tests {
11591188
crate::Protocol::HttpBinary,
11601189
std::time::Duration::from_secs(10),
11611190
Some(crate::Compression::Zstd),
1191+
#[cfg(feature = "http-retry")]
1192+
None,
11621193
);
11631194

11641195
let body = vec![1, 2, 3, 4];
@@ -1221,6 +1252,8 @@ mod tests {
12211252
protocol,
12221253
std::time::Duration::from_secs(10),
12231254
compression,
1255+
#[cfg(feature = "http-retry")]
1256+
None,
12241257
)
12251258
}
12261259

@@ -1453,5 +1486,70 @@ mod tests {
14531486
Err(ExporterBuildError::UnsupportedCompressionAlgorithm(_))
14541487
));
14551488
}
1489+
1490+
#[cfg(feature = "http-retry")]
1491+
#[test]
1492+
fn test_with_retry_policy() {
1493+
use super::super::HttpExporterBuilder;
1494+
use crate::WithHttpConfig;
1495+
use opentelemetry_sdk::retry::RetryPolicy;
1496+
1497+
let custom_policy = RetryPolicy {
1498+
max_retries: 5,
1499+
initial_delay_ms: 200,
1500+
max_delay_ms: 3200,
1501+
jitter_ms: 50,
1502+
};
1503+
1504+
let builder = HttpExporterBuilder::default().with_retry_policy(custom_policy);
1505+
1506+
// Verify the retry policy was set
1507+
let retry_policy = builder.http_config.retry_policy.as_ref().unwrap();
1508+
assert_eq!(retry_policy.max_retries, 5);
1509+
assert_eq!(retry_policy.initial_delay_ms, 200);
1510+
assert_eq!(retry_policy.max_delay_ms, 3200);
1511+
assert_eq!(retry_policy.jitter_ms, 50);
1512+
}
1513+
1514+
#[cfg(feature = "http-retry")]
1515+
#[test]
1516+
fn test_default_retry_policy_when_none_configured() {
1517+
let client = create_test_client(crate::Protocol::HttpBinary, None);
1518+
1519+
// Verify default values are used
1520+
assert_eq!(client.retry_policy.max_retries, 3);
1521+
assert_eq!(client.retry_policy.initial_delay_ms, 100);
1522+
assert_eq!(client.retry_policy.max_delay_ms, 1600);
1523+
assert_eq!(client.retry_policy.jitter_ms, 100);
1524+
}
1525+
1526+
#[cfg(feature = "http-retry")]
1527+
#[test]
1528+
fn test_custom_retry_policy_used() {
1529+
use opentelemetry_sdk::retry::RetryPolicy;
1530+
1531+
let custom_policy = RetryPolicy {
1532+
max_retries: 7,
1533+
initial_delay_ms: 500,
1534+
max_delay_ms: 5000,
1535+
jitter_ms: 200,
1536+
};
1537+
1538+
let client = OtlpHttpClient::new(
1539+
std::sync::Arc::new(MockHttpClient),
1540+
"http://localhost:4318".parse().unwrap(),
1541+
HashMap::new(),
1542+
crate::Protocol::HttpBinary,
1543+
std::time::Duration::from_secs(10),
1544+
None,
1545+
Some(custom_policy),
1546+
);
1547+
1548+
// Verify custom values are used
1549+
assert_eq!(client.retry_policy.max_retries, 7);
1550+
assert_eq!(client.retry_policy.initial_delay_ms, 500);
1551+
assert_eq!(client.retry_policy.max_delay_ms, 5000);
1552+
assert_eq!(client.retry_policy.jitter_ms, 200);
1553+
}
14561554
}
14571555
}

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use opentelemetry_sdk::runtime::Tokio;
2020

2121
pub(crate) struct TonicLogsClient {
2222
inner: Mutex<Option<ClientInner>>,
23+
retry_policy: RetryPolicy,
2324
#[allow(dead_code)]
2425
// <allow dead> would be removed once we support set_resource for metrics.
2526
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
@@ -41,6 +42,7 @@ impl TonicLogsClient {
4142
channel: Channel,
4243
interceptor: BoxInterceptor,
4344
compression: Option<CompressionEncoding>,
45+
retry_policy: Option<RetryPolicy>,
4446
) -> Self {
4547
let mut client = LogsServiceClient::new(channel);
4648
if let Some(compression) = compression {
@@ -56,25 +58,24 @@ impl TonicLogsClient {
5658
client,
5759
interceptor,
5860
})),
61+
retry_policy: retry_policy.unwrap_or(RetryPolicy {
62+
max_retries: 3,
63+
initial_delay_ms: 100,
64+
max_delay_ms: 1600,
65+
jitter_ms: 100,
66+
}),
5967
resource: Default::default(),
6068
}
6169
}
6270
}
6371

6472
impl LogExporter for TonicLogsClient {
6573
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
66-
let policy = RetryPolicy {
67-
max_retries: 3,
68-
initial_delay_ms: 100,
69-
max_delay_ms: 1600,
70-
jitter_ms: 100,
71-
};
72-
7374
let batch = Arc::new(batch);
7475

7576
match retry_with_backoff(
7677
Tokio,
77-
policy,
78+
self.retry_policy.clone(),
7879
classify_tonic_status,
7980
"TonicLogsClient.Export",
8081
|| async {

opentelemetry-otlp/src/exporter/tonic/metrics.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use opentelemetry_sdk::runtime::Tokio;
1818

1919
pub(crate) struct TonicMetricsClient {
2020
inner: Mutex<Option<ClientInner>>,
21+
retry_policy: RetryPolicy,
2122
}
2223

2324
struct ClientInner {
@@ -36,6 +37,7 @@ impl TonicMetricsClient {
3637
channel: Channel,
3738
interceptor: BoxInterceptor,
3839
compression: Option<CompressionEncoding>,
40+
retry_policy: Option<RetryPolicy>,
3941
) -> Self {
4042
let mut client = MetricsServiceClient::new(channel);
4143
if let Some(compression) = compression {
@@ -51,22 +53,21 @@ impl TonicMetricsClient {
5153
client,
5254
interceptor,
5355
})),
56+
retry_policy: retry_policy.unwrap_or(RetryPolicy {
57+
max_retries: 3,
58+
initial_delay_ms: 100,
59+
max_delay_ms: 1600,
60+
jitter_ms: 100,
61+
}),
5462
}
5563
}
5664
}
5765

5866
impl MetricsClient for TonicMetricsClient {
5967
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
60-
let policy = RetryPolicy {
61-
max_retries: 3,
62-
initial_delay_ms: 100,
63-
max_delay_ms: 1600,
64-
jitter_ms: 100,
65-
};
66-
6768
match retry_with_backoff(
6869
Tokio,
69-
policy,
70+
self.retry_policy.clone(),
7071
classify_tonic_status,
7172
"TonicMetricsClient.Export",
7273
|| async {

0 commit comments

Comments
 (0)