Skip to content

Commit 3847b26

Browse files
committed
handle failure hints
1 parent 18676e9 commit 3847b26

File tree

9 files changed

+913
-156
lines changed

9 files changed

+913
-156
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ temp-env = "0.3.6"
4343
thiserror = { version = "2", default-features = false }
4444
tonic = { version = "0.13", default-features = false }
4545
tonic-build = "0.13"
46+
tonic-types = "0.13"
4647
tokio = { version = "1", default-features = false }
4748
tokio-stream = "0.1"
4849
# Using `tracing 0.1.40` because 0.1.39 (which is yanked) introduces the ability to set event names in macros,

opentelemetry-otlp/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ tracing = {workspace = true, optional = true}
3535

3636
prost = { workspace = true, optional = true }
3737
tonic = { workspace = true, optional = true }
38+
tonic-types = { workspace = true, optional = true }
3839
tokio = { workspace = true, features = ["sync", "rt"], optional = true }
3940

4041
reqwest = { workspace = true, optional = true }
@@ -69,7 +70,7 @@ serialize = ["serde", "serde_json"]
6970
default = ["http-proto", "reqwest-blocking-client", "trace", "metrics", "logs", "internal-logs"]
7071

7172
# grpc using tonic
72-
grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic", "opentelemetry_sdk/rt-tokio", "opentelemetry_sdk/experimental_async_runtime"]
73+
grpc-tonic = ["tonic", "tonic-types", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic", "opentelemetry_sdk/rt-tokio", "opentelemetry_sdk/experimental_async_runtime"]
7374
gzip-tonic = ["tonic/gzip"]
7475
zstd-tonic = ["tonic/zstd"]
7576

opentelemetry-otlp/allowed-external-types.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,7 @@ allowed_external_types = [
1515
"tonic::transport::tls::Identity",
1616
"tonic::transport::channel::Channel",
1717
"tonic::service::interceptor::Interceptor",
18+
19+
# For retries
20+
"tonic::status::Status"
1821
]

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 43 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop
1414

1515
use super::BoxInterceptor;
1616

17-
use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy};
17+
use crate::retry_classification::grpc::classify_tonic_status;
18+
use opentelemetry_sdk::retry::{RetryPolicy, retry_with_exponential_backoff_classified};
1819
use opentelemetry_sdk::runtime::Tokio;
1920

