Skip to content

Commit f898ddf

Browse files
committed
node: Create / upsert files with retries
1 parent 0571b87 commit f898ddf

File tree

4 files changed

+101
-64
lines changed

4 files changed

+101
-64
lines changed

node/src/api/client.rs

Lines changed: 50 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,22 @@ use serde::Serialize;
1717
use tokio::time;
1818
use tracing::debug;
1919

20+
use self::ApiVersion::*;
21+
use self::BaseUrl::*;
2022
use crate::api::*;
2123

2224
const API_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
23-
const MAX_ATTEMPTS: usize = 3;
2425
/// How long to wait after the second failed API request before trying again.
2526
/// This is relatively long since if the second try failed, it probably means
2627
/// that the backend is down, which could be the case for a while.
2728
const RETRY_INTERVAL: Duration = Duration::from_secs(15);
2829

30+
// Avoid `Method::` prefix. Associated constants can't be imported
31+
const GET: Method = Method::GET;
32+
const PUT: Method = Method::PUT;
33+
const POST: Method = Method::POST;
34+
const DELETE: Method = Method::DELETE;
35+
2936
/// Enumerates the base urls that can be used in an API call.
3037
#[derive(Copy, Clone)]
3138
enum BaseUrl {
@@ -38,9 +45,6 @@ enum ApiVersion {
3845
V1,
3946
}
4047

41-
use ApiVersion::*;
42-
use BaseUrl::*;
43-
4448
impl Display for ApiVersion {
4549
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4650
match &self {
@@ -76,8 +80,7 @@ impl ApiClient for LexeApiClient {
7680
user_pk: UserPk,
7781
) -> Result<Option<Node>, ApiError> {
7882
let data = GetByUserPk { user_pk };
79-
self.request(&Method::GET, Backend, V1, "/node", &data)
80-
.await
83+
self.request(&GET, Backend, V1, "/node", &data).await
8184
}
8285

8386
async fn get_instance(
@@ -90,8 +93,7 @@ impl ApiClient for LexeApiClient {
9093
measurement,
9194
};
9295
let maybe_instance: Option<Instance> = self
93-
.request(&Method::GET, Backend, V1, "/instance", &data)
94-
.await?;
96+
.request(&GET, Backend, V1, "/instance", &data).await?;
9597

9698
if let Some(instance) = maybe_instance.as_ref() {
9799
if instance.measurement != measurement {
@@ -111,34 +113,49 @@ impl ApiClient for LexeApiClient {
111113
&self,
112114
data: SealedSeedId,
113115
) -> Result<Option<SealedSeed>, ApiError> {
114-
self.request(&Method::GET, Backend, V1, "/sealed_seed", &data)
115-
.await
116+
self.request(&GET, Backend, V1, "/sealed_seed", &data).await
116117
}
117118

118119
async fn create_node_instance_seed(
119120
&self,
120121
data: NodeInstanceSeed,
121122
) -> Result<NodeInstanceSeed, ApiError> {
122123
let endpoint = "/acid/node_instance_seed";
123-
self.request(&Method::POST, Backend, V1, endpoint, &data)
124-
.await
124+
self.request(&POST, Backend, V1, endpoint, &data).await
125125
}
126126

127127
async fn get_file(&self, data: &FileId) -> Result<Option<File>, ApiError> {
128128
let endpoint = "/file";
129-
self.request(&Method::GET, Backend, V1, endpoint, &data)
130-
.await
129+
self.request(&GET, Backend, V1, endpoint, &data).await
131130
}
132131

133132
async fn create_file(&self, data: &File) -> Result<File, ApiError> {
134133
let endpoint = "/file";
135-
self.request(&Method::POST, Backend, V1, endpoint, &data)
134+
self.request(&POST, Backend, V1, endpoint, &data).await
135+
}
136+
137+
async fn create_file_with_retries(
138+
&self,
139+
data: &File,
140+
retries: usize,
141+
) -> Result<File, ApiError> {
142+
let endpoint = "/file";
143+
self.request_with_retries(&POST, Backend, V1, endpoint, &data, retries)
136144
.await
137145
}
138146

139147
async fn upsert_file(&self, data: &File) -> Result<File, ApiError> {
140148
let endpoint = "/file";
141-
self.request(&Method::PUT, Backend, V1, endpoint, &data)
149+
self.request(&PUT, Backend, V1, endpoint, &data).await
150+
}
151+
152+
async fn upsert_file_with_retries(
153+
&self,
154+
data: &File,
155+
retries: usize,
156+
) -> Result<File, ApiError> {
157+
let endpoint = "/file";
158+
self.request_with_retries(&PUT, Backend, V1, endpoint, &data, retries)
142159
.await
143160
}
144161

@@ -147,46 +164,47 @@ impl ApiClient for LexeApiClient {
147164
#[allow(dead_code)]
148165
async fn delete_file(&self, data: &FileId) -> Result<String, ApiError> {
149166
let endpoint = "/file";
150-
self.request(&Method::DELETE, Backend, V1, endpoint, &data)
151-
.await
167+
self.request(&DELETE, Backend, V1, endpoint, &data).await
152168
}
153169

154170
async fn get_directory(
155171
&self,
156172
data: &Directory,
157173
) -> Result<Vec<File>, ApiError> {
158174
let endpoint = "/directory";
159-
self.request(&Method::GET, Backend, V1, endpoint, &data)
160-
.await
175+
self.request(&GET, Backend, V1, endpoint, &data).await
161176
}
162177

163178
async fn notify_runner(
164179
&self,
165180
data: UserPorts,
166181
) -> Result<UserPorts, ApiError> {
167-
self.request(&Method::POST, Runner, V1, "/ready", &data)
168-
.await
182+
self.request(&POST, Runner, V1, "/ready", &data).await
169183
}
170184
}
171185

172186
impl LexeApiClient {
173-
/// Tries to complete an API request, making up to `MAX_ATTEMPTS` attempts.
174-
async fn request<D: Serialize, T: DeserializeOwned>(
187+
/// Tries to complete an API request, retrying up to `retries` times.
188+
async fn request_with_retries<D: Serialize, T: DeserializeOwned>(
175189
&self,
176190
method: &Method,
177191
base: BaseUrl,
178192
ver: ApiVersion,
179193
endpoint: &str,
180194
data: &D,
195+
retries: usize,
181196
) -> Result<T, ApiError> {
182197
let mut retry_timer = time::interval(RETRY_INTERVAL);
183198

184-
// Try the first n-1 times, return early if successful
185-
for _ in 0..MAX_ATTEMPTS - 1 {
186-
let res = self.execute(method, base, ver, endpoint, data).await;
199+
// 'Do the retries first' and return early if successful.
200+
// If retries == 0 this block is a noop.
201+
for _ in 0..retries {
202+
let res = self.request(method, base, ver, endpoint, data).await;
187203
if res.is_ok() {
188204
return res;
189205
} else {
206+
// TODO log errors here
207+
190208
// Since the first tick resolves immediately, and we tick only
191209
// on failures, the first failed attempt is immediately followed
192210
// up with second attempt (to encode that sometimes messages are
@@ -197,12 +215,12 @@ impl LexeApiClient {
197215
}
198216
}
199217

200-
// Last try
201-
self.execute(method, base, ver, endpoint, &data).await
218+
// Do the 'main attempt'.
219+
self.request(method, base, ver, endpoint, &data).await
202220
}
203221

204222
/// Executes an API request once.
205-
async fn execute<D: Serialize, T: DeserializeOwned>(
223+
async fn request<D: Serialize, T: DeserializeOwned>(
206224
&self,
207225
method: &Method,
208226
base: BaseUrl,
@@ -220,7 +238,7 @@ impl LexeApiClient {
220238
// If GET, serialize the data in a query string
221239
let method = method.to_owned();
222240
let query_str = match method {
223-
Method::GET => Some(serde_qs::to_string(data)?),
241+
GET => Some(serde_qs::to_string(data)?),
224242
_ => None,
225243
};
226244
// Append directly to url since RequestBuilder.param() API is unwieldy
@@ -234,7 +252,7 @@ impl LexeApiClient {
234252

235253
// If PUT or POST, serialize the data in the request body
236254
let body = match method {
237-
Method::PUT | Method::POST => serde_json::to_string(data)?,
255+
PUT | POST => serde_json::to_string(data)?,
238256
_ => String::new(),
239257
};
240258
// println!(" Body: {}", body);

node/src/api/mock.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,27 @@ impl ApiClient for MockApiClient {
165165
Ok(file.clone())
166166
}
167167

168+
async fn create_file_with_retries(
169+
&self,
170+
file: &File,
171+
_retries: usize,
172+
) -> Result<File, ApiError> {
173+
self.create_file(file).await
174+
}
175+
168176
async fn upsert_file(&self, file: &File) -> Result<File, ApiError> {
169177
self.vfs.lock().unwrap().insert(file.clone());
170178
Ok(file.clone())
171179
}
172180

181+
async fn upsert_file_with_retries(
182+
&self,
183+
file: &File,
184+
_retries: usize,
185+
) -> Result<File, ApiError> {
186+
self.upsert_file(file).await
187+
}
188+
173189
/// Returns "OK" if exactly one row was deleted.
174190
async fn delete_file(&self, file_id: &FileId) -> Result<String, ApiError> {
175191
let file_opt = self.vfs.lock().unwrap().remove(file_id.clone());

node/src/api/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,20 @@ pub trait ApiClient {
6161

6262
async fn create_file(&self, file: &File) -> Result<File, ApiError>;
6363

64+
async fn create_file_with_retries(
65+
&self,
66+
file: &File,
67+
retries: usize,
68+
) -> Result<File, ApiError>;
69+
6470
async fn upsert_file(&self, file: &File) -> Result<File, ApiError>;
6571

72+
async fn upsert_file_with_retries(
73+
&self,
74+
file: &File,
75+
retries: usize,
76+
) -> Result<File, ApiError>;
77+
6678
// TODO We want to delete channel peers / monitors when channels close
6779
/// Returns "OK" if exactly one row was deleted.
6880
async fn delete_file(&self, file_id: &FileId) -> Result<String, ApiError>;

node/src/lexe/persister.rs

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ const SCORER_FILENAME: &str = "scorer";
4242
pub const CHANNEL_PEERS_DIRECTORY: &str = "channel_peers";
4343
pub const CHANNEL_MONITORS_DIRECTORY: &str = "channel_monitors";
4444

45+
/// The default number of retries for important persisted state
46+
const DEFAULT_RETRIES: usize = 3;
47+
4548
/// An Arc is held internally, so it is fine to clone and use directly.
4649
#[derive(Clone)] // TODO Try removing this
4750
pub struct LexePersister {
@@ -318,8 +321,9 @@ impl InnerPersister {
318321
Vec::new(),
319322
);
320323

324+
// Retry up to 3 times
321325
self.api
322-
.create_file(&cp_file)
326+
.create_file_with_retries(&cp_file, DEFAULT_RETRIES)
323327
.await
324328
.map(|_| ())
325329
.map_err(|e| e.into())
@@ -342,8 +346,9 @@ impl InnerPersister {
342346
data,
343347
);
344348

349+
// Channel manager is more important so let's retry up to three times
345350
self.api
346-
.upsert_file(&cm_file)
351+
.upsert_file_with_retries(&cm_file, DEFAULT_RETRIES)
347352
.await
348353
.map(|_| ())
349354
.context("Could not persist channel manager")
@@ -432,21 +437,14 @@ impl Persist<SignerType> for InnerPersister {
432437
self.channel_monitor_updated_tx.clone();
433438
tokio::spawn(async move {
434439
// Retry indefinitely until it succeeds
435-
loop {
436-
// TODO Also attempt to persist to cloud backup
437-
match api_clone.create_file(&cm_file).await {
438-
Ok(_file) => {
439-
if let Err(e) =
440-
channel_monitor_updated_tx.try_send(update)
441-
{
442-
error!("Couldn't notify chain monitor: {:#}", e);
443-
}
444-
return;
445-
}
446-
Err(e) => {
447-
error!("Couldn't persist new channel monitor: {:#}", e)
448-
}
449-
}
440+
// TODO Also attempt to persist to cloud backup
441+
api_clone
442+
.create_file_with_retries(&cm_file, usize::MAX)
443+
.await
444+
.expect("Unlimited retries always return Ok");
445+
446+
if let Err(e) = channel_monitor_updated_tx.try_send(update) {
447+
error!("Couldn't notify chain monitor: {:#}", e);
450448
}
451449
});
452450

@@ -487,21 +485,14 @@ impl Persist<SignerType> for InnerPersister {
487485
self.channel_monitor_updated_tx.clone();
488486
tokio::spawn(async move {
489487
// Retry indefinitely until it succeeds
490-
loop {
491-
// TODO Also attempt to persist to cloud backup
492-
match api_clone.upsert_file(&cm_file).await {
493-
Ok(_) => {
494-
if let Err(e) =
495-
channel_monitor_updated_tx.try_send(update)
496-
{
497-
error!("Couldn't notify chain monitor: {:#}", e);
498-
}
499-
return;
500-
}
501-
Err(e) => {
502-
error!("Could not update channel monitor: {:#}", e)
503-
}
504-
}
488+
// TODO Also attempt to persist to cloud backup
489+
api_clone
490+
.upsert_file_with_retries(&cm_file, usize::MAX)
491+
.await
492+
.expect("Unlimited retries always return Ok");
493+
494+
if let Err(e) = channel_monitor_updated_tx.try_send(update) {
495+
error!("Couldn't notify chain monitor: {:#}", e);
505496
}
506497
});
507498

0 commit comments

Comments
 (0)