Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ url = "2.2"
md5 = "0.8.0"
hex = "0.4"
lz4_flex = { version = "0.11", features = ["safe-encode"], default-features = false }
tokio = { version = "1", default-features = false, features = ["time"] } # Needed for tokio::time::sleep in library code

[features]
self_signed_certs = [] # Empty by default for security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ pub(crate) enum GenevaConfigClientError {
Http(#[from] reqwest::Error),
#[error("Request failed with status {status}: {message}")]
RequestFailed { status: u16, message: String },
#[error("Request failed with status {status}: {message} (Retry-After={retry_after_secs:?})")]
RequestFailedWithRetryAfter {
status: u16,
message: String,
retry_after_secs: Option<u64>,
},

// Data / parsing
#[error("JSON error: {0}")]
Expand All @@ -93,6 +99,18 @@ pub(crate) enum GenevaConfigClientError {
#[allow(dead_code)]
pub(crate) type Result<T> = std::result::Result<T, GenevaConfigClientError>;

/// Maximum number of retries performed after the initial fetch attempt.
///
/// The retry loop iterates over `0..=MAX_GCS_RETRIES`, so at most
/// `MAX_GCS_RETRIES + 1` fetches are issued in total.
const MAX_GCS_RETRIES: usize = 3;
/// Delay applied before a retry when the failed response carries no usable
/// `Retry-After` value.
///
/// Indexed by the zero-based attempt number; attempts past the end of the
/// table reuse the last entry.
const FIXED_BACKOFF: [Duration; 3] = [
Duration::from_millis(200),
Duration::from_millis(400),
Duration::from_millis(800),
];

/// Smallest `Retry-After` value, in seconds, that is honored from a 429/503
/// response. Smaller values are discarded (not clamped), in which case the
/// fixed backoff schedule is used instead.
const MIN_RETRY_AFTER_SECS: u64 = 1;
/// Largest `Retry-After` value, in seconds, that is honored from a 429/503
/// response. Larger values are discarded (not clamped), in which case the
/// fixed backoff schedule is used instead.
const MAX_RETRY_AFTER_SECS: u64 = 3;

/// Configuration for the Geneva Config Client.
///
/// # Fields
Expand Down Expand Up @@ -370,10 +388,36 @@ impl GenevaConfigClient {
}
}
}
// Cache miss or expired token, fetch fresh data
// Perform actual fetch before acquiring write lock to minimize lock contention
let (fresh_ingestion_gateway_info, fresh_moniker_info) =
self.fetch_ingestion_info().await?;
// Cache miss or expired token: fetch fresh data, retrying transient failures.
//
// Up to MAX_GCS_RETRIES retries follow the initial attempt. An error that is
// not retriable, or a retriable one seen after the retry budget is spent, is
// propagated immediately instead of being re-sent without any delay.
let (fresh_ingestion_gateway_info, fresh_moniker_info) = {
    // Number of retries already performed (0 while the first attempt runs).
    let mut attempt: usize = 0;
    let outcome = loop {
        match self.fetch_ingestion_info().await {
            Ok(fresh) => break Ok(fresh),
            Err(e) if attempt < MAX_GCS_RETRIES && is_retriable_error(&e) => {
                // Prefer the server-provided Retry-After delay; otherwise fall
                // back to the fixed schedule, clamped to its last entry.
                let delay = match &e {
                    GenevaConfigClientError::RequestFailedWithRetryAfter {
                        retry_after_secs: Some(secs),
                        ..
                    } => Duration::from_secs(*secs),
                    _ => FIXED_BACKOFF[attempt.min(FIXED_BACKOFF.len() - 1)],
                };
                tokio::time::sleep(delay).await;
                attempt += 1;
            }
            // Permanent failure or retries exhausted: stop and surface the
            // most recent error to the caller.
            Err(e) => break Err(e),
        }
    };
    outcome?
};

