Skip to content

Commit 4719a9c

Browse files
committed
Add retry mechanism in VssClient
1 parent 895c94f commit 4719a9c

File tree

2 files changed

+119
-73
lines changed

2 files changed

+119
-73
lines changed

src/client.rs

Lines changed: 94 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,30 @@ use crate::types::{
77
DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse, ListKeyVersionsRequest,
88
ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse,
99
};
10+
use crate::util::retry::{retry, RetryPolicy};
1011

1112
/// Thin-client to access a hosted instance of Versioned Storage Service (VSS).
1213
/// The provided [`VssClient`] API is minimalistic and is congruent to the VSS server-side API.
1314
#[derive(Clone)]
14-
pub struct VssClient {
15+
pub struct VssClient<R>
16+
where
17+
R: RetryPolicy<E = VssError>,
18+
{
1519
base_url: String,
1620
client: Client,
21+
retry_policy: R,
1722
}
1823

19-
impl VssClient {
24+
impl<R: RetryPolicy<E = VssError>> VssClient<R> {
2025
/// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint.
21-
pub fn new(base_url: &str) -> Self {
26+
pub fn new(base_url: &str, retry_policy: R) -> Self {
2227
let client = Client::new();
23-
Self::from_client(base_url, client)
28+
Self::from_client(base_url, client, retry_policy)
2429
}
2530

2631
/// Constructs a [`VssClient`] from a given [`reqwest::Client`], using `base_url` as the VSS server endpoint.
27-
pub fn from_client(base_url: &str, client: Client) -> Self {
28-
Self { base_url: String::from(base_url), client }
32+
pub fn from_client(base_url: &str, client: Client, retry_policy: R) -> Self {
33+
Self { base_url: String::from(base_url), client, retry_policy }
2934
}
3035

3136
/// Returns the underlying base URL.
@@ -37,62 +42,83 @@ impl VssClient {
3742
/// Makes a service call to the `GetObject` endpoint of the VSS server.
3843
/// For API contract/usage, refer to docs for [`GetObjectRequest`] and [`GetObjectResponse`].
3944
pub async fn get_object(&self, request: &GetObjectRequest) -> Result<GetObjectResponse, VssError> {
40-
let url = format!("{}/getObject", self.base_url);
41-
42-
let raw_response = self.client.post(url).body(request.encode_to_vec()).send().await?;
43-
let status = raw_response.status();
44-
let payload = raw_response.bytes().await?;
45-
46-
if status.is_success() {
47-
let response = GetObjectResponse::decode(&payload[..])?;
48-
49-
if response.value.is_none() {
50-
return Err(VssError::InternalServerError(
51-
"VSS Server API Violation, expected value in GetObjectResponse but found none".to_string(),
52-
));
53-
}
54-
55-
Ok(response)
56-
} else {
57-
Err(VssError::new(status, payload))
58-
}
45+
retry(
46+
|| async {
47+
let url = format!("{}/getObject", self.base_url);
48+
49+
let request_body = request.encode_to_vec();
50+
let raw_response = self.client.post(url).body(request_body).send().await?;
51+
let status = raw_response.status();
52+
let payload = raw_response.bytes().await?;
53+
54+
if status.is_success() {
55+
let response = GetObjectResponse::decode(&payload[..])?;
56+
57+
if response.value.is_none() {
58+
return Err(VssError::InternalServerError(
59+
"VSS Server API Violation, expected value in GetObjectResponse but found none".to_string(),
60+
));
61+
}
62+
63+
Ok(response)
64+
} else {
65+
Err(VssError::new(status, payload))
66+
}
67+
},
68+
&self.retry_policy,
69+
)
70+
.await
5971
}
6072

6173
/// Writes multiple [`PutObjectRequest::transaction_items`] as part of a single transaction.
6274
/// Makes a service call to the `PutObject` endpoint of the VSS server, with multiple items.
6375
/// Items in the `request` are written in a single all-or-nothing transaction.
6476
/// For API contract/usage, refer to docs for [`PutObjectRequest`] and [`PutObjectResponse`].
6577
pub async fn put_object(&self, request: &PutObjectRequest) -> Result<PutObjectResponse, VssError> {
66-
let url = format!("{}/putObjects", self.base_url);
67-
68-
let response_raw = self.client.post(url).body(request.encode_to_vec()).send().await?;
69-
let status = response_raw.status();
70-
let payload = response_raw.bytes().await?;
71-
72-
if status.is_success() {
73-
let response = PutObjectResponse::decode(&payload[..])?;
74-
Ok(response)
75-
} else {
76-
Err(VssError::new(status, payload))
77-
}
78+
retry(
79+
|| async {
80+
let url = format!("{}/putObjects", self.base_url);
81+
82+
let request_body = request.encode_to_vec();
83+
let response_raw = self.client.post(&url).body(request_body).send().await?;
84+
let status = response_raw.status();
85+
let payload = response_raw.bytes().await?;
86+
87+
if status.is_success() {
88+
let response = PutObjectResponse::decode(&payload[..])?;
89+
Ok(response)
90+
} else {
91+
Err(VssError::new(status, payload))
92+
}
93+
},
94+
&self.retry_policy,
95+
)
96+
.await
7897
}
7998

8099
/// Deletes the given `key` and `value` in `request`.
81100
/// Makes a service call to the `DeleteObject` endpoint of the VSS server.
82101
/// For API contract/usage, refer to docs for [`DeleteObjectRequest`] and [`DeleteObjectResponse`].
83102
pub async fn delete_object(&self, request: &DeleteObjectRequest) -> Result<DeleteObjectResponse, VssError> {
84-
let url = format!("{}/deleteObject", self.base_url);
85-
86-
let response_raw = self.client.post(url).body(request.encode_to_vec()).send().await?;
87-
let status = response_raw.status();
88-
let payload = response_raw.bytes().await?;
89-
90-
if status.is_success() {
91-
let response = DeleteObjectResponse::decode(&payload[..])?;
92-
Ok(response)
93-
} else {
94-
Err(VssError::new(status, payload))
95-
}
103+
retry(
104+
|| async {
105+
let url = format!("{}/deleteObject", self.base_url);
106+
107+
let request_body = request.encode_to_vec();
108+
let response_raw = self.client.post(url).body(request_body).send().await?;
109+
let status = response_raw.status();
110+
let payload = response_raw.bytes().await?;
111+
112+
if status.is_success() {
113+
let response = DeleteObjectResponse::decode(&payload[..])?;
114+
Ok(response)
115+
} else {
116+
Err(VssError::new(status, payload))
117+
}
118+
},
119+
&self.retry_policy,
120+
)
121+
.await
96122
}
97123

98124
/// Lists keys and their corresponding version for a given [`ListKeyVersionsRequest::store_id`].
@@ -101,17 +127,24 @@ impl VssClient {
101127
pub async fn list_key_versions(
102128
&self, request: &ListKeyVersionsRequest,
103129
) -> Result<ListKeyVersionsResponse, VssError> {
104-
let url = format!("{}/listKeyVersions", self.base_url);
105-
106-
let response_raw = self.client.post(url).body(request.encode_to_vec()).send().await?;
107-
let status = response_raw.status();
108-
let payload = response_raw.bytes().await?;
109-
110-
if status.is_success() {
111-
let response = ListKeyVersionsResponse::decode(&payload[..])?;
112-
Ok(response)
113-
} else {
114-
Err(VssError::new(status, payload))
115-
}
130+
retry(
131+
|| async {
132+
let url = format!("{}/listKeyVersions", self.base_url);
133+
134+
let request_body = request.encode_to_vec();
135+
let response_raw = self.client.post(url).body(request_body).send().await?;
136+
let status = response_raw.status();
137+
let payload = response_raw.bytes().await?;
138+
139+
if status.is_success() {
140+
let response = ListKeyVersionsResponse::decode(&payload[..])?;
141+
Ok(response)
142+
} else {
143+
Err(VssError::new(status, payload))
144+
}
145+
},
146+
&self.retry_policy,
147+
)
148+
.await
116149
}
117150
}

tests/tests.rs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
mod tests {
33
use mockito::{self, Matcher};
44
use prost::Message;
5+
use std::time::Duration;
56
use vss_client::client::VssClient;
67
use vss_client::error::VssError;
78

89
use vss_client::types::{
910
DeleteObjectRequest, DeleteObjectResponse, ErrorCode, ErrorResponse, GetObjectRequest, GetObjectResponse,
1011
KeyValue, ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse,
1112
};
13+
use vss_client::util::retry::{ExponentialBackoffRetryPolicy, RetryPolicy};
1214

1315
const GET_OBJECT_ENDPOINT: &'static str = "/getObject";
1416
const PUT_OBJECT_ENDPOINT: &'static str = "/putObjects";
@@ -35,7 +37,7 @@ mod tests {
3537
.create();
3638

3739
// Create a new VssClient with the mock server URL.
38-
let client = VssClient::new(&base_url);
40+
let client = VssClient::new(&base_url, retry_policy());
3941

4042
let actual_result = client.get_object(&get_request).await.unwrap();
4143

@@ -68,7 +70,7 @@ mod tests {
6870
.create();
6971

7072
// Create a new VssClient with the mock server URL.
71-
let vss_client = VssClient::new(&base_url);
73+
let vss_client = VssClient::new(&base_url, retry_policy());
7274
let actual_result = vss_client.put_object(&request).await.unwrap();
7375

7476
let expected_result = &mock_response;
@@ -98,7 +100,7 @@ mod tests {
98100
.create();
99101

100102
// Create a new VssClient with the mock server URL.
101-
let vss_client = VssClient::new(&base_url);
103+
let vss_client = VssClient::new(&base_url, retry_policy());
102104
let actual_result = vss_client.delete_object(&request).await.unwrap();
103105

104106
let expected_result = &mock_response;
@@ -138,7 +140,7 @@ mod tests {
138140
.create();
139141

140142
// Create a new VssClient with the mock server URL.
141-
let client = VssClient::new(&base_url);
143+
let client = VssClient::new(&base_url, retry_policy());
142144

143145
let actual_result = client.list_key_versions(&request).await.unwrap();
144146

@@ -152,7 +154,7 @@ mod tests {
152154
#[tokio::test]
153155
async fn test_no_such_key_err_handling() {
154156
let base_url = mockito::server_url();
155-
let vss_client = VssClient::new(&base_url);
157+
let vss_client = VssClient::new(&base_url, retry_policy());
156158

157159
// NoSuchKeyError
158160
let error_response = ErrorResponse {
@@ -176,7 +178,7 @@ mod tests {
176178
#[tokio::test]
177179
async fn test_get_response_without_value() {
178180
let base_url = mockito::server_url();
179-
let vss_client = VssClient::new(&base_url);
181+
let vss_client = VssClient::new(&base_url, retry_policy());
180182

181183
// GetObjectResponse with None value
182184
let mock_response = GetObjectResponse { value: None, ..Default::default() };
@@ -191,13 +193,13 @@ mod tests {
191193
assert!(matches!(get_result.unwrap_err(), VssError::InternalServerError { .. }));
192194

193195
// Verify 1 request hit the server
194-
mock_server.expect(1).assert();
196+
mock_server.expect(3).assert();
195197
}
196198

197199
#[tokio::test]
198200
async fn test_invalid_request_err_handling() {
199201
let base_url = mockito::server_url();
200-
let vss_client = VssClient::new(&base_url);
202+
let vss_client = VssClient::new(&base_url, retry_policy());
201203

202204
// Invalid Request Error
203205
let error_response = ErrorResponse {
@@ -249,7 +251,7 @@ mod tests {
249251
#[tokio::test]
250252
async fn test_conflict_err_handling() {
251253
let base_url = mockito::server_url();
252-
let vss_client = VssClient::new(&base_url);
254+
let vss_client = VssClient::new(&base_url, retry_policy());
253255

254256
// Conflict Error
255257
let error_response =
@@ -276,7 +278,7 @@ mod tests {
276278
#[tokio::test]
277279
async fn test_internal_server_err_handling() {
278280
let base_url = mockito::server_url();
279-
let vss_client = VssClient::new(&base_url);
281+
let vss_client = VssClient::new(&base_url, retry_policy());
280282

281283
// Internal Server Error
282284
let error_response = ErrorResponse {
@@ -322,13 +324,13 @@ mod tests {
322324
assert!(matches!(list_result.unwrap_err(), VssError::InternalServerError { .. }));
323325

324326
// Verify 4 requests hit the server
325-
mock_server.expect(4).assert();
327+
mock_server.expect(12).assert();
326328
}
327329

328330
#[tokio::test]
329331
async fn test_internal_err_handling() {
330332
let base_url = mockito::server_url();
331-
let vss_client = VssClient::new(&base_url);
333+
let vss_client = VssClient::new(&base_url, retry_policy());
332334

333335
let error_response = ErrorResponse { error_code: 999, message: "UnknownException".to_string() };
334336
let mut _mock_server = mockito::mock("POST", Matcher::Any)
@@ -385,4 +387,15 @@ mod tests {
385387
let list_network_err = vss_client.list_key_versions(&list_request).await;
386388
assert!(matches!(list_network_err.unwrap_err(), VssError::InternalError { .. }));
387389
}
390+
391+
fn retry_policy() -> impl RetryPolicy<E = VssError> {
392+
ExponentialBackoffRetryPolicy::new(Duration::from_millis(1))
393+
.with_max_attempts(3)
394+
.skip_retry_on_error(|e| {
395+
matches!(
396+
e,
397+
VssError::NoSuchKeyError(..) | VssError::InvalidRequestError(..) | VssError::ConflictError(..)
398+
)
399+
})
400+
}
388401
}

0 commit comments

Comments
 (0)