Skip to content

Commit 6b55fb1

Browse files
JOHNJOHN
authored andcommitted
feat: add timeout enforcement for storage operations
- Wrap storage put/get operations with tokio::time::timeout - Use operation_timeout_ms from RetryConfig (default 30s) - Return NoiseError::Timeout on exhausted retries - Add WASM limitation docs for missing timeout support
1 parent e843c91 commit 6b55fb1

File tree

1 file changed

+141
-24
lines changed

1 file changed

+141
-24
lines changed

src/storage_queue.rs

Lines changed: 141 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,16 @@
1313
//! - `/pub/my-app/messages/outbox`
1414
//! - `/pub/paykit.app/v0/noise/inbox`
1515
//!
16+
//! ## Timeout Enforcement
17+
//!
18+
//! All storage operations are wrapped with `tokio::time::timeout()` using the
19+
//! `operation_timeout_ms` value from [`RetryConfig`] (default: 30 seconds).
20+
//! This prevents indefinite blocking on slow or unresponsive networks.
21+
//!
22+
//! When a timeout occurs, the operation is retried (up to `max_retries`) with
23+
//! exponential backoff. If all retries are exhausted, a [`NoiseError::Timeout`]
24+
//! is returned.
25+
//!
1626
//! ## WASM Limitations
1727
//!
1828
//! On WASM targets, operation timeouts are not enforced because `tokio::time::timeout`
@@ -190,7 +200,14 @@ impl StorageBackedMessaging {
190200
self.read_counter
191201
}
192202

