Skip to content

Commit cf56681

Browse files
committed
handle failure hints
1 parent 18676e9 commit cf56681

File tree

8 files changed

+931
-150
lines changed

8 files changed

+931
-150
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ temp-env = "0.3.6"
4343
thiserror = { version = "2", default-features = false }
4444
tonic = { version = "0.13", default-features = false }
4545
tonic-build = "0.13"
46+
tonic-types = "0.13"
4647
tokio = { version = "1", default-features = false }
4748
tokio-stream = "0.1"
4849
# Using `tracing 0.1.40` because 0.1.39 (which is yanked) introduces the ability to set event names in macros,

opentelemetry-otlp/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ tracing = {workspace = true, optional = true}
3535

3636
prost = { workspace = true, optional = true }
3737
tonic = { workspace = true, optional = true }
38+
tonic-types = { workspace = true, optional = true }
3839
tokio = { workspace = true, features = ["sync", "rt"], optional = true }
3940

4041
reqwest = { workspace = true, optional = true }
@@ -69,7 +70,7 @@ serialize = ["serde", "serde_json"]
6970
default = ["http-proto", "reqwest-blocking-client", "trace", "metrics", "logs", "internal-logs"]
7071

7172
# grpc using tonic
72-
grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic", "opentelemetry_sdk/rt-tokio", "opentelemetry_sdk/experimental_async_runtime"]
73+
grpc-tonic = ["tonic", "tonic-types", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic", "opentelemetry_sdk/rt-tokio", "opentelemetry_sdk/experimental_async_runtime"]
7374
gzip-tonic = ["tonic/gzip"]
7475
zstd-tonic = ["tonic/zstd"]
7576

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 80 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop
1414

1515
use super::BoxInterceptor;
1616

17-
use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy};
18-
use opentelemetry_sdk::runtime::Tokio;
17+
use opentelemetry_sdk::retry::{RetryPolicy, RetryErrorType};
18+
use opentelemetry_sdk::runtime::{Tokio, Runtime};
19+
use crate::retry_classification::grpc::classify_tonic_status;
1920

2021
pub(crate) struct TonicLogsClient {
2122
inner: Mutex<Option<ClientInner>>,
@@ -71,54 +72,88 @@ impl LogExporter for TonicLogsClient {
7172

7273
let batch = Arc::new(batch);
7374

74-
retry_with_exponential_backoff(Tokio, policy, "TonicLogsClient.Export", {
75-
let batch = Arc::clone(&batch);
76-
let inner = &self.inner;
77-
let resource = &self.resource;
78-
move || {
79-
let batch = Arc::clone(&batch);
80-
Box::pin(async move {
81-
let (mut client, metadata, extensions) = match inner.lock().await.as_mut() {
82-
Some(inner) => {
83-
let (m, e, _) = inner
84-
.interceptor
85-
.call(Request::new(()))
86-
.map_err(|e| {
87-
OTelSdkError::InternalFailure(format!("error: {e:?}"))
88-
})?
89-
.into_parts();
90-
(inner.client.clone(), m, e)
75+
// Custom retry loop that preserves tonic::Status for proper classification
76+
let mut attempt = 0;
77+
let mut delay = policy.initial_delay_ms;
78+
79+
loop {
80+
let batch_clone = Arc::clone(&batch);
81+
82+
// Execute the export operation
83+
let result = {
84+
let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {
85+
Some(inner) => {
86+
let (m, e, _) = inner
87+
.interceptor
88+
.call(Request::new(()))
89+
.map_err(|e| {
90+
OTelSdkError::InternalFailure(format!("error: {e:?}"))
91+
})?
92+
.into_parts();
93+
(inner.client.clone(), m, e)
94+
}
95+
None => return Err(OTelSdkError::AlreadyShutdown),
96+
};
97+
98+
let resource_logs = group_logs_by_resource_and_scope(&batch_clone, &self.resource);
99+
100+
otel_debug!(name: "TonicLogsClient.ExportStarted");
101+
102+
client
103+
.export(Request::from_parts(
104+
metadata,
105+
extensions,
106+
ExportLogsServiceRequest { resource_logs },
107+
))
108+
.await
109+
};
110+
111+
match result {
112+
Ok(_) => {
113+
otel_debug!(name: "TonicLogsClient.ExportSucceeded");
114+
return Ok(());
115+
}
116+
Err(tonic_status) => {
117+
// Structured error handling: keep the tonic::Status intact rather than flattening it to a string.
118+
// Classify the tonic::Status directly with structured data
119+
let error_classification = classify_tonic_status(&tonic_status);
120+
121+
match error_classification {
122+
RetryErrorType::NonRetryable => {
123+
let error = format!("export error: {tonic_status:?}");
124+
otel_debug!(name: "TonicLogsClient.ExportFailed", error = &error);
125+
return Err(OTelSdkError::InternalFailure(error));
91126
}
92-
None => return Err(OTelSdkError::AlreadyShutdown),
93-
};
94-
95-
let resource_logs = group_logs_by_resource_and_scope(&batch, resource);
96-
97-
otel_debug!(name: "TonicLogsClient.ExportStarted");
98-
99-
let result = client
100-
.export(Request::from_parts(
101-
metadata,
102-
extensions,
103-
ExportLogsServiceRequest { resource_logs },
104-
))
105-
.await;
106-
107-
match result {
108-
Ok(_) => {
109-
otel_debug!(name: "TonicLogsClient.ExportSucceeded");
110-
Ok(())
127+
RetryErrorType::Retryable if attempt < policy.max_retries => {
128+
attempt += 1;
129+
otel_debug!(name: "TonicLogsClient.ExportRetrying", attempt = attempt, error = format!("{tonic_status:?}"));
130+
131+
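// Exponential backoff with jitter; the jitter is taken from the wall clock's sub-second nanoseconds (not an RNG), bounded by policy.jitter_ms.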
132+
let jitter = (std::time::SystemTime::now()
133+
.duration_since(std::time::SystemTime::UNIX_EPOCH)
134+
.unwrap()
135+
.subsec_nanos() as u64) % (policy.jitter_ms + 1);
136+
let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms);
137+
Tokio.delay(std::time::Duration::from_millis(delay_with_jitter)).await;
138+
delay = std::cmp::min(delay * 2, policy.max_delay_ms);
111139
}
112-
Err(e) => {
113-
let error = format!("export error: {e:?}");
114-
otel_debug!(name: "TonicLogsClient.ExportFailed", error = &error);
115-
Err(OTelSdkError::InternalFailure(error))
140+
RetryErrorType::Throttled(server_delay) if attempt < policy.max_retries => {
141+
attempt += 1;
142+
otel_debug!(name: "TonicLogsClient.ExportThrottled", attempt = attempt, delay_ms = server_delay.as_millis());
143+
144+
// Use server-specified delay
145+
Tokio.delay(server_delay).await;
146+
}
147+
_ => {
148+
// Fallthrough: a retryable or throttled error arrived after `attempt` reached policy.max_retries.
149+
let error = format!("export error after {attempt} attempts: {tonic_status:?}");
150+
otel_debug!(name: "TonicLogsClient.ExportFailedFinal", error = &error);
151+
return Err(OTelSdkError::InternalFailure(error));
116152
}
117153
}
118-
})
154+
}
119155
}
120-
})
121-
.await
156+
}
122157
}
123158

124159
fn shutdown_with_timeout(&self, _timeout: time::Duration) -> OTelSdkResult {

opentelemetry-otlp/src/exporter/tonic/metrics.rs

Lines changed: 83 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
1212
use super::BoxInterceptor;
1313
use crate::metric::MetricsClient;
1414

15-
use opentelemetry_sdk::retry::{retry_with_exponential_backoff, RetryPolicy};
16-
use opentelemetry_sdk::runtime::Tokio;
15+
use opentelemetry_sdk::retry::{RetryPolicy, RetryErrorType};
16+
use opentelemetry_sdk::runtime::{Tokio, Runtime};
17+
use crate::retry_classification::grpc::classify_tonic_status;
1718

1819
pub(crate) struct TonicMetricsClient {
1920
inner: Mutex<Option<ClientInner>>,
@@ -63,56 +64,91 @@ impl MetricsClient for TonicMetricsClient {
6364
jitter_ms: 100,
6465
};
6566

66-
retry_with_exponential_backoff(Tokio, policy, "TonicMetricsClient.Export", {
67-
let inner = &self.inner;
68-
move || {
69-
Box::pin(async move {
70-
let (mut client, metadata, extensions) = inner
71-
.lock()
72-
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
73-
.and_then(|mut inner| match &mut *inner {
74-
Some(inner) => {
75-
let (m, e, _) = inner
76-
.interceptor
77-
.call(Request::new(()))
78-
.map_err(|e| {
79-
OTelSdkError::InternalFailure(format!(
80-
"unexpected status while exporting {e:?}"
81-
))
82-
})?
83-
.into_parts();
84-
Ok((inner.client.clone(), m, e))
85-
}
86-
None => Err(OTelSdkError::InternalFailure(
87-
"exporter is already shut down".into(),
88-
)),
89-
})?;
90-
91-
otel_debug!(name: "TonicMetricsClient.ExportStarted");
92-
93-
let result = client
94-
.export(Request::from_parts(
95-
metadata,
96-
extensions,
97-
ExportMetricsServiceRequest::from(metrics),
98-
))
99-
.await;
100-
101-
match result {
102-
Ok(_) => {
103-
otel_debug!(name: "TonicMetricsClient.ExportSucceeded");
104-
Ok(())
67+
// Custom retry loop that preserves tonic::Status for proper classification
68+
let mut attempt = 0;
69+
let mut delay = policy.initial_delay_ms;
70+
71+
loop {
72+
// Execute the export operation
73+
let result = {
74+
let (mut client, metadata, extensions) = self.inner
75+
.lock()
76+
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
77+
.and_then(|mut inner| match &mut *inner {
78+
Some(inner) => {
79+
let (m, e, _) = inner
80+
.interceptor
81+
.call(Request::new(()))
82+
.map_err(|e| {
83+
OTelSdkError::InternalFailure(format!(
84+
"unexpected status while exporting {e:?}"
85+
))
86+
})?
87+
.into_parts();
88+
Ok((inner.client.clone(), m, e))
10589
}
106-
Err(e) => {
107-
let error = format!("export error: {e:?}");
90+
None => Err(OTelSdkError::InternalFailure(
91+
"exporter is already shut down".into(),
92+
)),
93+
})?;
94+
95+
otel_debug!(name: "TonicMetricsClient.ExportStarted");
96+
97+
client
98+
.export(Request::from_parts(
99+
metadata,
100+
extensions,
101+
ExportMetricsServiceRequest::from(metrics),
102+
))
103+
.await
104+
};
105+
106+
match result {
107+
Ok(_) => {
108+
otel_debug!(name: "TonicMetricsClient.ExportSucceeded");
109+
return Ok(());
110+
}
111+
Err(tonic_status) => {
112+
// Structured error handling: keep the tonic::Status intact rather than flattening it to a string.
113+
// Classify the tonic::Status directly with structured data
114+
let error_classification = classify_tonic_status(&tonic_status);
115+
116+
match error_classification {
117+
RetryErrorType::NonRetryable => {
118+
let error = format!("export error: {tonic_status:?}");
108119
otel_debug!(name: "TonicMetricsClient.ExportFailed", error = &error);
109-
Err(OTelSdkError::InternalFailure(error))
120+
return Err(OTelSdkError::InternalFailure(error));
121+
}
122+
RetryErrorType::Retryable if attempt < policy.max_retries => {
123+
attempt += 1;
124+
otel_debug!(name: "TonicMetricsClient.ExportRetrying", attempt = attempt, error = format!("{tonic_status:?}"));
125+
126+
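// Exponential backoff with jitter; the jitter is taken from the wall clock's sub-second nanoseconds (not an RNG), bounded by policy.jitter_ms.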
127+
let jitter = (std::time::SystemTime::now()
128+
.duration_since(std::time::SystemTime::UNIX_EPOCH)
129+
.unwrap()
130+
.subsec_nanos() as u64) % (policy.jitter_ms + 1);
131+
let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms);
132+
Tokio.delay(std::time::Duration::from_millis(delay_with_jitter)).await;
133+
delay = std::cmp::min(delay * 2, policy.max_delay_ms);
134+
}
135+
RetryErrorType::Throttled(server_delay) if attempt < policy.max_retries => {
136+
attempt += 1;
137+
otel_debug!(name: "TonicMetricsClient.ExportThrottled", attempt = attempt, delay_ms = server_delay.as_millis());
138+
139+
// Use server-specified delay
140+
Tokio.delay(server_delay).await;
141+
}
142+
_ => {
143+
// Fallthrough: a retryable or throttled error arrived after `attempt` reached policy.max_retries.
144+
let error = format!("export error after {attempt} attempts: {tonic_status:?}");
145+
otel_debug!(name: "TonicMetricsClient.ExportFailedFinal", error = &error);
146+
return Err(OTelSdkError::InternalFailure(error));
110147
}
111148
}
112-
})
149+
}
113150
}
114-
})
115-
.await
151+
}
116152
}
117153

118154
fn shutdown(&self) -> OTelSdkResult {

0 commit comments

Comments
 (0)