Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ url = "2.2"
md5 = "0.8.0"
hex = "0.4"
lz4_flex = { version = "0.11", features = ["safe-encode"], default-features = false }
tokio = { version = "1", default-features = false, features = ["time"] } # Needed for tokio::time::sleep in library code

[features]
self_signed_certs = [] # Empty by default for security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ pub(crate) enum GenevaConfigClientError {
Http(#[from] reqwest::Error),
#[error("Request failed with status {status}: {message}")]
RequestFailed { status: u16, message: String },
#[error("Request failed with status {status}: {message} (Retry-After={retry_after_secs:?})")]
RequestFailedWithRetryAfter {
status: u16,
message: String,
retry_after_secs: Option<u64>,
},

// Data / parsing
#[error("JSON error: {0}")]
Expand All @@ -93,6 +99,13 @@ pub(crate) enum GenevaConfigClientError {
#[allow(dead_code)]
pub(crate) type Result<T> = std::result::Result<T, GenevaConfigClientError>;

/// Maximum number of retries performed after the initial fetch attempt.
///
/// The retry loop iterates over `0..=MAX_GCS_RETRIES`, so at most
/// `MAX_GCS_RETRIES + 1` requests are issued for a single cache refresh.
const MAX_GCS_RETRIES: usize = 5;
/// Delays used between attempts when the failed response carries no usable
/// `Retry-After` value.
///
/// The schedule is indexed by attempt number and clamped to its last entry, so
/// every retry beyond the third waits 800 ms.
const FIXED_BACKOFF: [Duration; 3] = [
Duration::from_millis(200),
Duration::from_millis(400),
Duration::from_millis(800),
];

/// Configuration for the Geneva Config Client.
///
/// # Fields
Expand Down Expand Up @@ -370,10 +383,38 @@ impl GenevaConfigClient {
}
}
}
// Cache miss or expired token, fetch fresh data
// Perform actual fetch before acquiring write lock to minimize lock contention
let (fresh_ingestion_gateway_info, fresh_moniker_info) =
self.fetch_ingestion_info().await?;
// Cache miss or expired token: fetch fresh data, retrying transient failures.
//
// The initial attempt is followed by at most MAX_GCS_RETRIES retries. The loop
// ends as soon as a fetch succeeds, the error is not retriable, or the retry
// budget is exhausted; in the latter two cases the error is propagated to the
// caller immediately instead of re-issuing a request that cannot succeed.
let (fresh_ingestion_gateway_info, fresh_moniker_info) = {
    // Number of retries already performed (0 while the initial attempt runs).
    let mut attempt: usize = 0;
    let outcome = loop {
        match self.fetch_ingestion_info().await {
            Ok(fresh) => break Ok(fresh),
            Err(err) => {
                if attempt >= MAX_GCS_RETRIES || !is_retriable_error(&err) {
                    break Err(err);
                }
                // Prefer the server-provided Retry-After hint when one was
                // captured; otherwise fall back to the fixed schedule, clamped
                // to its last entry for later attempts.
                let delay = match &err {
                    GenevaConfigClientError::RequestFailedWithRetryAfter {
                        retry_after_secs: Some(secs),
                        ..
                    } => Duration::from_secs(*secs),
                    _ => FIXED_BACKOFF[attempt.min(FIXED_BACKOFF.len() - 1)],
                };
                tokio::time::sleep(delay).await;
                attempt += 1;
            }
        }
    };
    outcome?
};

let token_expiry =
Self::parse_token_expiry(&fresh_ingestion_gateway_info.auth_token_expiry_time)
Expand Down Expand Up @@ -438,6 +479,18 @@ impl GenevaConfigClient {
.map_err(GenevaConfigClientError::Http)?;
// Check if the response is successful
let status = response.status();
// Capture Retry-After before consuming body
let retry_after_header = if status == reqwest::StatusCode::TOO_MANY_REQUESTS
|| status == reqwest::StatusCode::SERVICE_UNAVAILABLE
{
response
.headers()
.get("Retry-After")
.and_then(|hv| hv.to_str().ok())
.map(|s| s.trim().to_string())
} else {
None
};
let body = response.text().await?;
if status.is_success() {
let parsed = match serde_json::from_str::<GenevaResponse>(&body) {
Expand All @@ -463,6 +516,17 @@ impl GenevaConfigClient {
Err(GenevaConfigClientError::MonikerNotFound(
"No primary diag moniker found in storage accounts".to_string(),
))
} else if status == reqwest::StatusCode::TOO_MANY_REQUESTS
|| status == reqwest::StatusCode::SERVICE_UNAVAILABLE
{
let retry_after_secs = retry_after_header
.and_then(|s| s.parse::<u64>().ok())
.filter(|v| *v >= 1 && *v <= 3);
Err(GenevaConfigClientError::RequestFailedWithRetryAfter {
status: status.as_u16(),
message: body,
retry_after_secs,
})
} else {
Err(GenevaConfigClientError::RequestFailed {
status: status.as_u16(),
Expand Down Expand Up @@ -533,6 +597,17 @@ fn extract_endpoint_from_token(token: &str) -> Result<String> {
Ok(endpoint)
}

fn is_retriable_error(err: &GenevaConfigClientError) -> bool {
match err {
GenevaConfigClientError::Http(e) => e.is_timeout() || e.is_connect(),
GenevaConfigClientError::RequestFailed { status, .. }
| GenevaConfigClientError::RequestFailedWithRetryAfter { status, .. } => {
(*status >= 500 && *status < 600) || *status == 429
}
_ => false,
}
}

#[cfg(feature = "self_signed_certs")]
fn configure_tls_connector(
mut builder: native_tls::TlsConnectorBuilder,
Expand All @@ -558,3 +633,46 @@ fn configure_tls_connector(
.max_protocol_version(Some(Protocol::Tlsv12));
builder
}

#[cfg(test)]
mod retry_tests {
    //! Unit tests for the retry classification performed by `is_retriable_error`.
    use super::*;

    /// Builds a `RequestFailed` error carrying the given HTTP status.
    fn request_failed(status: u16) -> GenevaConfigClientError {
        GenevaConfigClientError::RequestFailed {
            status,
            message: String::new(),
        }
    }

    /// Builds a `RequestFailedWithRetryAfter` error with the given status and hint.
    fn request_failed_with_retry_after(
        status: u16,
        retry_after_secs: Option<u64>,
    ) -> GenevaConfigClientError {
        GenevaConfigClientError::RequestFailedWithRetryAfter {
            status,
            message: String::new(),
            retry_after_secs,
        }
    }

    #[test]
    fn test_is_retriable_request_failed_statuses() {
        // The whole 5xx range (including both boundaries) plus 429.
        for status in [500u16, 502, 503, 504, 599, 429] {
            assert!(
                super::is_retriable_error(&request_failed(status)),
                "status {status} should be retriable"
            );
        }
    }

    #[test]
    fn test_is_not_retriable_request_failed_statuses() {
        // Client errors, plus the values just outside the 5xx range.
        for status in [400u16, 401, 403, 404, 422, 499, 600] {
            assert!(
                !super::is_retriable_error(&request_failed(status)),
                "status {status} should NOT be retriable"
            );
        }
    }

    #[test]
    fn test_is_retriable_request_failed_with_retry_after() {
        // The Retry-After variant is classified by status alone; the presence
        // or absence of a hint must not affect the decision.
        for status in [429u16, 503] {
            for hint in [None, Some(2u64)] {
                assert!(
                    super::is_retriable_error(&request_failed_with_retry_after(status, hint)),
                    "status {status} with Retry-After={hint:?} should be retriable"
                );
            }
        }
    }
}