let token_expiry =
Self::parse_token_expiry(&fresh_ingestion_gateway_info.auth_token_expiry_time)
Expand Down Expand Up @@ -438,6 +482,18 @@ impl GenevaConfigClient {
.map_err(GenevaConfigClientError::Http)?;
// Check if the response is successful
let status = response.status();
// Read Retry-After while the response is still borrowable: `text()` below
// consumes it. The header is only of interest for 429 and 503 responses.
let is_throttled = status == reqwest::StatusCode::TOO_MANY_REQUESTS
    || status == reqwest::StatusCode::SERVICE_UNAVAILABLE;
let retry_after_header = response
    .headers()
    .get("Retry-After")
    .filter(|_| is_throttled)
    .and_then(|raw| raw.to_str().ok())
    .map(|text| text.trim().to_string());
let body = response.text().await?;
if status.is_success() {
let parsed = match serde_json::from_str::<GenevaResponse>(&body) {
Expand All @@ -463,6 +519,17 @@ impl GenevaConfigClient {
Err(GenevaConfigClientError::MonikerNotFound(
"No primary diag moniker found in storage accounts".to_string(),
))
} else if status == reqwest::StatusCode::TOO_MANY_REQUESTS
|| status == reqwest::StatusCode::SERVICE_UNAVAILABLE
{
let retry_after_secs = retry_after_header
.and_then(|s| s.parse::<u64>().ok())
.filter(|v| *v >= MIN_RETRY_AFTER_SECS && *v <= MAX_RETRY_AFTER_SECS);
Err(GenevaConfigClientError::RequestFailedWithRetryAfter {
status: status.as_u16(),
message: body,
retry_after_secs,
})
} else {
Err(GenevaConfigClientError::RequestFailed {
status: status.as_u16(),
Expand Down Expand Up @@ -533,6 +600,17 @@ fn extract_endpoint_from_token(token: &str) -> Result<String> {
Ok(endpoint)
}

/// Reports whether `err` represents a transient failure worth retrying.
///
/// Transport-level errors qualify when they are timeouts or connection
/// failures. HTTP status errors qualify for 429 and for any status in the
/// 500..=599 range. Every other error kind is treated as permanent.
fn is_retriable_error(err: &GenevaConfigClientError) -> bool {
    let status = match err {
        GenevaConfigClientError::Http(transport) => {
            return transport.is_timeout() || transport.is_connect();
        }
        GenevaConfigClientError::RequestFailed { status, .. }
        | GenevaConfigClientError::RequestFailedWithRetryAfter { status, .. } => *status,
        _ => return false,
    };
    status == 429 || (500..600).contains(&status)
}

#[cfg(feature = "self_signed_certs")]
fn configure_tls_connector(
mut builder: native_tls::TlsConnectorBuilder,
Expand All @@ -558,3 +636,46 @@ fn configure_tls_connector(
.max_protocol_version(Some(Protocol::Tlsv12));
builder
}

#[cfg(test)]
mod retry_tests {
    use super::*;

    /// Builds a `RequestFailed` error with the given status and an empty message.
    fn request_failed(status: u16) -> GenevaConfigClientError {
        GenevaConfigClientError::RequestFailed {
            status,
            message: String::new(),
        }
    }

    /// Statuses in the 5xx range, plus 429, must be classified as retriable.
    #[test]
    fn test_is_retriable_request_failed_statuses() {
        let retriable = [500u16, 502, 503, 504, 599, 429];
        for status in retriable.iter().copied() {
            let err = request_failed(status);
            assert!(
                is_retriable_error(&err),
                "status {status} should be retriable"
            );
        }
    }

    /// Client errors other than 429 must not be classified as retriable.
    #[test]
    fn test_is_not_retriable_request_failed_statuses() {
        let permanent = [400u16, 401, 403, 404, 422];
        for status in permanent.iter().copied() {
            let err = request_failed(status);
            assert!(
                !is_retriable_error(&err),
                "status {status} should NOT be retriable"
            );
        }
    }
}
Loading