Skip to content

Commit db6ed67

Browse files
committed
handle failure hints
1 parent 18676e9 commit db6ed67

File tree

9 files changed

+1034
-148
lines changed

9 files changed

+1034
-148
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ temp-env = "0.3.6"
4343
thiserror = { version = "2", default-features = false }
4444
tonic = { version = "0.13", default-features = false }
4545
tonic-build = "0.13"
46+
tonic-types = "0.13"
4647
tokio = { version = "1", default-features = false }
4748
tokio-stream = "0.1"
4849
# Using `tracing 0.1.40` because 0.1.39 (which is yanked) introduces the ability to set event names in macros,

opentelemetry-otlp/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ tracing = {workspace = true, optional = true}
3535

3636
prost = { workspace = true, optional = true }
3737
tonic = { workspace = true, optional = true }
38+
tonic-types = { workspace = true, optional = true }
3839
tokio = { workspace = true, features = ["sync", "rt"], optional = true }
3940

4041
reqwest = { workspace = true, optional = true }
@@ -69,7 +70,7 @@ serialize = ["serde", "serde_json"]
6970
default = ["http-proto", "reqwest-blocking-client", "trace", "metrics", "logs", "internal-logs"]
7071

7172
# grpc using tonic
72-
grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic", "opentelemetry_sdk/rt-tokio", "opentelemetry_sdk/experimental_async_runtime"]
73+
grpc-tonic = ["tonic", "tonic-types", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic", "opentelemetry_sdk/rt-tokio", "opentelemetry_sdk/experimental_async_runtime"]
7374
gzip-tonic = ["tonic/gzip"]
7475
zstd-tonic = ["tonic/zstd"]
7576

opentelemetry-otlp/allowed-external-types.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,7 @@ allowed_external_types = [
1515
"tonic::transport::tls::Identity",
1616
"tonic::transport::channel::Channel",
1717
"tonic::service::interceptor::Interceptor",
18+
19+
# For retries
20+
"tonic::status::Status"
1821
]

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 82 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop
1414

1515
use super::BoxInterceptor;
1616

17-
use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy};
18-
use opentelemetry_sdk::runtime::Tokio;
17+
use crate::retry_classification::grpc::classify_tonic_status;
18+
use opentelemetry_sdk::retry::{RetryErrorType, RetryPolicy};
19+
use opentelemetry_sdk::runtime::{Runtime, Tokio};
1920