2021
pub(crate) struct TonicLogsClient {
@@ -71,54 +72,49 @@ impl LogExporter for TonicLogsClient {
7172

7273
let batch = Arc::new(batch);
7374

74-
retry_with_exponential_backoff(Tokio, policy, "TonicLogsClient.Export", {
75-
let batch = Arc::clone(&batch);
76-
let inner = &self.inner;
77-
let resource = &self.resource;
78-
move || {
79-
let batch = Arc::clone(&batch);
80-
Box::pin(async move {
81-
let (mut client, metadata, extensions) = match inner.lock().await.as_mut() {
82-
Some(inner) => {
83-
let (m, e, _) = inner
84-
.interceptor
85-
.call(Request::new(()))
86-
.map_err(|e| {
87-
OTelSdkError::InternalFailure(format!("error: {e:?}"))
88-
})?
89-
.into_parts();
90-
(inner.client.clone(), m, e)
91-
}
92-
None => return Err(OTelSdkError::AlreadyShutdown),
93-
};
94-
95-
let resource_logs = group_logs_by_resource_and_scope(&batch, resource);
96-
97-
otel_debug!(name: "TonicLogsClient.ExportStarted");
98-
99-
let result = client
100-
.export(Request::from_parts(
101-
metadata,
102-
extensions,
103-
ExportLogsServiceRequest { resource_logs },
104-
))
105-
.await;
106-
107-
match result {
108-
Ok(_) => {
109-
otel_debug!(name: "TonicLogsClient.ExportSucceeded");
110-
Ok(())
111-
}
112-
Err(e) => {
113-
let error = format!("export error: {e:?}");
114-
otel_debug!(name: "TonicLogsClient.ExportFailed", error = &error);
115-
Err(OTelSdkError::InternalFailure(error))
116-
}
75+
match retry_with_exponential_backoff_classified(
76+
Tokio,
77+
policy,
78+
classify_tonic_status,
79+
"TonicLogsClient.Export",
80+
|| async {
81+
let batch_clone = Arc::clone(&batch);
82+
83+
// Execute the export operation
84+
let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {
85+
Some(inner) => {
86+
let (m, e, _) = inner
87+
.interceptor
88+
.call(Request::new(()))
89+
.map_err(|e| {
90+
// Convert interceptor errors to tonic::Status for retry classification
91+
tonic::Status::internal(format!("interceptor error: {e:?}"))
92+
})?
93+
.into_parts();
94+
(inner.client.clone(), m, e)
11795
}
118-
})
96+
None => return Err(tonic::Status::failed_precondition("exporter already shutdown")),
97+
};
98+
99+
let resource_logs = group_logs_by_resource_and_scope(&batch_clone, &self.resource);
100+
101+
otel_debug!(name: "TonicLogsClient.ExportStarted");
102+
103+
client
104+
.export(Request::from_parts(
105+
metadata,
106+
extensions,
107+
ExportLogsServiceRequest { resource_logs },
108+
))
109+
.await
110+
.map(|_| {
111+
otel_debug!(name: "TonicLogsClient.ExportSucceeded");
112+
})
119113
}
120-
})
121-
.await
114+
).await {
115+
Ok(_) => Ok(()),
116+
Err(tonic_status) => Err(OTelSdkError::InternalFailure(format!("export error: {tonic_status:?}"))),
117+
}
122118
}
123119

124120
fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {

opentelemetry-otlp/src/exporter/tonic/metrics.rs

Lines changed: 48 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
1212
use super::BoxInterceptor;
1313
use crate::metric::MetricsClient;
1414

15-
use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy};
15+
use crate::retry_classification::grpc::classify_tonic_status;
16+
use opentelemetry_sdk::retry::{RetryPolicy, retry_with_exponential_backoff_classified};
1617
use opentelemetry_sdk::runtime::Tokio;
1718

1819
pub(crate) struct TonicMetricsClient {
@@ -63,56 +64,54 @@ impl MetricsClient for TonicMetricsClient {
6364
jitter_ms: 100,
6465
};
6566

66-
retry_with_exponential_backoff(Tokio, policy, "TonicMetricsClient.Export", {
67-
let inner = &self.inner;
68-
move || {
69-
Box::pin(async move {
70-
let (mut client, metadata, extensions) = inner
71-
.lock()
72-
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
73-
.and_then(|mut inner| match &mut *inner {
74-
Some(inner) => {
75-
let (m, e, _) = inner
76-
.interceptor
77-
.call(Request::new(()))
78-
.map_err(|e| {
79-
OTelSdkError::InternalFailure(format!(
80-
"unexpected status while exporting {e:?}"
81-
))
82-
})?
83-
.into_parts();
84-
Ok((inner.client.clone(), m, e))
85-
}
86-
None => Err(OTelSdkError::InternalFailure(
87-
"exporter is already shut down".into(),
88-
)),
89-
})?;
90-
91-
otel_debug!(name: "TonicMetricsClient.ExportStarted");
92-
93-
let result = client
94-
.export(Request::from_parts(
95-
metadata,
96-
extensions,
97-
ExportMetricsServiceRequest::from(metrics),
98-
))
99-
.await;
100-
101-
match result {
102-
Ok(_) => {
103-
otel_debug!(name: "TonicMetricsClient.ExportSucceeded");
104-
Ok(())
67+
match retry_with_exponential_backoff_classified(
68+
Tokio,
69+
policy,
70+
classify_tonic_status,
71+
"TonicMetricsClient.Export",
72+
|| async {
73+
// Execute the export operation
74+
let (mut client, metadata, extensions) = self
75+
.inner
76+
.lock()
77+
.map_err(|e| {
78+
tonic::Status::internal(format!("Failed to acquire lock: {e:?}"))
79+
})
80+
.and_then(|mut inner| match &mut *inner {
81+
Some(inner) => {
82+
let (m, e, _) = inner
83+
.interceptor
84+
.call(Request::new(()))
85+
.map_err(|e| {
86+
tonic::Status::internal(format!(
87+
"unexpected status while exporting {e:?}"
88+
))
89+
})?
90+
.into_parts();
91+
Ok((inner.client.clone(), m, e))
10592
}
106-
Err(e) => {
107-
let error = format!("export error: {e:?}");
108-
otel_debug!(name: "TonicMetricsClient.ExportFailed", error = &error);
109-
Err(OTelSdkError::InternalFailure(error))
110-
}
111-
}
112-
})
93+
None => Err(tonic::Status::failed_precondition(
94+
"exporter is already shut down",
95+
)),
96+
})?;
97+
98+
otel_debug!(name: "TonicMetricsClient.ExportStarted");
99+
100+
client
101+
.export(Request::from_parts(
102+
metadata,
103+
extensions,
104+
ExportMetricsServiceRequest::from(metrics),
105+
))
106+
.await
107+
.map(|_| {
108+
otel_debug!(name: "TonicMetricsClient.ExportSucceeded");
109+
})
113110
}
114-
})
115-
.await
111+
).await {
112+
Ok(_) => Ok(()),
113+
Err(tonic_status) => Err(OTelSdkError::InternalFailure(format!("export error: {tonic_status:?}"))),
114+
}
116115
}
117116

118117
fn shutdown(&self) -> OTelSdkResult {

opentelemetry-otlp/src/exporter/tonic/trace.rs

Lines changed: 46 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
1616

1717
use super::BoxInterceptor;
1818

19-
use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy};
19+
use crate::retry_classification::grpc::classify_tonic_status;
20+
use opentelemetry_sdk::retry::{RetryPolicy, retry_with_exponential_backoff_classified};
2021
use opentelemetry_sdk::runtime::Tokio;
2122

2223
pub(crate) struct TonicTracesClient {
@@ -73,54 +74,52 @@ impl SpanExporter for TonicTracesClient {
7374

7475
let batch = Arc::new(batch);
7576

76-
retry_with_exponential_backoff(Tokio, policy, "TonicTracesClient.Export", {
77-
let batch = Arc::clone(&batch);
78-
let inner = &self.inner;
79-
let resource = &self.resource;
80-
move || {
81-
let batch = Arc::clone(&batch);
82-
Box::pin(async move {
83-
let (mut client, metadata, extensions) = match inner {
84-
Some(inner) => {
85-
let (m, e, _) = inner
86-
.interceptor
87-
.lock()
88-
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
89-
.call(Request::new(()))
90-
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {e:?}")))?
91-
.into_parts();
92-
(inner.client.clone(), m, e)
93-
}
94-
None => return Err(OTelSdkError::AlreadyShutdown),
95-
};
96-
97-
let resource_spans = group_spans_by_resource_and_scope((*batch).clone(), resource);
98-
99-
otel_debug!(name: "TonicTracesClient.ExportStarted");
100-
101-
let result = client
102-
.export(Request::from_parts(
103-
metadata,
104-
extensions,
105-
ExportTraceServiceRequest { resource_spans },
106-
))
107-
.await;
108-
109-
match result {
110-
Ok(_) => {
111-
otel_debug!(name: "TonicTracesClient.ExportSucceeded");
112-
Ok(())
113-
}
114-
Err(e) => {
115-
let error = format!("export error: {e:?}");
116-
otel_debug!(name: "TonicTracesClient.ExportFailed", error = &error);
117-
Err(OTelSdkError::InternalFailure(error))
118-
}
77+
match retry_with_exponential_backoff_classified(
78+
Tokio,
79+
policy,
80+
classify_tonic_status,
81+
"TonicTracesClient.Export",
82+
|| async {
83+
let batch_clone = Arc::clone(&batch);
84+
85+
// Execute the export operation
86+
let (mut client, metadata, extensions) = match &self.inner {
87+
Some(inner) => {
88+
let (m, e, _) = inner
89+
.interceptor
90+
.lock()
91+
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
92+
.call(Request::new(()))
93+
.map_err(|e| {
94+
// Convert interceptor errors to tonic::Status for retry classification
95+
tonic::Status::internal(format!("interceptor error: {e:?}"))
96+
})?
97+
.into_parts();
98+
(inner.client.clone(), m, e)
11999
}
120-
})
100+
None => return Err(tonic::Status::failed_precondition("exporter already shutdown")),
101+
};
102+
103+
let resource_spans =
104+
group_spans_by_resource_and_scope((*batch_clone).clone(), &self.resource);
105+
106+
otel_debug!(name: "TonicTracesClient.ExportStarted");
107+
108+
client
109+
.export(Request::from_parts(
110+
metadata,
111+
extensions,
112+
ExportTraceServiceRequest { resource_spans },
113+
))
114+
.await
115+
.map(|_| {
116+
otel_debug!(name: "TonicTracesClient.ExportSucceeded");
117+
})
121118
}
122-
})
123-
.await
119+
).await {
120+
Ok(_) => Ok(()),
121+
Err(tonic_status) => Err(OTelSdkError::InternalFailure(format!("export error: {tonic_status:?}"))),
122+
}
124123
}
125124

126125
fn shutdown(&mut self) -> OTelSdkResult {

opentelemetry-otlp/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,9 @@ mod metric;
366366
#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))]
367367
mod span;
368368

369+
#[cfg(feature = "grpc-tonic")]
370+
pub mod retry_classification;
371+
369372
pub use crate::exporter::Compression;
370373
pub use crate::exporter::ExportConfig;
371374
pub use crate::exporter::ExporterBuildError;

0 commit comments

Comments
 (0)