193-
/// Send a message with retry logic and exponential backoff
203+
/// Send a message with retry logic, exponential backoff, and timeout enforcement.
204+
///
205+
/// Each storage operation is subject to the `operation_timeout_ms` configured in `RetryConfig`.
206+
///
207+
/// # WASM Limitations
208+
///
209+
/// On WASM targets, timeouts are not enforced because `tokio::time::timeout` is not available.
210+
/// Operations may block indefinitely on slow networks.
194211
pub async fn send_message(&mut self, plaintext: &[u8]) -> Result<(), NoiseError> {
195212
let ciphertext = self.noise_link.encrypt(plaintext)?;
196213
let path = format!("{}/msg_{}", self.write_path, self.write_counter);
@@ -199,33 +216,65 @@ impl StorageBackedMessaging {
199216
let mut attempt = 0;
200217
let mut backoff_ms = self.retry_config.initial_backoff_ms;
201218

219+
#[cfg(not(target_arch = "wasm32"))]
220+
let timeout_duration = Duration::from_millis(self.retry_config.operation_timeout_ms);
221+
202222
loop {
203-
match self.session.storage().put(&path, ciphertext.clone()).await {
204-
Ok(_) => {
223+
// Apply timeout to the put operation (non-WASM only)
224+
#[cfg(not(target_arch = "wasm32"))]
225+
let result = tokio::time::timeout(
226+
timeout_duration,
227+
self.session.storage().put(&path, ciphertext.clone()),
228+
)
229+
.await;
230+
231+
#[cfg(target_arch = "wasm32")]
232+
let result = Ok(self.session.storage().put(&path, ciphertext.clone()).await);
233+
234+
match result {
235+
#[cfg(not(target_arch = "wasm32"))]
236+
Err(_elapsed) => {
237+
// Timeout occurred
238+
attempt += 1;
239+
if attempt >= self.retry_config.max_retries {
240+
return Err(NoiseError::Timeout(format!(
241+
"Put operation timed out after {}ms ({} attempts)",
242+
self.retry_config.operation_timeout_ms, attempt
243+
)));
244+
}
245+
// Continue to backoff and retry
246+
}
247+
Ok(Ok(_)) => {
205248
self.write_counter += 1;
206249
return Ok(());
207250
}
208-
Err(e) => {
251+
Ok(Err(e)) => {
209252
attempt += 1;
210253
if attempt >= self.retry_config.max_retries {
211254
return Err(NoiseError::Storage(format!(
212255
"Failed to put after {} attempts: {:?}",
213256
attempt, e
214257
)));
215258
}
216-
217-
// Exponential backoff with cap
218-
// Note: For WASM targets, consider using gloo-timers or similar
219-
#[cfg(not(target_arch = "wasm32"))]
220-
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
221-
222-
backoff_ms = (backoff_ms * 2).min(self.retry_config.max_backoff_ms);
223259
}
224260
}
261+
262+
// Exponential backoff with cap
263+
#[cfg(not(target_arch = "wasm32"))]
264+
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
265+
266+
backoff_ms = (backoff_ms * 2).min(self.retry_config.max_backoff_ms);
225267
}
226268
}
227269

228-
/// Receive messages with retry logic for transient errors
270+
/// Receive messages with retry logic, transient error handling, and timeout enforcement.
271+
///
272+
/// Each network operation is subject to the `operation_timeout_ms` configured in `RetryConfig`.
273+
///
274+
/// # WASM Limitations
275+
///
276+
/// On WASM targets, timeouts are not enforced because `tokio::time::timeout` is not available.
277+
/// Operations may block indefinitely on slow networks.
229278
pub async fn receive_messages(
230279
&mut self,
231280
max_messages: Option<usize>,
@@ -234,6 +283,9 @@ impl StorageBackedMessaging {
234283
let limit = max_messages.unwrap_or(10); // Default limit to avoid infinite loops
235284
let mut attempts = 0;
236285

286+
#[cfg(not(target_arch = "wasm32"))]
287+
let timeout_duration = Duration::from_millis(self.retry_config.operation_timeout_ms);
288+
237289
while attempts < limit {
238290
let path = format!("{}/msg_{}", self.read_path, self.read_counter);
239291

@@ -242,12 +294,56 @@ impl StorageBackedMessaging {
242294
let mut backoff_ms = self.retry_config.initial_backoff_ms;
243295

244296
loop {
245-
match self.public_client.get(&path).await {
246-
Ok(response) => {
297+
// Apply timeout to the get operation (non-WASM only)
298+
#[cfg(not(target_arch = "wasm32"))]
299+
let get_result =
300+
tokio::time::timeout(timeout_duration, self.public_client.get(&path)).await;
301+
302+
#[cfg(target_arch = "wasm32")]
303+
let get_result = Ok(self.public_client.get(&path).await);
304+
305+
match get_result {
306+
#[cfg(not(target_arch = "wasm32"))]
307+
Err(_elapsed) => {
308+
// Timeout on get operation
309+
retry_attempt += 1;
310+
if retry_attempt >= self.retry_config.max_retries {
311+
return Err(NoiseError::Timeout(format!(
312+
"Get operation timed out after {}ms ({} attempts)",
313+
self.retry_config.operation_timeout_ms, retry_attempt
314+
)));
315+
}
316+
// Backoff and retry
317+
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
318+
backoff_ms = (backoff_ms * 2).min(self.retry_config.max_backoff_ms);
319+
continue;
320+
}
321+
Ok(Ok(response)) => {
247322
if response.status().is_success() {
248-
let ciphertext = response.bytes().await.map_err(|e| {
249-
NoiseError::Network(format!("Failed to read bytes: {:?}", e))
250-
})?;
323+
// Apply timeout to reading bytes (non-WASM only)
324+
#[cfg(not(target_arch = "wasm32"))]
325+
let bytes_result =
326+
tokio::time::timeout(timeout_duration, response.bytes()).await;
327+
328+
#[cfg(target_arch = "wasm32")]
329+
let bytes_result = Ok(response.bytes().await);
330+
331+
let ciphertext = match bytes_result {
332+
#[cfg(not(target_arch = "wasm32"))]
333+
Err(_elapsed) => {
334+
return Err(NoiseError::Timeout(format!(
335+
"Reading response bytes timed out after {}ms",
336+
self.retry_config.operation_timeout_ms
337+
)));
338+
}
339+
Ok(Ok(bytes)) => bytes.to_vec(),
340+
Ok(Err(e)) => {
341+
return Err(NoiseError::Network(format!(
342+
"Failed to read bytes: {:?}",
343+
e
344+
)));
345+
}
346+
};
251347

252348
let plaintext = self.noise_link.decrypt(&ciphertext).map_err(|e| {
253349
NoiseError::Decryption(format!("Failed to decrypt: {:?}", e))
@@ -279,7 +375,7 @@ impl StorageBackedMessaging {
279375
)));
280376
}
281377
}
282-
Err(_e) if retry_attempt < self.retry_config.max_retries => {
378+
Ok(Err(_e)) if retry_attempt < self.retry_config.max_retries => {
283379
// Network error - retry
284380
retry_attempt += 1;
285381

@@ -289,7 +385,7 @@ impl StorageBackedMessaging {
289385
backoff_ms = (backoff_ms * 2).min(self.retry_config.max_backoff_ms);
290386
continue;
291387
}
292-
Err(e) => {
388+
Ok(Err(e)) => {
293389
// Exhausted retries
294390
return Err(NoiseError::Network(format!(
295391
"Network error after {} retries: {:?}",
@@ -303,13 +399,34 @@ impl StorageBackedMessaging {
303399
Ok(messages)
304400
}
305401

402+
/// Peek to estimate if there are pending messages.
403+
///
404+
/// Returns 1 if at least one message exists, 0 otherwise.
405+
/// This does not guarantee an accurate count without full scanning.
406+
///
407+
/// # WASM Limitations
408+
///
409+
/// On WASM targets, timeouts are not enforced.
306410
pub async fn peek_message_count(&self) -> Result<usize, NoiseError> {
307-
// Estimate by checking if next message exists
308-
// This is just a peek, doesn't guarantee total count without scanning
309411
let path = format!("{}/msg_{}", self.read_path, self.read_counter);
310-
match self.public_client.get(&path).await {
311-
Ok(r) if r.status().is_success() => Ok(1), // At least 1
312-
_ => Ok(0),
412+
413+
#[cfg(not(target_arch = "wasm32"))]
414+
let timeout_duration = Duration::from_millis(self.retry_config.operation_timeout_ms);
415+
416+
#[cfg(not(target_arch = "wasm32"))]
417+
let result = tokio::time::timeout(timeout_duration, self.public_client.get(&path)).await;
418+
419+
#[cfg(target_arch = "wasm32")]
420+
let result = Ok(self.public_client.get(&path).await);
421+
422+
match result {
423+
#[cfg(not(target_arch = "wasm32"))]
424+
Err(_elapsed) => Err(NoiseError::Timeout(format!(
425+
"Peek operation timed out after {}ms",
426+
self.retry_config.operation_timeout_ms
427+
))),
428+
Ok(Ok(r)) if r.status().is_success() => Ok(1), // At least 1
429+
Ok(_) => Ok(0),
313430
}
314431
}
315432
}

0 commit comments

Comments
 (0)