Skip to content

Commit fb141db

Browse files
committed
handle failure hints
1 parent 18676e9 commit fb141db

File tree

9 files changed

+929
-154
lines changed

9 files changed

+929
-154
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ temp-env = "0.3.6"
4343
thiserror = { version = "2", default-features = false }
4444
tonic = { version = "0.13", default-features = false }
4545
tonic-build = "0.13"
46+
tonic-types = "0.13"
4647
tokio = { version = "1", default-features = false }
4748
tokio-stream = "0.1"
4849
# Using `tracing 0.1.40` because 0.1.39 (which is yanked) introduces the ability to set event names in macros,

opentelemetry-otlp/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ tracing = {workspace = true, optional = true}
3535

3636
prost = { workspace = true, optional = true }
3737
tonic = { workspace = true, optional = true }
38+
tonic-types = { workspace = true, optional = true }
3839
tokio = { workspace = true, features = ["sync", "rt"], optional = true }
3940

4041
reqwest = { workspace = true, optional = true }
@@ -69,7 +70,7 @@ serialize = ["serde", "serde_json"]
6970
default = ["http-proto", "reqwest-blocking-client", "trace", "metrics", "logs", "internal-logs"]
7071

7172
# grpc using tonic
72-
grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic", "opentelemetry_sdk/rt-tokio", "opentelemetry_sdk/experimental_async_runtime"]
73+
grpc-tonic = ["tonic", "tonic-types", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic", "opentelemetry_sdk/rt-tokio", "opentelemetry_sdk/experimental_async_runtime"]
7374
gzip-tonic = ["tonic/gzip"]
7475
zstd-tonic = ["tonic/zstd"]
7576

opentelemetry-otlp/allowed-external-types.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,7 @@ allowed_external_types = [
1515
"tonic::transport::tls::Identity",
1616
"tonic::transport::channel::Channel",
1717
"tonic::service::interceptor::Interceptor",
18+
19+
# For retries
20+
"tonic::status::Status"
1821
]

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 50 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop
1414

1515
use super::BoxInterceptor;
1616

17-
use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy};
17+
use crate::retry_classification::grpc::classify_tonic_status;
18+
use opentelemetry_sdk::retry::{retry_with_exponential_backoff_classified, RetryPolicy};
1819
use opentelemetry_sdk::runtime::Tokio;
1920

