Skip to content

Commit ed5ebe0

Browse files
committed
fix(download): Limit retry attempts on download partial responses
Add a maximum retry limit when retrying after failing due to network error when downloading response chunks. Notably, this is a separate limit from the backoff policy used when initially connecting to the server, and which doesn't have a single number of attempts, but rather a total time limit. This new limit is intended to prevent infinite retry loops in cases where the server is consistently failing to deliver the full response, for example due to a misconfiguration or a bug. Signed-off-by: Marcel Guzik <marcel.guzik@cumulocity.com>
1 parent 5a8d94d commit ed5ebe0

File tree

2 files changed

+127
-5
lines changed

2 files changed

+127
-5
lines changed

crates/common/download/src/download.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,8 @@ impl Downloader {
198198
file: &mut File,
199199
mut prev_response: Response,
200200
) -> Result<(), DownloadError> {
201-
loop {
201+
let mut last_result = Ok(());
202+
for _ in 0..4 {
202203
let request_offset = next_request_offset(&prev_response, file)?;
203204
let mut response = self.request_range_from(url, request_offset).await?;
204205
let offset = match partial_response::response_range_start(&response, &prev_response)? {
@@ -218,10 +219,14 @@ impl Downloader {
218219
}
219220

220221
match save_chunks_to_file_at(&mut response, file, offset).await {
221-
Ok(()) => break,
222+
Ok(()) => {
223+
last_result = Ok(());
224+
break;
225+
}
222226

223227
Err(SaveChunksError::Network(err)) => {
224228
warn!("Error while downloading response: {err}.\nRetrying...");
229+
last_result = Err(DownloadError::Request(err));
225230
}
226231

227232
Err(SaveChunksError::Io(err)) => {
@@ -230,11 +235,11 @@ impl Downloader {
230235
context: "Error while saving to file".to_string(),
231236
})
232237
}
233-
}
238+
};
234239
prev_response = response;
235240
}
236241

237-
Ok(())
242+
last_result
238243
}
239244

240245
/// Returns the filename.

crates/common/download/src/download/tests/partial_response.rs

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use super::*;
22

3+
use std::sync::atomic::AtomicUsize;
4+
use std::sync::atomic::Ordering;
35
use tokio::io::AsyncBufReadExt;
46
use tokio::io::AsyncWriteExt;
57
use tokio::io::BufReader;
@@ -63,7 +65,13 @@ async fn resume_download_when_disconnected() {
6365
let body = &file[start..next];
6466

6567
let size = body.len();
66-
let msg = format!("{header}\r\n{size}\r\n{body}\r\n");
68+
let msg = format!("{header}\r\n{size:x}\r\n{body}\r\n");
69+
// if this is the last chunk, send also terminating 0-length chunk
70+
let msg = if next == file.len() {
71+
format!("{msg}0\r\n\r\n")
72+
} else {
73+
msg
74+
};
6775
debug!("sending message = {msg}");
6876
writer.write_all(msg.as_bytes()).await.unwrap();
6977
writer.flush().await.unwrap();
@@ -330,3 +338,112 @@ async fn resumed_download_doesnt_leave_leftovers() {
330338

331339
server_task.abort();
332340
}
341+
342+
#[tokio::test]
343+
async fn resume_max_5_times() {
344+
let request_count = Arc::new(AtomicUsize::new(0));
345+
let rc = request_count.clone();
346+
347+
let listener = TcpListener::bind("localhost:0").await.unwrap();
348+
let port = listener.local_addr().unwrap().port();
349+
let server_task = tokio::spawn(async move {
350+
while let Ok((mut stream, _addr)) = listener.accept().await {
351+
let response_task = async move {
352+
let (_, mut writer) = stream.split();
353+
// Always respond with only first chunk, never completing the response, triggering retries
354+
let header = "\
355+
HTTP/1.1 200 OK\r\n\
356+
transfer-encoding: chunked\r\n\
357+
connection: close\r\n\
358+
content-type: application/octet-stream\r\n\
359+
accept-ranges: bytes\r\n";
360+
361+
let body = "AAAA";
362+
let msg = format!("{header}\r\n4\r\n{body}\r\n");
363+
writer.write_all(msg.as_bytes()).await.unwrap();
364+
writer.flush().await.unwrap();
365+
};
366+
tokio::spawn(response_task);
367+
rc.fetch_add(1, Ordering::SeqCst);
368+
}
369+
});
370+
371+
// Wait until task binds a listener on the TCP port
372+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
373+
374+
let tmpdir = TempDir::new().unwrap();
375+
let target_path = tmpdir.path().join("partial_download");
376+
377+
let downloader = Downloader::new(target_path, None, CloudHttpConfig::test_value());
378+
let url = DownloadInfo::new(&format!("http://localhost:{port}/"));
379+
380+
let err = downloader.download(&url).await.unwrap_err();
381+
assert!(matches!(err, DownloadError::Request(_)));
382+
assert!(err.to_string().contains("error decoding response body"));
383+
384+
downloader.cleanup().await.unwrap();
385+
386+
server_task.abort();
387+
388+
assert_eq!(request_count.load(Ordering::SeqCst), 5);
389+
}
390+
391+
// If we succeed before max retries, we should not do more requests.
392+
#[tokio::test]
393+
async fn only_retry_until_success() {
394+
let file = "AAAABBBBCCCCDDDD";
395+
let request_count = Arc::new(AtomicUsize::new(0));
396+
let rc = request_count.clone();
397+
398+
let listener = TcpListener::bind("localhost:0").await.unwrap();
399+
let port = listener.local_addr().unwrap().port();
400+
let server_task = tokio::spawn(async move {
401+
while let Ok((mut stream, _addr)) = listener.accept().await {
402+
let rc_num = rc.load(Ordering::SeqCst);
403+
let response_task = async move {
404+
let (_, mut writer) = stream.split();
405+
// Always respond with only first chunk, never completing the response, triggering retries
406+
let header = "\
407+
HTTP/1.1 200 OK\r\n\
408+
transfer-encoding: chunked\r\n\
409+
connection: close\r\n\
410+
content-type: application/octet-stream\r\n\
411+
accept-ranges: bytes\r\n";
412+
413+
// On the 2nd request, send the full response, should trigger only 1 retry
414+
let msg = if rc_num == 0 {
415+
let body = "AAAA";
416+
format!("{header}\r\n4\r\n{body}\r\n")
417+
} else {
418+
let body = file;
419+
let len = file.len();
420+
format!("{header}\r\n{len:x}\r\n{body}\r\n0\r\n\r\n")
421+
};
422+
423+
writer.write_all(msg.as_bytes()).await.unwrap();
424+
writer.flush().await.unwrap();
425+
};
426+
tokio::spawn(response_task);
427+
rc.fetch_add(1, Ordering::SeqCst);
428+
}
429+
});
430+
431+
// Wait until task binds a listener on the TCP port
432+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
433+
434+
let tmpdir = TempDir::new().unwrap();
435+
let target_path = tmpdir.path().join("partial_download");
436+
437+
let downloader = Downloader::new(target_path, None, CloudHttpConfig::test_value());
438+
let url = DownloadInfo::new(&format!("http://localhost:{port}/"));
439+
440+
downloader.download(&url).await.unwrap();
441+
let saved_file = std::fs::read_to_string(downloader.filename()).unwrap();
442+
assert_eq!(saved_file, file);
443+
444+
downloader.cleanup().await.unwrap();
445+
446+
server_task.abort();
447+
448+
assert_eq!(request_count.load(Ordering::SeqCst), 2);
449+
}

0 commit comments

Comments
 (0)