2021
pub(crate) struct TonicLogsClient {
2122
inner: Mutex<Option<ClientInner>>,
@@ -71,54 +72,91 @@ impl LogExporter for TonicLogsClient {
7172

7273
let batch = Arc::new(batch);
7374

74-
retry_with_exponential_backoff(Tokio, policy, "TonicLogsClient.Export", {
75-
let batch = Arc::clone(&batch);
76-
let inner = &self.inner;
77-
let resource = &self.resource;
78-
move || {
79-
let batch = Arc::clone(&batch);
80-
Box::pin(async move {
81-
let (mut client, metadata, extensions) = match inner.lock().await.as_mut() {
82-
Some(inner) => {
83-
let (m, e, _) = inner
84-
.interceptor
85-
.call(Request::new(()))
86-
.map_err(|e| {
87-
OTelSdkError::InternalFailure(format!("error: {e:?}"))
88-
})?
89-
.into_parts();
90-
(inner.client.clone(), m, e)
75+
// Custom retry loop that preserves tonic::Status for proper classification
76+
let mut attempt = 0;
77+
let mut delay = policy.initial_delay_ms;
78+
79+
loop {
80+
let batch_clone = Arc::clone(&batch);
81+
82+
// Execute the export operation
83+
let result = {
84+
let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {
85+
Some(inner) => {
86+
let (m, e, _) = inner
87+
.interceptor
88+
.call(Request::new(()))
89+
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {e:?}")))?
90+
.into_parts();
91+
(inner.client.clone(), m, e)
92+
}
93+
None => return Err(OTelSdkError::AlreadyShutdown),
94+
};
95+
96+
let resource_logs = group_logs_by_resource_and_scope(&batch_clone, &self.resource);
97+
98+
otel_debug!(name: "TonicLogsClient.ExportStarted");
99+
100+
client
101+
.export(Request::from_parts(
102+
metadata,
103+
extensions,
104+
ExportLogsServiceRequest { resource_logs },
105+
))
106+
.await
107+
};
108+
109+
match result {
110+
Ok(_) => {
111+
otel_debug!(name: "TonicLogsClient.ExportSucceeded");
112+
return Ok(());
113+
}
114+
Err(tonic_status) => {
115+
// ✅ PROPER STRUCTURED ERROR HANDLING
116+
// Classify the tonic::Status directly with structured data
117+
let error_classification = classify_tonic_status(&tonic_status);
118+
119+
match error_classification {
120+
RetryErrorType::NonRetryable => {
121+
let error = format!("export error: {tonic_status:?}");
122+
otel_debug!(name: "TonicLogsClient.ExportFailed", error = &error);
123+
return Err(OTelSdkError::InternalFailure(error));
91124
}
92-
None => return Err(OTelSdkError::AlreadyShutdown),
93-
};
94-
95-
let resource_logs = group_logs_by_resource_and_scope(&batch, resource);
96-
97-
otel_debug!(name: "TonicLogsClient.ExportStarted");
98-
99-
let result = client
100-
.export(Request::from_parts(
101-
metadata,
102-
extensions,
103-
ExportLogsServiceRequest { resource_logs },
104-
))
105-
.await;
125+
RetryErrorType::Retryable if attempt < policy.max_retries => {
126+
attempt += 1;
127+
otel_debug!(name: "TonicLogsClient.ExportRetrying", attempt = attempt, error = format!("{tonic_status:?}"));
128+
129+
// Exponential backoff with jitter
130+
let jitter = (std::time::SystemTime::now()
131+
.duration_since(std::time::SystemTime::UNIX_EPOCH)
132+
.unwrap()
133+
.subsec_nanos() as u64)
134+
% (policy.jitter_ms + 1);
135+
let delay_with_jitter =
136+
std::cmp::min(delay + jitter, policy.max_delay_ms);
137+
Tokio
138+
.delay(std::time::Duration::from_millis(delay_with_jitter))
139+
.await;
140+
delay = std::cmp::min(delay * 2, policy.max_delay_ms);
141+
}
142+
RetryErrorType::Throttled(server_delay) if attempt < policy.max_retries => {
143+
attempt += 1;
144+
otel_debug!(name: "TonicLogsClient.ExportThrottled", attempt = attempt, delay_ms = server_delay.as_millis());
106145

107-
match result {
108-
Ok(_) => {
109-
otel_debug!(name: "TonicLogsClient.ExportSucceeded");
110-
Ok(())
146+
// Use server-specified delay
147+
Tokio.delay(server_delay).await;
111148
}
112-
Err(e) => {
113-
let error = format!("export error: {e:?}");
114-
otel_debug!(name: "TonicLogsClient.ExportFailed", error = &error);
115-
Err(OTelSdkError::InternalFailure(error))
149+
_ => {
150+
// Max retries reached
151+
let error =
152+
format!("export error after {attempt} attempts: {tonic_status:?}");
153+
otel_debug!(name: "TonicLogsClient.ExportFailedFinal", error = &error);
154+
return Err(OTelSdkError::InternalFailure(error));
116155
}
117156
}
118-
})
157+
}
119158
}
120-
})
121-
.await
159+
}
122160
}
123161

124162
fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {

opentelemetry-otlp/src/exporter/tonic/metrics.rs

Lines changed: 91 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
1212
use super::BoxInterceptor;
1313
use crate::metric::MetricsClient;
1414

15-
use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy};
16-
use opentelemetry_sdk::runtime::Tokio;
15+
use crate::retry_classification::grpc::classify_tonic_status;
16+
use opentelemetry_sdk::retry::{RetryErrorType, RetryPolicy};
17+
use opentelemetry_sdk::runtime::{Runtime, Tokio};
1718

1819
pub(crate) struct TonicMetricsClient {
1920
inner: Mutex<Option<ClientInner>>,
@@ -63,56 +64,99 @@ impl MetricsClient for TonicMetricsClient {
6364
jitter_ms: 100,
6465
};
6566

66-
retry_with_exponential_backoff(Tokio, policy, "TonicMetricsClient.Export", {
67-
let inner = &self.inner;
68-
move || {
69-
Box::pin(async move {
70-
let (mut client, metadata, extensions) = inner
71-
.lock()
72-
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
73-
.and_then(|mut inner| match &mut *inner {
74-
Some(inner) => {
75-
let (m, e, _) = inner
76-
.interceptor
77-
.call(Request::new(()))
78-
.map_err(|e| {
79-
OTelSdkError::InternalFailure(format!(
80-
"unexpected status while exporting {e:?}"
81-
))
82-
})?
83-
.into_parts();
84-
Ok((inner.client.clone(), m, e))
85-
}
86-
None => Err(OTelSdkError::InternalFailure(
87-
"exporter is already shut down".into(),
88-
)),
89-
})?;
90-
91-
otel_debug!(name: "TonicMetricsClient.ExportStarted");
92-
93-
let result = client
94-
.export(Request::from_parts(
95-
metadata,
96-
extensions,
97-
ExportMetricsServiceRequest::from(metrics),
98-
))
99-
.await;
100-
101-
match result {
102-
Ok(_) => {
103-
otel_debug!(name: "TonicMetricsClient.ExportSucceeded");
104-
Ok(())
67+
// Custom retry loop that preserves tonic::Status for proper classification
68+
let mut attempt = 0;
69+
let mut delay = policy.initial_delay_ms;
70+
71+
loop {
72+
// Execute the export operation
73+
let result = {
74+
let (mut client, metadata, extensions) = self
75+
.inner
76+
.lock()
77+
.map_err(|e| {
78+
OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}"))
79+
})
80+
.and_then(|mut inner| match &mut *inner {
81+
Some(inner) => {
82+
let (m, e, _) = inner
83+
.interceptor
84+
.call(Request::new(()))
85+
.map_err(|e| {
86+
OTelSdkError::InternalFailure(format!(
87+
"unexpected status while exporting {e:?}"
88+
))
89+
})?
90+
.into_parts();
91+
Ok((inner.client.clone(), m, e))
10592
}
106-
Err(e) => {
107-
let error = format!("export error: {e:?}");
93+
None => Err(OTelSdkError::InternalFailure(
94+
"exporter is already shut down".into(),
95+
)),
96+
})?;
97+
98+
otel_debug!(name: "TonicMetricsClient.ExportStarted");
99+
100+
client
101+
.export(Request::from_parts(
102+
metadata,
103+
extensions,
104+
ExportMetricsServiceRequest::from(metrics),
105+
))
106+
.await
107+
};
108+
109+
match result {
110+
Ok(_) => {
111+
otel_debug!(name: "TonicMetricsClient.ExportSucceeded");
112+
return Ok(());
113+
}
114+
Err(tonic_status) => {
115+
// ✅ PROPER STRUCTURED ERROR HANDLING
116+
// Classify the tonic::Status directly with structured data
117+
let error_classification = classify_tonic_status(&tonic_status);
118+
119+
match error_classification {
120+
RetryErrorType::NonRetryable => {
121+
let error = format!("export error: {tonic_status:?}");
108122
otel_debug!(name: "TonicMetricsClient.ExportFailed", error = &error);
109-
Err(OTelSdkError::InternalFailure(error))
123+
return Err(OTelSdkError::InternalFailure(error));
124+
}
125+
RetryErrorType::Retryable if attempt < policy.max_retries => {
126+
attempt += 1;
127+
otel_debug!(name: "TonicMetricsClient.ExportRetrying", attempt = attempt, error = format!("{tonic_status:?}"));
128+
129+
// Exponential backoff with jitter
130+
let jitter = (std::time::SystemTime::now()
131+
.duration_since(std::time::SystemTime::UNIX_EPOCH)
132+
.unwrap()
133+
.subsec_nanos() as u64)
134+
% (policy.jitter_ms + 1);
135+
let delay_with_jitter =
136+
std::cmp::min(delay + jitter, policy.max_delay_ms);
137+
Tokio
138+
.delay(std::time::Duration::from_millis(delay_with_jitter))
139+
.await;
140+
delay = std::cmp::min(delay * 2, policy.max_delay_ms);
141+
}
142+
RetryErrorType::Throttled(server_delay) if attempt < policy.max_retries => {
143+
attempt += 1;
144+
otel_debug!(name: "TonicMetricsClient.ExportThrottled", attempt = attempt, delay_ms = server_delay.as_millis());
145+
146+
// Use server-specified delay
147+
Tokio.delay(server_delay).await;
148+
}
149+
_ => {
150+
// Max retries reached
151+
let error =
152+
format!("export error after {attempt} attempts: {tonic_status:?}");
153+
otel_debug!(name: "TonicMetricsClient.ExportFailedFinal", error = &error);
154+
return Err(OTelSdkError::InternalFailure(error));
110155
}
111156
}
112-
})
157+
}
113158
}
114-
})
115-
.await
159+
}
116160
}
117161

118162
fn shutdown(&self) -> OTelSdkResult {

0 commit comments

Comments
 (0)