Skip to content

Commit 1e9e1eb

Browse files
committed
node: Retry persistence requests
1 parent 9929dbb commit 1e9e1eb

File tree

3 files changed

+72
-40
lines changed

3 files changed

+72
-40
lines changed

node/src/api/client.rs

Lines changed: 62 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ use tracing::debug;
1919
use crate::api::*;
2020

2121
const API_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
22+
const MAX_ATTEMPTS: usize = 3;
2223

2324
/// Enumerates the base urls that can be used in an API call.
25+
#[derive(Copy, Clone)]
2426
enum BaseUrl {
2527
Backend,
2628
Runner,
2729
}
2830

31+
#[derive(Copy, Clone)]
2932
enum ApiVersion {
3033
V1,
3134
}
@@ -67,21 +70,22 @@ impl ApiClient for LexeApiClient {
6770
&self,
6871
user_pk: UserPk,
6972
) -> Result<Option<Node>, ApiError> {
70-
let req = GetByUserPk { user_pk };
71-
self.request(Method::GET, Backend, V1, "/node", req).await
73+
let data = GetByUserPk { user_pk };
74+
self.request(&Method::GET, Backend, V1, "/node", &data)
75+
.await
7276
}
7377

7478
async fn get_instance(
7579
&self,
7680
user_pk: UserPk,
7781
measurement: Measurement,
7882
) -> Result<Option<Instance>, ApiError> {
79-
let req = GetByUserPkAndMeasurement {
83+
let data = GetByUserPkAndMeasurement {
8084
user_pk,
8185
measurement,
8286
};
8387
let maybe_instance: Option<Instance> = self
84-
.request(Method::GET, Backend, V1, "/instance", req)
88+
.request(&Method::GET, Backend, V1, "/instance", &data)
8589
.await?;
8690

8791
if let Some(instance) = maybe_instance.as_ref() {
@@ -100,80 +104,108 @@ impl ApiClient for LexeApiClient {
100104

101105
async fn get_sealed_seed(
102106
&self,
103-
req: SealedSeedId,
107+
data: SealedSeedId,
104108
) -> Result<Option<SealedSeed>, ApiError> {
105-
self.request(Method::GET, Backend, V1, "/sealed_seed", req)
109+
self.request(&Method::GET, Backend, V1, "/sealed_seed", &data)
106110
.await
107111
}
108112

109113
async fn create_node_instance_seed(
110114
&self,
111-
req: NodeInstanceSeed,
115+
data: NodeInstanceSeed,
112116
) -> Result<NodeInstanceSeed, ApiError> {
113117
let endpoint = "/acid/node_instance_seed";
114-
self.request(Method::POST, Backend, V1, endpoint, req).await
118+
self.request(&Method::POST, Backend, V1, endpoint, &data)
119+
.await
115120
}
116121

117-
async fn get_file(&self, req: FileId) -> Result<Option<File>, ApiError> {
122+
async fn get_file(&self, data: FileId) -> Result<Option<File>, ApiError> {
118123
let endpoint = "/file";
119-
self.request(Method::GET, Backend, V1, endpoint, req).await
124+
self.request(&Method::GET, Backend, V1, endpoint, &data)
125+
.await
120126
}
121127

122-
async fn create_file(&self, req: File) -> Result<File, ApiError> {
128+
async fn create_file(&self, data: File) -> Result<File, ApiError> {
123129
let endpoint = "/file";
124-
self.request(Method::POST, Backend, V1, endpoint, req).await
130+
self.request(&Method::POST, Backend, V1, endpoint, &data)
131+
.await
125132
}
126133

127-
async fn upsert_file(&self, req: File) -> Result<File, ApiError> {
134+
async fn upsert_file(&self, data: File) -> Result<File, ApiError> {
128135
let endpoint = "/file";
129-
self.request(Method::PUT, Backend, V1, endpoint, req).await
136+
self.request(&Method::PUT, Backend, V1, endpoint, &data)
137+
.await
130138
}
131139

132140
// TODO We want to delete channel peers / monitors when channels close
133141
/// Returns "OK" if exactly one row was deleted.
134142
#[allow(dead_code)]
135-
async fn delete_file(&self, req: FileId) -> Result<String, ApiError> {
143+
async fn delete_file(&self, data: FileId) -> Result<String, ApiError> {
136144
let endpoint = "/file";
137-
self.request(Method::DELETE, Backend, V1, endpoint, req)
145+
self.request(&Method::DELETE, Backend, V1, endpoint, &data)
138146
.await
139147
}
140148

141149
async fn get_directory(
142150
&self,
143-
req: Directory,
151+
data: Directory,
144152
) -> Result<Vec<File>, ApiError> {
145153
let endpoint = "/directory";
146-
self.request(Method::GET, Backend, V1, endpoint, req).await
154+
self.request(&Method::GET, Backend, V1, endpoint, &data)
155+
.await
147156
}
148157

149158
async fn notify_runner(
150159
&self,
151-
req: UserPorts,
160+
data: UserPorts,
152161
) -> Result<UserPorts, ApiError> {
153-
self.request(Method::POST, Runner, V1, "/ready", req).await
162+
self.request(&Method::POST, Runner, V1, "/ready", &data)
163+
.await
154164
}
155165
}
156166

157167
impl LexeApiClient {
158-
/// Builds and executes the API request
168+
/// Tries to complete an API request, making up to `MAX_ATTEMPTS` attempts.
159169
async fn request<D: Serialize, T: DeserializeOwned>(
160170
&self,
161-
method: Method,
162-
base_url: BaseUrl,
163-
api_version: ApiVersion,
171+
method: &Method,
172+
base: BaseUrl,
173+
ver: ApiVersion,
174+
endpoint: &str,
175+
data: &D,
176+
) -> Result<T, ApiError> {
177+
// Try the first n-1 times, return early if successful
178+
for _ in 0..MAX_ATTEMPTS - 1 {
179+
let res = self.execute(method, base, ver, endpoint, data).await;
180+
if res.is_ok() {
181+
return res;
182+
}
183+
}
184+
185+
// Last try
186+
self.execute(method, base, ver, endpoint, &data).await
187+
}
188+
189+
/// Executes an API request once.
190+
async fn execute<D: Serialize, T: DeserializeOwned>(
191+
&self,
192+
method: &Method,
193+
base: BaseUrl,
194+
ver: ApiVersion,
164195
endpoint: &str,
165-
data: D,
196+
data: &D,
166197
) -> Result<T, ApiError> {
167198
// Node backend api is versioned but runner api is not
168-
let (base, version) = match base_url {
169-
Backend => (&self.backend_url, api_version.to_string()),
199+
let (base, ver) = match base {
200+
Backend => (&self.backend_url, ver.to_string()),
170201
Runner => (&self.runner_url, String::new()),
171202
};
172-
let mut url = format!("{}{}{}", base, version, endpoint);
203+
let mut url = format!("{}{}{}", base, ver, endpoint);
173204

174205
// If GET, serialize the data in a query string
206+
let method = method.to_owned();
175207
let query_str = match method {
176-
Method::GET => Some(serde_qs::to_string(&data)?),
208+
Method::GET => Some(serde_qs::to_string(data)?),
177209
_ => None,
178210
};
179211
// Append directly to url since RequestBuilder.param() API is unwieldy
@@ -187,7 +219,7 @@ impl LexeApiClient {
187219

188220
// If PUT or POST, serialize the data in the request body
189221
let body = match method {
190-
Method::PUT | Method::POST => serde_json::to_string(&data)?,
222+
Method::PUT | Method::POST => serde_json::to_string(data)?,
191223
_ => String::new(),
192224
};
193225
// println!(" Body: {}", body);

node/src/api/mock.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,23 +132,23 @@ impl ApiClient for MockApiClient {
132132
/// Always return the dummy version
133133
async fn get_sealed_seed(
134134
&self,
135-
req: SealedSeedId,
135+
data: SealedSeedId,
136136
) -> Result<Option<SealedSeed>, ApiError> {
137137
let sealed_seed = SealedSeed::new(
138-
req.node_pk,
139-
req.measurement,
140-
req.machine_id,
141-
req.min_cpusvn,
142-
sealed_seed(&req.node_pk),
138+
data.node_pk,
139+
data.measurement,
140+
data.machine_id,
141+
data.min_cpusvn,
142+
sealed_seed(&data.node_pk),
143143
);
144144
Ok(Some(sealed_seed))
145145
}
146146

147147
async fn create_node_instance_seed(
148148
&self,
149-
req: NodeInstanceSeed,
149+
data: NodeInstanceSeed,
150150
) -> Result<NodeInstanceSeed, ApiError> {
151-
Ok(req)
151+
Ok(data)
152152
}
153153

154154
async fn get_file(

node/src/api/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ pub trait ApiClient {
4646

4747
async fn get_sealed_seed(
4848
&self,
49-
req: SealedSeedId,
49+
data: SealedSeedId,
5050
) -> Result<Option<SealedSeed>, ApiError>;
5151

5252
async fn create_node_instance_seed(
5353
&self,
54-
req: NodeInstanceSeed,
54+
data: NodeInstanceSeed,
5555
) -> Result<NodeInstanceSeed, ApiError>;
5656

5757
async fn get_file(&self, file_id: FileId)

0 commit comments

Comments
 (0)