2021
pub(crate) struct TonicLogsClient {
@@ -71,54 +72,57 @@ impl LogExporter for TonicLogsClient {
7172

7273
let batch = Arc::new(batch);
7374

74-
retry_with_exponential_backoff(Tokio, policy, "TonicLogsClient.Export", {
75-
let batch = Arc::clone(&batch);
76-
let inner = &self.inner;
77-
let resource = &self.resource;
78-
move || {
79-
let batch = Arc::clone(&batch);
80-
Box::pin(async move {
81-
let (mut client, metadata, extensions) = match inner.lock().await.as_mut() {
82-
Some(inner) => {
83-
let (m, e, _) = inner
84-
.interceptor
85-
.call(Request::new(()))
86-
.map_err(|e| {
87-
OTelSdkError::InternalFailure(format!("error: {e:?}"))
88-
})?
89-
.into_parts();
90-
(inner.client.clone(), m, e)
91-
}
92-
None => return Err(OTelSdkError::AlreadyShutdown),
93-
};
94-
95-
let resource_logs = group_logs_by_resource_and_scope(&batch, resource);
96-
97-
otel_debug!(name: "TonicLogsClient.ExportStarted");
98-
99-
let result = client
100-
.export(Request::from_parts(
101-
metadata,
102-
extensions,
103-
ExportLogsServiceRequest { resource_logs },
75+
match retry_with_exponential_backoff_classified(
76+
Tokio,
77+
policy,
78+
classify_tonic_status,
79+
"TonicLogsClient.Export",
80+
|| async {
81+
let batch_clone = Arc::clone(&batch);
82+
83+
// Execute the export operation
84+
let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {
85+
Some(inner) => {
86+
let (m, e, _) = inner
87+
.interceptor
88+
.call(Request::new(()))
89+
.map_err(|e| {
90+
// Convert interceptor errors to tonic::Status for retry classification
91+
tonic::Status::internal(format!("interceptor error: {e:?}"))
92+
})?
93+
.into_parts();
94+
(inner.client.clone(), m, e)
95+
}
96+
None => {
97+
return Err(tonic::Status::failed_precondition(
98+
"exporter already shutdown",
10499
))
105-
.await;
106-
107-
match result {
108-
Ok(_) => {
109-
otel_debug!(name: "TonicLogsClient.ExportSucceeded");
110-
Ok(())
111-
}
112-
Err(e) => {
113-
let error = format!("export error: {e:?}");
114-
otel_debug!(name: "TonicLogsClient.ExportFailed", error = &error);
115-
Err(OTelSdkError::InternalFailure(error))
116-
}
117100
}
118-
})
119-
}
120-
})
101+
};
102+
103+
let resource_logs = group_logs_by_resource_and_scope(&batch_clone, &self.resource);
104+
105+
otel_debug!(name: "TonicLogsClient.ExportStarted");
106+
107+
client
108+
.export(Request::from_parts(
109+
metadata,
110+
extensions,
111+
ExportLogsServiceRequest { resource_logs },
112+
))
113+
.await
114+
.map(|_| {
115+
otel_debug!(name: "TonicLogsClient.ExportSucceeded");
116+
})
117+
},
118+
)
121119
.await
120+
{
121+
Ok(_) => Ok(()),
122+
Err(tonic_status) => Err(OTelSdkError::InternalFailure(format!(
123+
"export error: {tonic_status:?}"
124+
))),
125+
}
122126
}
123127

124128
fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {

opentelemetry-otlp/src/exporter/tonic/metrics.rs

Lines changed: 50 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
1212
use super::BoxInterceptor;
1313
use crate::metric::MetricsClient;
1414

15-
use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy};
15+
use crate::retry_classification::grpc::classify_tonic_status;
16+
use opentelemetry_sdk::retry::{retry_with_exponential_backoff_classified, RetryPolicy};
1617
use opentelemetry_sdk::runtime::Tokio;
1718

1819
pub(crate) struct TonicMetricsClient {
@@ -63,56 +64,56 @@ impl MetricsClient for TonicMetricsClient {
6364
jitter_ms: 100,
6465
};
6566

66-
retry_with_exponential_backoff(Tokio, policy, "TonicMetricsClient.Export", {
67-
let inner = &self.inner;
68-
move || {
69-
Box::pin(async move {
70-
let (mut client, metadata, extensions) = inner
71-
.lock()
72-
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
73-
.and_then(|mut inner| match &mut *inner {
74-
Some(inner) => {
75-
let (m, e, _) = inner
76-
.interceptor
77-
.call(Request::new(()))
78-
.map_err(|e| {
79-
OTelSdkError::InternalFailure(format!(
80-
"unexpected status while exporting {e:?}"
81-
))
82-
})?
83-
.into_parts();
84-
Ok((inner.client.clone(), m, e))
85-
}
86-
None => Err(OTelSdkError::InternalFailure(
87-
"exporter is already shut down".into(),
88-
)),
89-
})?;
90-
91-
otel_debug!(name: "TonicMetricsClient.ExportStarted");
92-
93-
let result = client
94-
.export(Request::from_parts(
95-
metadata,
96-
extensions,
97-
ExportMetricsServiceRequest::from(metrics),
98-
))
99-
.await;
100-
101-
match result {
102-
Ok(_) => {
103-
otel_debug!(name: "TonicMetricsClient.ExportSucceeded");
104-
Ok(())
67+
match retry_with_exponential_backoff_classified(
68+
Tokio,
69+
policy,
70+
classify_tonic_status,
71+
"TonicMetricsClient.Export",
72+
|| async {
73+
// Execute the export operation
74+
let (mut client, metadata, extensions) = self
75+
.inner
76+
.lock()
77+
.map_err(|e| tonic::Status::internal(format!("Failed to acquire lock: {e:?}")))
78+
.and_then(|mut inner| match &mut *inner {
79+
Some(inner) => {
80+
let (m, e, _) = inner
81+
.interceptor
82+
.call(Request::new(()))
83+
.map_err(|e| {
84+
tonic::Status::internal(format!(
85+
"unexpected status while exporting {e:?}"
86+
))
87+
})?
88+
.into_parts();
89+
Ok((inner.client.clone(), m, e))
10590
}
106-
Err(e) => {
107-
let error = format!("export error: {e:?}");
108-
otel_debug!(name: "TonicMetricsClient.ExportFailed", error = &error);
109-
Err(OTelSdkError::InternalFailure(error))
110-
}
111-
}
112-
})
113-
}
114-
})
91+
None => Err(tonic::Status::failed_precondition(
92+
"exporter is already shut down",
93+
)),
94+
})?;
95+
96+
otel_debug!(name: "TonicMetricsClient.ExportStarted");
97+
98+
client
99+
.export(Request::from_parts(
100+
metadata,
101+
extensions,
102+
ExportMetricsServiceRequest::from(metrics),
103+
))
104+
.await
105+
.map(|_| {
106+
otel_debug!(name: "TonicMetricsClient.ExportSucceeded");
107+
})
108+
},
109+
)
115110
.await
111+
{
112+
Ok(_) => Ok(()),
113+
Err(tonic_status) => Err(OTelSdkError::InternalFailure(format!(
114+
"export error: {tonic_status:?}"
115+
))),
116+
}
116117
}
117118

118119
fn shutdown(&self) -> OTelSdkResult {

opentelemetry-otlp/src/exporter/tonic/trace.rs

Lines changed: 53 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
1616

1717
use super::BoxInterceptor;
1818

19-
use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy};
19+
use crate::retry_classification::grpc::classify_tonic_status;
20+
use opentelemetry_sdk::retry::{retry_with_exponential_backoff_classified, RetryPolicy};
2021
use opentelemetry_sdk::runtime::Tokio;
2122

2223
pub(crate) struct TonicTracesClient {
@@ -73,54 +74,60 @@ impl SpanExporter for TonicTracesClient {
7374

7475
let batch = Arc::new(batch);
7576

76-
retry_with_exponential_backoff(Tokio, policy, "TonicTracesClient.Export", {
77-
let batch = Arc::clone(&batch);
78-
let inner = &self.inner;
79-
let resource = &self.resource;
80-
move || {
81-
let batch = Arc::clone(&batch);
82-
Box::pin(async move {
83-
let (mut client, metadata, extensions) = match inner {
84-
Some(inner) => {
85-
let (m, e, _) = inner
86-
.interceptor
87-
.lock()
88-
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
89-
.call(Request::new(()))
90-
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {e:?}")))?
91-
.into_parts();
92-
(inner.client.clone(), m, e)
93-
}
94-
None => return Err(OTelSdkError::AlreadyShutdown),
95-
};
96-
97-
let resource_spans = group_spans_by_resource_and_scope((*batch).clone(), resource);
98-
99-
otel_debug!(name: "TonicTracesClient.ExportStarted");
100-
101-
let result = client
102-
.export(Request::from_parts(
103-
metadata,
104-
extensions,
105-
ExportTraceServiceRequest { resource_spans },
77+
match retry_with_exponential_backoff_classified(
78+
Tokio,
79+
policy,
80+
classify_tonic_status,
81+
"TonicTracesClient.Export",
82+
|| async {
83+
let batch_clone = Arc::clone(&batch);
84+
85+
// Execute the export operation
86+
let (mut client, metadata, extensions) = match &self.inner {
87+
Some(inner) => {
88+
let (m, e, _) = inner
89+
.interceptor
90+
.lock()
91+
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
92+
.call(Request::new(()))
93+
.map_err(|e| {
94+
// Convert interceptor errors to tonic::Status for retry classification
95+
tonic::Status::internal(format!("interceptor error: {e:?}"))
96+
})?
97+
.into_parts();
98+
(inner.client.clone(), m, e)
99+
}
100+
None => {
101+
return Err(tonic::Status::failed_precondition(
102+
"exporter already shutdown",
106103
))
107-
.await;
108-
109-
match result {
110-
Ok(_) => {
111-
otel_debug!(name: "TonicTracesClient.ExportSucceeded");
112-
Ok(())
113-
}
114-
Err(e) => {
115-
let error = format!("export error: {e:?}");
116-
otel_debug!(name: "TonicTracesClient.ExportFailed", error = &error);
117-
Err(OTelSdkError::InternalFailure(error))
118-
}
119104
}
120-
})
121-
}
122-
})
105+
};
106+
107+
let resource_spans =
108+
group_spans_by_resource_and_scope((*batch_clone).clone(), &self.resource);
109+
110+
otel_debug!(name: "TonicTracesClient.ExportStarted");
111+
112+
client
113+
.export(Request::from_parts(
114+
metadata,
115+
extensions,
116+
ExportTraceServiceRequest { resource_spans },
117+
))
118+
.await
119+
.map(|_| {
120+
otel_debug!(name: "TonicTracesClient.ExportSucceeded");
121+
})
122+
},
123+
)
123124
.await
125+
{
126+
Ok(_) => Ok(()),
127+
Err(tonic_status) => Err(OTelSdkError::InternalFailure(format!(
128+
"export error: {tonic_status:?}"
129+
))),
130+
}
124131
}
125132

126133
fn shutdown(&mut self) -> OTelSdkResult {

opentelemetry-otlp/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,9 @@ mod metric;
366366
#[cfg(any(feature = "http-proto", feature = "http-json", feature = "grpc-tonic"))]
367367
mod span;
368368

369+
#[cfg(feature = "grpc-tonic")]
370+
pub mod retry_classification;
371+
369372
pub use crate::exporter::Compression;
370373
pub use crate::exporter::ExportConfig;
371374
pub use crate::exporter::ExporterBuildError;

0 commit comments

Comments
 (0)