From 09044c2cb0997c5d18d55aca5386e237e746efb2 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 19 Aug 2025 11:14:09 +0200 Subject: [PATCH 1/7] Enable `reqwest` client-level timeouts While the `RetryPolicy` has a `MaxTotalDelayRetryPolicy`, the retry `loop` would only check this configured delay once the operation future actually returns a value. However, without client-side timeouts, we're not super sure the operation is actually guaranteed to return anything (even an error, IIUC). So here, we enable some coarse client-side default timeouts to ensure the polled futures eventualy return either the response *or* an error we can handle via our retry logic. --- src/client.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index c6bf32e..5cf5388 100644 --- a/src/client.rs +++ b/src/client.rs @@ -14,6 +14,7 @@ use crate::types::{ use crate::util::retry::{retry, RetryPolicy}; const APPLICATION_OCTET_STREAM: &str = "application/octet-stream"; +const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); /// Thin-client to access a hosted instance of Versioned Storage Service (VSS). /// The provided [`VssClient`] API is minimalistic and is congruent to the VSS server-side API. @@ -31,7 +32,12 @@ where impl> VssClient { /// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint. pub fn new(base_url: String, retry_policy: R) -> Self { - let client = Client::new(); + let client = Client::builder() + .timeout(DEFAULT_TIMEOUT) + .connect_timeout(DEFAULT_TIMEOUT) + .read_timeout(DEFAULT_TIMEOUT) + .build() + .unwrap(); Self::from_client(base_url, client, retry_policy) } @@ -51,7 +57,12 @@ impl> VssClient { pub fn new_with_headers( base_url: String, retry_policy: R, header_provider: Arc, ) -> Self { - let client = Client::new(); + let client = Client::builder() + .timeout(DEFAULT_TIMEOUT) + .connect_timeout(DEFAULT_TIMEOUT) + .read_timeout(DEFAULT_TIMEOUT) + .build() + .unwrap(); Self { base_url, client, retry_policy, header_provider } } From c2c70595210cc5486be3f601afea8e472118c75b Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 4 Nov 2025 13:10:14 +0100 Subject: [PATCH 2/7] DRY up `Client` building --- src/client.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/client.rs b/src/client.rs index 5cf5388..3af0330 100644 --- a/src/client.rs +++ b/src/client.rs @@ -32,12 +32,7 @@ where impl> VssClient { /// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint. pub fn new(base_url: String, retry_policy: R) -> Self { - let client = Client::builder() - .timeout(DEFAULT_TIMEOUT) - .connect_timeout(DEFAULT_TIMEOUT) - .read_timeout(DEFAULT_TIMEOUT) - .build() - .unwrap(); + let client = build_client(); Self::from_client(base_url, client, retry_policy) } @@ -57,12 +52,7 @@ impl> VssClient { pub fn new_with_headers( base_url: String, retry_policy: R, header_provider: Arc, ) -> Self { - let client = Client::builder() - .timeout(DEFAULT_TIMEOUT) - .connect_timeout(DEFAULT_TIMEOUT) - .read_timeout(DEFAULT_TIMEOUT) - .build() - .unwrap(); + let client = build_client(); Self { base_url, client, retry_policy, header_provider } } @@ -173,3 +163,12 @@ impl> VssClient { } } } + +fn build_client() -> Client { + Client::builder() + .timeout(DEFAULT_TIMEOUT) + .connect_timeout(DEFAULT_TIMEOUT) + .read_timeout(DEFAULT_TIMEOUT) + .build() + .unwrap() +} From a25ed48ad904431e4d9f3021a9763d6bea6c424f Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 4 Nov 2025 14:21:45 +0100 Subject: [PATCH 3/7] Re-export the `reqwest` crate .. as some types are part of our API. --- src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 2f808e6..f89bb79 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,9 @@ #![deny(rustdoc::private_intra_doc_links)] #![deny(missing_docs)] +// Crate re-exports +pub use reqwest; + /// Implements a thin-client ([`client::VssClient`]) to access a hosted instance of Versioned Storage Service (VSS). pub mod client; From c7ac1896ba6d6e2c6046c8fb42d7a69006053e8f Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 4 Nov 2025 14:13:01 +0100 Subject: [PATCH 4/7] Utilize `reqwest::retry::Builder` to implement HTTP retries We here make use of `reqwest`'s `retry` functionlity that was introduced with its recent v0.12.23 release. As we can't fully replace the old 'application-level' behavior just yet, we configure it to only apply for error that arent INTERNAL_SERVER_ERROR, which is the status VSS uses to send error responses. We set some default values here, but note that these can always can be overridden by the user when using the `from_client` constructor. --- Cargo.toml | 4 ++-- src/client.rs | 39 ++++++++++++++++++++++++++++++--------- src/error.rs | 6 ++++++ tests/tests.rs | 33 +++++++++++++++++++-------------- 4 files changed, 57 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3b8c1a8..97ae6b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ lnurl-auth = ["dep:bitcoin", "dep:url", "dep:serde", "dep:serde_json", "reqwest/ [dependencies] prost = "0.11.6" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } +reqwest = { version = "0.12.23", default-features = false, features = ["rustls-tls"] } tokio = { version = "1", default-features = false, features = ["time"] } rand = "0.8.5" async-trait = "0.1.77" @@ -34,7 +34,7 @@ chacha20-poly1305 = "0.1.2" [target.'cfg(genproto)'.build-dependencies] prost-build = { version = "0.11.3" } -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "blocking"] } +reqwest = { version = "0.12.23", default-features = false, features = ["rustls-tls", "blocking"] } [dev-dependencies] mockito = "0.28.0" diff --git a/src/client.rs b/src/client.rs index 3af0330..20dbb7a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -15,6 +15,7 @@ use crate::util::retry::{retry, RetryPolicy}; const APPLICATION_OCTET_STREAM: &str = "application/octet-stream"; const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); +const DEFAULT_RETRIES: u32 = 2; /// Thin-client to access a hosted instance of Versioned Storage Service (VSS). /// The provided [`VssClient`] API is minimalistic and is congruent to the VSS server-side API. @@ -31,9 +32,9 @@ where impl> VssClient { /// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint. - pub fn new(base_url: String, retry_policy: R) -> Self { - let client = build_client(); - Self::from_client(base_url, client, retry_policy) + pub fn new(base_url: String, retry_policy: R) -> Result { + let client = build_client(&base_url)?; + Ok(Self::from_client(base_url, client, retry_policy)) } /// Constructs a [`VssClient`] from a given [`reqwest::Client`], using `base_url` as the VSS server endpoint. @@ -51,9 +52,9 @@ impl> VssClient { /// HTTP headers will be provided by the given `header_provider`. pub fn new_with_headers( base_url: String, retry_policy: R, header_provider: Arc, - ) -> Self { - let client = build_client(); - Self { base_url, client, retry_policy, header_provider } + ) -> Result { + let client = build_client(&base_url)?; + Ok(Self { base_url, client, retry_policy, header_provider }) } /// Returns the underlying base URL. @@ -164,11 +165,31 @@ impl> VssClient { } } -fn build_client() -> Client { - Client::builder() +fn build_client(base_url: &str) -> Result { + let url = reqwest::Url::parse(base_url).map_err(|_| VssError::InvalidUrlError)?; + let host_str = url.host_str().ok_or(VssError::InvalidUrlError)?.to_string(); + // TODO: Once the response `payload` is available in `classify_fn`, we should filter out any + // error types for which retrying doesn't make sense (i.e., `NoSuchKeyError`, + // `InvalidRequestError`,`ConflictError`). + let retry = reqwest::retry::for_host(host_str) + .max_retries_per_request(DEFAULT_RETRIES) + .classify_fn(|req_rep| match req_rep.status() { + // VSS uses INTERNAL_SERVER_ERROR when sending back error repsonses. These are + // currently still covered by our `RetryPolicy`, so we tell `reqwest` not to retry them. + Some(reqwest::StatusCode::INTERNAL_SERVER_ERROR) => req_rep.success(), + Some(reqwest::StatusCode::BAD_REQUEST) => req_rep.success(), + Some(reqwest::StatusCode::UNAUTHORIZED) => req_rep.success(), + Some(reqwest::StatusCode::NOT_FOUND) => req_rep.success(), + Some(reqwest::StatusCode::CONFLICT) => req_rep.success(), + Some(reqwest::StatusCode::OK) => req_rep.success(), + _ => req_rep.retryable(), + }); + let client = Client::builder() .timeout(DEFAULT_TIMEOUT) .connect_timeout(DEFAULT_TIMEOUT) .read_timeout(DEFAULT_TIMEOUT) + .retry(retry) .build() - .unwrap() + .unwrap(); + Ok(client) } diff --git a/src/error.rs b/src/error.rs index 5955e6a..ea74240 100644 --- a/src/error.rs +++ b/src/error.rs @@ -28,6 +28,9 @@ pub enum VssError { /// There is an unknown error, it could be a client-side bug, unrecognized error-code, network error /// or something else. InternalError(String), + + /// The provided URL is invalid. + InvalidUrlError, } impl VssError { @@ -67,6 +70,9 @@ impl Display for VssError { VssError::InternalError(message) => { write!(f, "InternalError: {}", message) }, + VssError::InvalidUrlError => { + write!(f, "The provided URL is invalid") + }, } } } diff --git a/tests/tests.rs b/tests/tests.rs index 8fd6d04..35a3e6f 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -48,7 +48,7 @@ mod tests { .create(); // Create a new VssClient with the mock server URL. - let client = VssClient::new(base_url, retry_policy()); + let client = VssClient::new(base_url, retry_policy()).unwrap(); let actual_result = client.get_object(&get_request).await.unwrap(); @@ -85,7 +85,8 @@ mod tests { "headerkey".to_string(), "headervalue".to_string(), )]))); - let client = VssClient::new_with_headers(base_url, retry_policy(), header_provider); + let client = + VssClient::new_with_headers(base_url, retry_policy(), header_provider).unwrap(); let actual_result = client.get_object(&get_request).await.unwrap(); @@ -123,7 +124,7 @@ mod tests { .create(); // Create a new VssClient with the mock server URL. - let vss_client = VssClient::new(base_url, retry_policy()); + let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); let actual_result = vss_client.put_object(&request).await.unwrap(); let expected_result = &mock_response; @@ -158,7 +159,7 @@ mod tests { .create(); // Create a new VssClient with the mock server URL. - let vss_client = VssClient::new(base_url, retry_policy()); + let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); let actual_result = vss_client.delete_object(&request).await.unwrap(); let expected_result = &mock_response; @@ -199,7 +200,7 @@ mod tests { .create(); // Create a new VssClient with the mock server URL. - let client = VssClient::new(base_url, retry_policy()); + let client = VssClient::new(base_url, retry_policy()).unwrap(); let actual_result = client.list_key_versions(&request).await.unwrap(); @@ -213,7 +214,7 @@ mod tests { #[tokio::test] async fn test_no_such_key_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()); + let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); // NoSuchKeyError let error_response = ErrorResponse { @@ -240,7 +241,7 @@ mod tests { #[tokio::test] async fn test_get_response_without_value() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()); + let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); // GetObjectResponse with None value let mock_response = GetObjectResponse { value: None, ..Default::default() }; @@ -261,7 +262,7 @@ mod tests { #[tokio::test] async fn test_invalid_request_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()); + let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); // Invalid Request Error let error_response = ErrorResponse { @@ -321,7 +322,7 @@ mod tests { #[tokio::test] async fn test_auth_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()); + let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); // Invalid Request Error let error_response = ErrorResponse { @@ -393,8 +394,12 @@ mod tests { async fn test_header_provider_error() { let get_request = GetObjectRequest { store_id: "store".to_string(), key: "k1".to_string() }; let header_provider = Arc::new(FailingHeaderProvider {}); - let client = - VssClient::new_with_headers("notused".to_string(), retry_policy(), header_provider); + let client = VssClient::new_with_headers( + "http://localhost/vss".to_string(), + retry_policy(), + header_provider, + ) + .unwrap(); let result = client.get_object(&get_request).await; assert!(matches!(result, Err(VssError::AuthError { .. }))); @@ -403,7 +408,7 @@ mod tests { #[tokio::test] async fn test_conflict_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()); + let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); // Conflict Error let error_response = ErrorResponse { @@ -436,7 +441,7 @@ mod tests { #[tokio::test] async fn test_internal_server_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()); + let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); // Internal Server Error let error_response = ErrorResponse { @@ -496,7 +501,7 @@ mod tests { #[tokio::test] async fn test_internal_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()); + let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); let error_response = ErrorResponse { error_code: 999, message: "UnknownException".to_string() }; From 93b7381bd3831569e8d4fb9b63b7a6c65715af8d Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 5 Nov 2025 14:15:33 +0100 Subject: [PATCH 5/7] Expect `NOT_FOUND` / 404 status code in `NoSuchKey` error response --- tests/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests.rs b/tests/tests.rs index 35a3e6f..883a804 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -222,7 +222,7 @@ mod tests { message: "NoSuchKeyException".to_string(), }; let mock_server = mockito::mock("POST", GET_OBJECT_ENDPOINT) - .with_status(409) + .with_status(404) .with_body(&error_response.encode_to_vec()) .create(); From e6d8e57a949bff3cf511aef258b7aa2c681e35c9 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 4 Nov 2025 13:00:42 +0100 Subject: [PATCH 6/7] Drop custom retry policy We drop our (~broken) `RetryPolicy`, and replace it with the new-ish retry policy that is shipping in `reqwest` since 0.12.23. --- src/client.rs | 84 +++++---------- src/util/mod.rs | 3 - src/util/retry.rs | 245 ------------------------------------------- tests/retry_tests.rs | 99 ----------------- tests/tests.rs | 55 +++------- 5 files changed, 42 insertions(+), 444 deletions(-) delete mode 100644 src/util/retry.rs delete mode 100644 tests/retry_tests.rs diff --git a/src/client.rs b/src/client.rs index 20dbb7a..c55260f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,8 +11,6 @@ use crate::types::{ DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse, ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse, }; -use crate::util::retry::{retry, RetryPolicy}; - const APPLICATION_OCTET_STREAM: &str = "application/octet-stream"; const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const DEFAULT_RETRIES: u32 = 2; @@ -20,41 +18,32 @@ const DEFAULT_RETRIES: u32 = 2; /// Thin-client to access a hosted instance of Versioned Storage Service (VSS). /// The provided [`VssClient`] API is minimalistic and is congruent to the VSS server-side API. #[derive(Clone)] -pub struct VssClient -where - R: RetryPolicy, -{ +pub struct VssClient { base_url: String, client: Client, - retry_policy: R, header_provider: Arc, } -impl> VssClient { +impl VssClient { /// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint. - pub fn new(base_url: String, retry_policy: R) -> Result { + pub fn new(base_url: String) -> Result { let client = build_client(&base_url)?; - Ok(Self::from_client(base_url, client, retry_policy)) + Ok(Self::from_client(base_url, client)) } /// Constructs a [`VssClient`] from a given [`reqwest::Client`], using `base_url` as the VSS server endpoint. - pub fn from_client(base_url: String, client: Client, retry_policy: R) -> Self { - Self { - base_url, - client, - retry_policy, - header_provider: Arc::new(FixedHeaders::new(HashMap::new())), - } + pub fn from_client(base_url: String, client: Client) -> Self { + Self { base_url, client, header_provider: Arc::new(FixedHeaders::new(HashMap::new())) } } /// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint. /// /// HTTP headers will be provided by the given `header_provider`. pub fn new_with_headers( - base_url: String, retry_policy: R, header_provider: Arc, + base_url: String, header_provider: Arc, ) -> Result { let client = build_client(&base_url)?; - Ok(Self { base_url, client, retry_policy, header_provider }) + Ok(Self { base_url, client, header_provider }) } /// Returns the underlying base URL. @@ -68,22 +57,17 @@ impl> VssClient { pub async fn get_object( &self, request: &GetObjectRequest, ) -> Result { - retry( - || async { - let url = format!("{}/getObject", self.base_url); - self.post_request(request, &url).await.and_then(|response: GetObjectResponse| { - if response.value.is_none() { - Err(VssError::InternalServerError( - "VSS Server API Violation, expected value in GetObjectResponse but found none".to_string(), - )) - } else { - Ok(response) - } - }) - }, - &self.retry_policy, - ) - .await + let url = format!("{}/getObject", self.base_url); + self.post_request(request, &url).await.and_then(|response: GetObjectResponse| { + if response.value.is_none() { + Err(VssError::InternalServerError( + "VSS Server API Violation, expected value in GetObjectResponse but found none" + .to_string(), + )) + } else { + Ok(response) + } + }) } /// Writes multiple [`PutObjectRequest::transaction_items`] as part of a single transaction. @@ -93,14 +77,8 @@ impl> VssClient { pub async fn put_object( &self, request: &PutObjectRequest, ) -> Result { - retry( - || async { - let url = format!("{}/putObjects", self.base_url); - self.post_request(request, &url).await - }, - &self.retry_policy, - ) - .await + let url = format!("{}/putObjects", self.base_url); + self.post_request(request, &url).await } /// Deletes the given `key` and `value` in `request`. @@ -109,14 +87,8 @@ impl> VssClient { pub async fn delete_object( &self, request: &DeleteObjectRequest, ) -> Result { - retry( - || async { - let url = format!("{}/deleteObject", self.base_url); - self.post_request(request, &url).await - }, - &self.retry_policy, - ) - .await + let url = format!("{}/deleteObject", self.base_url); + self.post_request(request, &url).await } /// Lists keys and their corresponding version for a given [`ListKeyVersionsRequest::store_id`]. @@ -125,14 +97,8 @@ impl> VssClient { pub async fn list_key_versions( &self, request: &ListKeyVersionsRequest, ) -> Result { - retry( - || async { - let url = format!("{}/listKeyVersions", self.base_url); - self.post_request(request, &url).await - }, - &self.retry_policy, - ) - .await + let url = format!("{}/listKeyVersions", self.base_url); + self.post_request(request, &url).await } async fn post_request( diff --git a/src/util/mod.rs b/src/util/mod.rs index 7f40f97..4ca1102 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -3,9 +3,6 @@ /// [`StorableBuilder`]: storable_builder::StorableBuilder pub mod storable_builder; -/// Contains retry utilities. -pub mod retry; - /// Contains [`KeyObfuscator`] utility. /// /// [`KeyObfuscator`]: key_obfuscator::KeyObfuscator diff --git a/src/util/retry.rs b/src/util/retry.rs deleted file mode 100644 index 9155c8a..0000000 --- a/src/util/retry.rs +++ /dev/null @@ -1,245 +0,0 @@ -use rand::Rng; -use std::error::Error; -use std::future::Future; -use std::marker::PhantomData; -use std::time::Duration; - -/// A function that performs and retries the given operation according to a retry policy. -/// -/// **Caution**: A retry policy without the number of attempts capped by [`MaxAttemptsRetryPolicy`] -/// decorator will result in infinite retries. -/// -/// **Example** -/// ```rust -/// # use std::time::Duration; -/// # use vss_client_ng::error::VssError; -/// # use vss_client_ng::util::retry::{ExponentialBackoffRetryPolicy, retry, RetryPolicy}; -/// # -/// # async fn operation() -> Result { -/// # tokio::time::sleep(Duration::from_millis(10)).await; -/// # Ok(42) -/// # } -/// # -/// let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(100)) -/// .with_max_attempts(5) -/// .with_max_total_delay(Duration::from_secs(2)) -/// .with_max_jitter(Duration::from_millis(30)) -/// .skip_retry_on_error(|e| matches!(e, VssError::InvalidRequestError(..))); -/// -/// let result = retry(operation, &retry_policy); -///``` -/// -/// To use a retry policy as a member in a [`Send`] & [`Sync`] safe struct which needs to have known -/// size at compile time, we can specify its concrete type as follows: -/// ``` -/// # use std::time::Duration; -/// # use vss_client_ng::error::VssError; -/// # use vss_client_ng::util::retry::{ExponentialBackoffRetryPolicy, FilteredRetryPolicy, retry, RetryPolicy}; -/// -/// type VssRetryPolicy = FilteredRetryPolicy, Box bool>>; -/// -/// struct SomeStruct { -/// retry_policy: VssRetryPolicy, -/// } -/// -/// impl SomeStruct { -/// fn new() -> Self { -/// let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(100)) -/// .skip_retry_on_error(Box::new(|e: &VssError| { matches!( e, VssError::NoSuchKeyError(..)) }) as _); -/// Self { retry_policy } -/// } -/// } -/// ``` -pub async fn retry(mut operation: F, retry_policy: &R) -> Result -where - R: RetryPolicy, - F: FnMut() -> Fut, - Fut: Future>, - E: Error, -{ - let mut attempts_made = 0; - let mut accumulated_delay = Duration::ZERO; - loop { - match operation().await { - Ok(result) => return Ok(result), - Err(err) => { - attempts_made += 1; - if let Some(delay) = retry_policy.next_delay(&RetryContext { - attempts_made, - accumulated_delay, - error: &err, - }) { - tokio::time::sleep(delay).await; - accumulated_delay += delay; - } else { - return Err(err); - } - }, - } - } -} - -/// Provides the logic for how and when to perform retries. -pub trait RetryPolicy: Sized { - /// The error type returned by the `operation` in `retry`. - type E: Error; - - /// Returns the duration to wait before trying the next attempt. - /// `context` represents the context of a retry operation. - /// - /// If `None` is returned then no further retry attempt is made. - fn next_delay(&self, context: &RetryContext) -> Option; - - /// Returns a new `RetryPolicy` that respects the given maximum attempts. - fn with_max_attempts(self, max_attempts: u32) -> MaxAttemptsRetryPolicy { - MaxAttemptsRetryPolicy { inner_policy: self, max_attempts } - } - - /// Returns a new `RetryPolicy` that respects the given total delay. - fn with_max_total_delay(self, max_total_delay: Duration) -> MaxTotalDelayRetryPolicy { - MaxTotalDelayRetryPolicy { inner_policy: self, max_total_delay } - } - - /// Returns a new `RetryPolicy` that adds jitter(random delay) to underlying policy. - fn with_max_jitter(self, max_jitter: Duration) -> JitteredRetryPolicy { - JitteredRetryPolicy { inner_policy: self, max_jitter } - } - - /// Skips retrying on errors that evaluate to `true` after applying `function`. - fn skip_retry_on_error(self, function: F) -> FilteredRetryPolicy - where - F: 'static + Fn(&Self::E) -> bool, - { - FilteredRetryPolicy { inner_policy: self, function } - } -} - -/// Represents the context of a retry operation. -/// -/// The context holds key information about the retry operation -/// such as how many attempts have been made until now, the accumulated -/// delay between retries, and the error that triggered the retry. -pub struct RetryContext<'a, E: Error> { - /// The number attempts made until now, before attempting the next retry. - attempts_made: u32, - - /// The amount of artificial delay we have already waited in between previous - /// attempts. Does not include the time taken to execute the operation. - accumulated_delay: Duration, - - /// The error encountered in the previous attempt. - error: &'a E, -} - -/// The exponential backoff strategy is a retry approach that doubles the delay between retries. -/// A combined exponential backoff and jitter strategy is recommended that is ["Exponential Backoff and Jitter"](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/). -/// This is helpful to avoid [Thundering Herd Problem](https://en.wikipedia.org/wiki/Thundering_herd_problem). -pub struct ExponentialBackoffRetryPolicy { - /// The base delay duration for the backoff algorithm. First retry is `base_delay` after first attempt. - base_delay: Duration, - phantom: PhantomData, -} - -impl ExponentialBackoffRetryPolicy { - /// Constructs a new instance using `base_delay`. - /// - /// `base_delay` is the base delay duration for the backoff algorithm. First retry is `base_delay` - /// after first attempt. - pub fn new(base_delay: Duration) -> ExponentialBackoffRetryPolicy { - Self { base_delay, phantom: PhantomData } - } -} - -impl RetryPolicy for ExponentialBackoffRetryPolicy { - type E = E; - fn next_delay(&self, context: &RetryContext) -> Option { - let backoff_factor = 2_u32.pow(context.attempts_made) - 1; - let delay = self.base_delay * backoff_factor; - Some(delay) - } -} - -/// Decorates the given `RetryPolicy` to respect the given maximum attempts. -pub struct MaxAttemptsRetryPolicy { - /// The underlying retry policy to use. - inner_policy: T, - /// The maximum number of attempts to retry. - max_attempts: u32, -} - -impl RetryPolicy for MaxAttemptsRetryPolicy { - type E = T::E; - fn next_delay(&self, context: &RetryContext) -> Option { - if self.max_attempts == context.attempts_made { - None - } else { - self.inner_policy.next_delay(context) - } - } -} - -/// Decorates the given `RetryPolicy` to respect the given maximum total delay. -pub struct MaxTotalDelayRetryPolicy { - /// The underlying retry policy to use. - inner_policy: T, - /// The maximum accumulated delay that will be allowed over all attempts. - max_total_delay: Duration, -} - -impl RetryPolicy for MaxTotalDelayRetryPolicy { - type E = T::E; - fn next_delay(&self, context: &RetryContext) -> Option { - let next_delay = self.inner_policy.next_delay(context); - if let Some(next_delay) = next_delay { - if self.max_total_delay < context.accumulated_delay + next_delay { - return None; - } - } - next_delay - } -} - -/// Decorates the given `RetryPolicy` and adds jitter (random delay) to it. This can make retries -/// more spread out and less likely to all fail at once. -pub struct JitteredRetryPolicy { - /// The underlying retry policy to use. - inner_policy: T, - /// The maximum amount of random jitter to apply to the delay. - max_jitter: Duration, -} - -impl RetryPolicy for JitteredRetryPolicy { - type E = T::E; - fn next_delay(&self, context: &RetryContext) -> Option { - if let Some(base_delay) = self.inner_policy.next_delay(context) { - let mut rng = rand::thread_rng(); - let jitter = - Duration::from_micros(rng.gen_range(0..self.max_jitter.as_micros() as u64)); - Some(base_delay + jitter) - } else { - None - } - } -} - -/// Decorates the given `RetryPolicy` by not retrying on errors that match the given function. -pub struct FilteredRetryPolicy { - inner_policy: T, - function: F, -} - -impl RetryPolicy for FilteredRetryPolicy -where - T: RetryPolicy, - F: Fn(&E) -> bool, - E: Error, -{ - type E = T::E; - fn next_delay(&self, context: &RetryContext) -> Option { - if (self.function)(&context.error) { - None - } else { - self.inner_policy.next_delay(context) - } - } -} diff --git a/tests/retry_tests.rs b/tests/retry_tests.rs deleted file mode 100644 index ff66e6a..0000000 --- a/tests/retry_tests.rs +++ /dev/null @@ -1,99 +0,0 @@ -#[cfg(test)] -mod retry_tests { - use std::io; - use std::sync::atomic::{AtomicU32, Ordering}; - use std::sync::Arc; - use std::time::Duration; - - use vss_client_ng::error::VssError; - use vss_client_ng::util::retry::{retry, ExponentialBackoffRetryPolicy, RetryPolicy}; - - #[tokio::test] - async fn test_async_retry() { - let base_delay = Duration::from_millis(10); - let max_attempts = 3; - let max_total_delay = Duration::from_secs(60); - let max_jitter = Duration::from_millis(5); - - let exponential_backoff_jitter_policy = ExponentialBackoffRetryPolicy::new(base_delay) - .skip_retry_on_error(|e| matches!(e, VssError::InvalidRequestError(..))) - .with_max_attempts(max_attempts) - .with_max_total_delay(max_total_delay) - .with_max_jitter(max_jitter); - - let mut call_count = Arc::new(AtomicU32::new(0)); - let count = call_count.clone(); - let async_function = move || { - let count = count.clone(); - async move { - let attempts_made = count.fetch_add(1, Ordering::SeqCst); - if attempts_made < max_attempts - 1 { - return Err(VssError::InternalServerError("Failure".to_string())); - } - tokio::time::sleep(Duration::from_millis(100)).await; - Ok(42) - } - }; - - let result = retry(async_function, &exponential_backoff_jitter_policy).await; - assert_eq!(result.ok(), Some(42)); - assert_eq!(call_count.load(Ordering::SeqCst), max_attempts); - - call_count = Arc::new(AtomicU32::new(0)); - let count = call_count.clone(); - let failing_async_function = move || { - let count = count.clone(); - async move { - count.fetch_add(1, Ordering::SeqCst); - tokio::time::sleep(Duration::from_millis(100)).await; - Err::<(), VssError>(VssError::InternalServerError("Failed".to_string())) - } - }; - - let failed_result = retry(failing_async_function, &exponential_backoff_jitter_policy).await; - assert!(failed_result.is_err()); - assert_eq!(call_count.load(Ordering::SeqCst), 3); - } - - #[tokio::test] - async fn test_retry_on_all_errors() { - let retry_policy = - ExponentialBackoffRetryPolicy::new(Duration::from_millis(10)).with_max_attempts(3); - - let call_count = Arc::new(AtomicU32::new(0)); - let count = call_count.clone(); - let failing_async_function = move || { - let count = count.clone(); - async move { - count.fetch_add(1, Ordering::SeqCst); - tokio::time::sleep(Duration::from_millis(100)).await; - Err::<(), io::Error>(io::Error::new(io::ErrorKind::InvalidData, "Failure")) - } - }; - - let failed_result = retry(failing_async_function, &retry_policy).await; - assert!(failed_result.is_err()); - assert_eq!(call_count.load(Ordering::SeqCst), 3); - } - - #[tokio::test] - async fn test_retry_capped_by_max_total_delay() { - let retry_policy = ExponentialBackoffRetryPolicy::new(Duration::from_millis(100)) - .with_max_total_delay(Duration::from_millis(350)); - - let call_count = Arc::new(AtomicU32::new(0)); - let count = call_count.clone(); - let failing_async_function = move || { - let count = count.clone(); - async move { - count.fetch_add(1, Ordering::SeqCst); - tokio::time::sleep(Duration::from_millis(100)).await; - Err::<(), VssError>(VssError::InternalServerError("Failed".to_string())) - } - }; - - let failed_result = retry(failing_async_function, &retry_policy).await; - assert!(failed_result.is_err()); - assert_eq!(call_count.load(Ordering::SeqCst), 2); - } -} diff --git a/tests/tests.rs b/tests/tests.rs index 883a804..0c6c623 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -6,7 +6,6 @@ mod tests { use reqwest::header::CONTENT_TYPE; use std::collections::HashMap; use std::sync::Arc; - use std::time::Duration; use vss_client_ng::client::VssClient; use vss_client_ng::error::VssError; use vss_client_ng::headers::FixedHeaders; @@ -18,8 +17,6 @@ mod tests { GetObjectResponse, KeyValue, ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse, }; - use vss_client_ng::util::retry::{ExponentialBackoffRetryPolicy, RetryPolicy}; - const APPLICATION_OCTET_STREAM: &'static str = "application/octet-stream"; const GET_OBJECT_ENDPOINT: &'static str = "/getObject"; @@ -48,7 +45,7 @@ mod tests { .create(); // Create a new VssClient with the mock server URL. - let client = VssClient::new(base_url, retry_policy()).unwrap(); + let client = VssClient::new(base_url).unwrap(); let actual_result = client.get_object(&get_request).await.unwrap(); @@ -85,8 +82,7 @@ mod tests { "headerkey".to_string(), "headervalue".to_string(), )]))); - let client = - VssClient::new_with_headers(base_url, retry_policy(), header_provider).unwrap(); + let client = VssClient::new_with_headers(base_url, header_provider).unwrap(); let actual_result = client.get_object(&get_request).await.unwrap(); @@ -124,7 +120,7 @@ mod tests { .create(); // Create a new VssClient with the mock server URL. - let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); + let vss_client = VssClient::new(base_url).unwrap(); let actual_result = vss_client.put_object(&request).await.unwrap(); let expected_result = &mock_response; @@ -159,7 +155,7 @@ mod tests { .create(); // Create a new VssClient with the mock server URL. - let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); + let vss_client = VssClient::new(base_url).unwrap(); let actual_result = vss_client.delete_object(&request).await.unwrap(); let expected_result = &mock_response; @@ -200,7 +196,7 @@ mod tests { .create(); // Create a new VssClient with the mock server URL. - let client = VssClient::new(base_url, retry_policy()).unwrap(); + let client = VssClient::new(base_url).unwrap(); let actual_result = client.list_key_versions(&request).await.unwrap(); @@ -214,7 +210,7 @@ mod tests { #[tokio::test] async fn test_no_such_key_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); + let vss_client = VssClient::new(base_url).unwrap(); // NoSuchKeyError let error_response = ErrorResponse { @@ -241,7 +237,7 @@ mod tests { #[tokio::test] async fn test_get_response_without_value() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); + let vss_client = VssClient::new(base_url).unwrap(); // GetObjectResponse with None value let mock_response = GetObjectResponse { value: None, ..Default::default() }; @@ -256,13 +252,13 @@ mod tests { assert!(matches!(get_result.unwrap_err(), VssError::InternalServerError { .. })); // Verify 1 request hit the server - mock_server.expect(3).assert(); + mock_server.expect(1).assert(); } #[tokio::test] async fn test_invalid_request_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); + let vss_client = VssClient::new(base_url).unwrap(); // Invalid Request Error let error_response = ErrorResponse { @@ -322,7 +318,7 @@ mod tests { #[tokio::test] async fn test_auth_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); + let vss_client = VssClient::new(base_url).unwrap(); // Invalid Request Error let error_response = ErrorResponse { @@ -394,12 +390,9 @@ mod tests { async fn test_header_provider_error() { let get_request = GetObjectRequest { store_id: "store".to_string(), key: "k1".to_string() }; let header_provider = Arc::new(FailingHeaderProvider {}); - let client = VssClient::new_with_headers( - "http://localhost/vss".to_string(), - retry_policy(), - header_provider, - ) - .unwrap(); + let client = + VssClient::new_with_headers("http://localhost/vss".to_string(), header_provider) + .unwrap(); let result = client.get_object(&get_request).await; assert!(matches!(result, Err(VssError::AuthError { .. }))); @@ -408,7 +401,7 @@ mod tests { #[tokio::test] async fn test_conflict_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); + let vss_client = VssClient::new(base_url).unwrap(); // Conflict Error let error_response = ErrorResponse { @@ -441,7 +434,7 @@ mod tests { #[tokio::test] async fn test_internal_server_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); + let vss_client = VssClient::new(base_url).unwrap(); // Internal Server Error let error_response = ErrorResponse { @@ -495,13 +488,13 @@ mod tests { assert!(matches!(list_result.unwrap_err(), VssError::InternalServerError { .. })); // Verify 4 requests hit the server - mock_server.expect(12).assert(); + mock_server.expect(4).assert(); } #[tokio::test] async fn test_internal_err_handling() { let base_url = mockito::server_url(); - let vss_client = VssClient::new(base_url, retry_policy()).unwrap(); + let vss_client = VssClient::new(base_url).unwrap(); let error_response = ErrorResponse { error_code: 999, message: "UnknownException".to_string() }; @@ -563,18 +556,4 @@ mod tests { let list_network_err = vss_client.list_key_versions(&list_request).await; assert!(matches!(list_network_err.unwrap_err(), VssError::InternalError { .. })); } - - fn retry_policy() -> impl RetryPolicy { - ExponentialBackoffRetryPolicy::new(Duration::from_millis(1)) - .with_max_attempts(3) - .skip_retry_on_error(|e| { - matches!( - e, - VssError::NoSuchKeyError(..) - | VssError::InvalidRequestError(..) - | VssError::ConflictError(..) - | VssError::AuthError(..) - ) - }) - } } From 7cf661b4ba45983ecad0f59e6d74050e2c84212f Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 6 Nov 2025 14:06:13 +0100 Subject: [PATCH 7/7] Add `VssClient::from_client_and_headers` constructor Previously, we'd allow to either re-use a `reqwest::Client` or supply a header provider. Here we add a new constructor that allows us to do both at the same time. --- src/client.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/client.rs b/src/client.rs index c55260f..c7dac7d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -36,6 +36,15 @@ impl VssClient { Self { base_url, client, header_provider: Arc::new(FixedHeaders::new(HashMap::new())) } } + /// Constructs a [`VssClient`] from a given [`reqwest::Client`], using `base_url` as the VSS server endpoint. + /// + /// HTTP headers will be provided by the given `header_provider`. + pub fn from_client_and_headers( + base_url: String, client: Client, header_provider: Arc, + ) -> Self { + Self { base_url, client, header_provider } + } + /// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint. /// /// HTTP headers will be provided by the given `header_provider`.