Skip to content

Commit c0dba75

Browse files
committed
chore: modify tonic exporters so we don't _need_ the async runtime if we don't want retry
1 parent 225e26a commit c0dba75

File tree

5 files changed

+64
-24
lines changed

5 files changed

+64
-24
lines changed

opentelemetry-otlp/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,13 @@ serialize = ["serde", "serde_json"]
7070
default = ["http-proto", "reqwest-blocking-client", "trace", "metrics", "logs", "internal-logs"]
7171

7272
# grpc using tonic
73-
grpc-tonic = ["tonic", "tonic-types", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic", "opentelemetry_sdk/rt-tokio", "opentelemetry_sdk/experimental_async_runtime"]
73+
grpc-tonic = ["tonic", "tonic-types", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic"]
7474
gzip-tonic = ["tonic/gzip"]
7575
zstd-tonic = ["tonic/zstd"]
7676

77+
# grpc with retry support
78+
grpc-retry = ["grpc-tonic", "opentelemetry_sdk/experimental_async_runtime"]
79+
7780
# http compression
7881
gzip-http = ["flate2"]
7982
zstd-http = ["zstd"]

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop
1414

1515
use super::BoxInterceptor;
1616

17-
use crate::retry_classification::grpc::classify_tonic_status;
18-
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
17+
use opentelemetry_sdk::retry::RetryPolicy;
1918
use opentelemetry_sdk::runtime::Tokio;
2019

2120
pub(crate) struct TonicLogsClient {
@@ -73,10 +72,10 @@ impl LogExporter for TonicLogsClient {
7372
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
7473
let batch = Arc::new(batch);
7574

76-
match retry_with_backoff(
75+
match super::tonic_retry_with_backoff(
7776
Tokio,
7877
self.retry_policy.clone(),
79-
classify_tonic_status,
78+
crate::retry_classification::grpc::classify_tonic_status,
8079
"TonicLogsClient.Export",
8180
|| async {
8281
let batch_clone = Arc::clone(&batch);

opentelemetry-otlp/src/exporter/tonic/metrics.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
1212
use super::BoxInterceptor;
1313
use crate::metric::MetricsClient;
1414

15-
use crate::retry_classification::grpc::classify_tonic_status;
16-
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
15+
use opentelemetry_sdk::retry::RetryPolicy;
1716
use opentelemetry_sdk::runtime::Tokio;
1817

1918
pub(crate) struct TonicMetricsClient {
@@ -65,10 +64,10 @@ impl TonicMetricsClient {
6564

6665
impl MetricsClient for TonicMetricsClient {
6766
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
68-
match retry_with_backoff(
67+
match super::tonic_retry_with_backoff(
6968
Tokio,
7069
self.retry_policy.clone(),
71-
classify_tonic_status,
70+
crate::retry_classification::grpc::classify_tonic_status,
7271
"TonicMetricsClient.Export",
7372
|| async {
7473
// Execute the export operation

opentelemetry-otlp/src/exporter/tonic/mod.rs

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ use crate::{ExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_HEADER
1818

1919
#[cfg(feature = "grpc-tonic")]
2020
use opentelemetry_sdk::retry::RetryPolicy;
21+
#[cfg(feature = "grpc-retry")]
22+
use opentelemetry_sdk::retry::retry_with_backoff;
23+
#[cfg(feature = "grpc-tonic")]
24+
use opentelemetry_sdk::runtime::Runtime;
25+
#[cfg(feature = "grpc-tonic")]
26+
use std::future::Future;
2127

2228
#[cfg(feature = "logs")]
2329
pub(crate) mod logs;
@@ -44,7 +50,7 @@ pub struct TonicConfig {
4450
pub(crate) channel: Option<tonic::transport::Channel>,
4551
pub(crate) interceptor: Option<BoxInterceptor>,
4652
/// The retry policy to use for gRPC requests.
47-
#[cfg(feature = "grpc-tonic")]
53+
#[cfg(feature = "grpc-retry")]
4854
pub(crate) retry_policy: Option<RetryPolicy>,
4955
}
5056

@@ -138,7 +144,7 @@ impl Default for TonicExporterBuilder {
138144
compression: None,
139145
channel: Option::default(),
140146
interceptor: Option::default(),
141-
#[cfg(feature = "grpc-tonic")]
147+
#[cfg(feature = "grpc-retry")]
142148
retry_policy: None,
143149
},
144150
exporter_config: ExportConfig {
@@ -190,14 +196,14 @@ impl TonicExporterBuilder {
190196
};
191197

192198
// Get retry policy before consuming self
193-
#[cfg(feature = "grpc-tonic")]
199+
#[cfg(feature = "grpc-retry")]
194200
let retry_policy = self.tonic_config.retry_policy.clone();
195201

196202
// If a custom channel was provided, use that channel instead of creating one
197203
if let Some(channel) = self.tonic_config.channel {
198204
return Ok((channel, interceptor, compression,
199-
#[cfg(feature = "grpc-tonic")] retry_policy,
200-
#[cfg(not(feature = "grpc-tonic"))] None));
205+
#[cfg(feature = "grpc-retry")] retry_policy,
206+
#[cfg(not(feature = "grpc-retry"))] None));
201207
}
202208

203209
let config = self.exporter_config;
@@ -226,8 +232,8 @@ impl TonicExporterBuilder {
226232

227233
otel_debug!(name: "TonicChannelBuilt", endpoint = endpoint_clone, timeout_in_millisecs = timeout.as_millis(), compression = format!("{:?}", compression), headers = format!("{:?}", headers_for_logging));
228234
Ok((channel, interceptor, compression,
229-
#[cfg(feature = "grpc-tonic")] retry_policy,
230-
#[cfg(not(feature = "grpc-tonic"))] None))
235+
#[cfg(feature = "grpc-retry")] retry_policy,
236+
#[cfg(not(feature = "grpc-retry"))] None))
231237
}
232238

233239
fn resolve_endpoint(default_endpoint_var: &str, provided_endpoint: Option<String>) -> String {
@@ -320,6 +326,40 @@ impl TonicExporterBuilder {
320326
}
321327
}
322328

329+
/// Wrapper for retry functionality in tonic exporters.
330+
/// Provides a unified call path that either uses retry_with_backoff when grpc-retry
331+
/// feature is enabled, or executes the operation once when it's not.
332+
#[cfg(feature = "grpc-tonic")]
333+
async fn tonic_retry_with_backoff<R, F, Fut, T>(
334+
runtime: R,
335+
policy: RetryPolicy,
336+
classify_fn: fn(&tonic::Status) -> opentelemetry_sdk::retry::RetryErrorType,
337+
operation_name: &'static str,
338+
operation: F,
339+
) -> Result<T, tonic::Status>
340+
where
341+
R: Runtime,
342+
F: Fn() -> Fut,
343+
Fut: Future<Output = Result<T, tonic::Status>>,
344+
{
345+
#[cfg(feature = "grpc-retry")]
346+
{
347+
retry_with_backoff(runtime, policy, classify_fn, operation_name, operation).await
348+
}
349+
350+
#[cfg(not(feature = "grpc-retry"))]
351+
{
352+
// When retry feature is not enabled, execute operation once
353+
// avoid unused param warnings ...
354+
let _ = runtime;
355+
let _ = policy;
356+
let _ = classify_fn;
357+
let _ = operation_name;
358+
359+
operation().await
360+
}
361+
}
362+
323363
fn merge_metadata_with_headers_from_env(
324364
metadata: MetadataMap,
325365
headers_from_env: HeaderMap,
@@ -493,7 +533,7 @@ pub trait WithTonicConfig {
493533
I: tonic::service::Interceptor + Clone + Send + Sync + 'static;
494534

495535
/// Set the retry policy for gRPC requests.
496-
#[cfg(feature = "grpc-tonic")]
536+
#[cfg(feature = "grpc-retry")]
497537
fn with_retry_policy(self, policy: RetryPolicy) -> Self;
498538
}
499539

@@ -537,7 +577,7 @@ impl<B: HasTonicConfig> WithTonicConfig for B {
537577
self
538578
}
539579

540-
#[cfg(feature = "grpc-tonic")]
580+
#[cfg(feature = "grpc-retry")]
541581
fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
542582
self.tonic_config().retry_policy = Some(policy);
543583
self
@@ -775,7 +815,7 @@ mod tests {
775815
});
776816
}
777817

778-
#[cfg(feature = "grpc-tonic")]
818+
#[cfg(feature = "grpc-retry")]
779819
#[test]
780820
fn test_with_retry_policy() {
781821
use crate::WithTonicConfig;
@@ -798,7 +838,7 @@ mod tests {
798838
assert_eq!(retry_policy.jitter_ms, 50);
799839
}
800840

801-
#[cfg(feature = "grpc-tonic")]
841+
#[cfg(feature = "grpc-retry")]
802842
#[test]
803843
fn test_default_retry_policy_when_none_configured() {
804844
// This test requires us to create a tonic client, but we can't easily do that without

opentelemetry-otlp/src/exporter/tonic/trace.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
1616

1717
use super::BoxInterceptor;
1818

19-
use crate::retry_classification::grpc::classify_tonic_status;
20-
use opentelemetry_sdk::retry::{retry_with_backoff, RetryPolicy};
19+
use opentelemetry_sdk::retry::RetryPolicy;
2120
use opentelemetry_sdk::runtime::Tokio;
2221

2322
pub(crate) struct TonicTracesClient {
@@ -75,10 +74,10 @@ impl SpanExporter for TonicTracesClient {
7574
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
7675
let batch = Arc::new(batch);
7776

78-
match retry_with_backoff(
77+
match super::tonic_retry_with_backoff(
7978
Tokio,
8079
self.retry_policy.clone(),
81-
classify_tonic_status,
80+
crate::retry_classification::grpc::classify_tonic_status,
8281
"TonicTracesClient.Export",
8382
|| async {
8483
let batch_clone = Arc::clone(&batch);

0 commit comments

Comments
 (0)