Skip to content

Commit 7187df8

Browse files
committed
fix(download): verify etags for resumed requests
Signed-off-by: Marcel Guzik <[email protected]>
1 parent 8afbedd commit 7187df8

File tree

5 files changed

+179
-2
lines changed

5 files changed

+179
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/common/download/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ repository = { workspace = true }
1313
anyhow = { workspace = true, features = ["backtrace"] }
1414
backoff = { workspace = true }
1515
certificate = { workspace = true, features = ["reqwest"] }
16+
http.workspace = true
1617
hyper = { workspace = true }
1718
log = { workspace = true }
1819
nix = { workspace = true }

crates/common/download/src/download.rs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use anyhow::anyhow;
55
use backoff::future::retry_notify;
66
use backoff::ExponentialBackoff;
77
use certificate::CloudHttpConfig;
8+
use hyper::header;
89
use log::debug;
910
use log::info;
1011
use log::warn;
@@ -199,6 +200,11 @@ impl Downloader {
199200
loop {
200201
let offset = next_request_offset(&prev_response, file);
201202
let mut response = self.request_range_from(url, offset).await?;
203+
if was_resource_modified(&response, &prev_response) {
204+
file.seek(SeekFrom::Start(0)).unwrap();
205+
prev_response = response;
206+
continue;
207+
}
202208
let offset = partial_response::response_range_start(&response)?;
203209

204210
if offset != 0 {
@@ -211,9 +217,7 @@ impl Downloader {
211217
Ok(()) => break,
212218

213219
Err(SaveChunksError::Network(err)) => {
214-
prev_response = response;
215220
warn!("Error while downloading response: {err}.\nRetrying...");
216-
continue;
217221
}
218222

219223
Err(SaveChunksError::Io(err)) => {
@@ -223,6 +227,7 @@ impl Downloader {
223227
})
224228
}
225229
}
230+
prev_response = response;
226231
}
227232

228233
Ok(())
@@ -352,6 +357,46 @@ impl Downloader {
352357
}
353358
}
354359

360+
/// Checks if the resource was modified between the current and previous response.
361+
///
362+
/// If the resource was updated, we should restart download and request full range of the new
363+
/// resource. Otherwise, a partial request can be used to resume the download.
364+
fn was_resource_modified(response: &Response, prev_response: &Response) -> bool {
365+
// etags in current and previous request must match
366+
let etag = response
367+
.headers()
368+
.get(header::ETAG)
369+
.and_then(|h| h.to_str().ok());
370+
let prev_etag = prev_response
371+
.headers()
372+
.get(header::ETAG)
373+
.and_then(|h| h.to_str().ok());
374+
375+
match (etag, prev_etag) {
376+
(None, None) => {
377+
// no etags in either request, assume resource is unchanged
378+
false
379+
}
380+
(None, Some(_)) | (Some(_), None) => {
381+
// previous request didn't have etag and this does or vice versa, abort
382+
true
383+
}
384+
(Some(etag), Some(prev_etag)) => {
385+
// Examples:
386+
// ETag: "xyzzy"
387+
// ETag: W/"xyzzy"
388+
// ETag: ""
389+
if etag.starts_with("W/") {
390+
// validator is weak, but in range requests tags must match using strong comparison
391+
// https://www.rfc-editor.org/rfc/rfc9110#entity.tag.comparison
392+
return true;
393+
}
394+
395+
etag != prev_etag
396+
}
397+
}
398+
}
399+
355400
/// Decides whether HTTP request error is retryable.
356401
fn reqwest_err_to_backoff(err: reqwest::Error) -> backoff::Error<reqwest::Error> {
357402
if err.is_timeout() || err.is_connect() {

crates/common/download/src/download/tests.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::*;
22
use axum::Router;
3+
use http::StatusCode;
34
use hyper::header::AUTHORIZATION;
45
use rustls::pki_types::pem::PemObject;
56
use rustls::pki_types::PrivateKeyDer;
@@ -433,6 +434,29 @@ async fn downloader_error_shows_certificate_required_error_when_appropriate() {
433434
assert!(dbg!(format!("{err:#}")).contains("received fatal alert: CertificateRequired"));
434435
}
435436

437+
#[test_case::test_case(Some(r#""xyzzy"#), Some(r#""xyzzy"#), false)]
438+
#[test_case::test_case(Some(r#"W/"xyzzy"#), Some(r#""xyzzy"#), true)]
439+
#[test_case::test_case(Some(r#""xyzzy"#), Some(r#"W/"xyzzy"#), true)]
440+
#[test_case::test_case(Some(r#""xyzzy1"#), Some(r#""xyzzy2"#), true)]
441+
#[test_case::test_case(None, None, false)]
442+
#[test_case::test_case(Some(r#""xyzzy1"#), None, true)]
443+
#[test_case::test_case(None, Some(r#""xyzzy2"#), true)]
444+
fn verifies_etags(etag1: Option<&'static str>, etag2: Option<&'static str>, modified: bool) {
445+
let mut response1 = http::Response::builder().status(StatusCode::PARTIAL_CONTENT);
446+
if let Some(etag) = etag1 {
447+
response1 = response1.header(http::header::ETAG, etag);
448+
}
449+
let response1 = response1.body("").unwrap().into();
450+
451+
let mut response2 = http::Response::builder().status(StatusCode::PARTIAL_CONTENT);
452+
if let Some(etag) = etag2 {
453+
response2 = response2.header(http::header::ETAG, etag);
454+
}
455+
let response2 = response2.body("").unwrap().into();
456+
457+
assert_eq!(was_resource_modified(&response1, &response2), modified);
458+
}
459+
436460
fn create_file_with_size(size: usize) -> Result<NamedTempFile, anyhow::Error> {
437461
let mut file = NamedTempFile::new().unwrap();
438462
let data: String = "Some data!".into();

crates/common/download/src/download/tests/partial_response.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,109 @@ async fn resume_download_when_disconnected() {
102102

103103
server_task.abort();
104104
}
105+
106+
/// If after retrying ETag of the resource is different, we should download it
/// from scratch again.
///
/// The test runs a hand-written HTTP server on a local TCP port. The first accepted
/// connection is served from `file_v1` with ETag `"v1-initial"`, every later connection from
/// `file_v2` with ETag `"v2-changed"`. The downloaded file must end up equal to `file_v2`.
#[tokio::test]
async fn resume_download_with_etag_changed() {
    let file_v1 = "AAAABBBBCCCCDDDD";
    let file_v2 = "XXXXYYYYZZZZWWWW";
    let chunk_size = 4;

    // Port 0 lets the OS pick a free port; the listener is bound right here, before the
    // server task is spawned.
    let listener = TcpListener::bind("localhost:0").await.unwrap();
    let port = listener.local_addr().unwrap().port();

    // server should do 3 things:
    // - only send a portion of the first request
    // - after the first request update the resource
    // - serve 2nd request normally (but we expect it to be a range request)
    let server_task = tokio::spawn(async move {
        // Number of connections accepted so far; selects which file version is served.
        let mut request_count = 0;
        while let Ok((mut stream, _addr)) = listener.accept().await {
            // `async move` copies the current `request_count` into the future, so the
            // increment below does not affect the response built for this connection.
            let response_task = async move {
                let (reader, mut writer) = stream.split();
                let mut lines = BufReader::new(reader).lines();
                // Byte range requested by the client, if a `Range` header was seen.
                let mut range: Option<std::ops::Range<usize>> = None;

                // We got an HTTP request, read the lines of the request
                while let Ok(Some(line)) = lines.next_line().await {
                    if line.to_ascii_lowercase().contains("range:") {
                        // Expected shape: `Range: bytes=<start>-<end>`; a missing or
                        // unparsable bound falls back to 0 / the full length of `file_v2`.
                        let (_, bytes) = line.split_once('=').unwrap();
                        let (start, end) = bytes.split_once('-').unwrap();
                        let start = start.parse().unwrap_or(0);
                        let end = end.parse().unwrap_or(file_v2.len());
                        range = Some(start..end)
                    }
                    // On `\r\n\r\n` (empty line) stop reading the request
                    // and start responding
                    if line.is_empty() {
                        break;
                    }
                }

                // First connection sees the old resource, all later ones the new resource.
                let file = if request_count == 0 { file_v1 } else { file_v2 };
                let etag = if request_count == 0 {
                    "v1-initial"
                } else {
                    "v2-changed"
                };

                if let Some(range) = range {
                    // Return range for both first and subsequent requests
                    let start = range.start;
                    let end = range.end.min(file.len());
                    // NOTE(review): `end` is the exclusive slice bound, while the last
                    // position in Content-Range is inclusive per RFC 9110 — confirm the
                    // downloader only relies on the start of the range.
                    let header = format!(
                        "HTTP/1.1 206 Partial Content\r\n\
                        transfer-encoding: chunked\r\n\
                        connection: close\r\n\
                        content-type: application/octet-stream\r\n\
                        content-range: bytes {start}-{end}/*\r\n\
                        accept-ranges: bytes\r\n\
                        etag: \"{etag}\"\r\n"
                    );
                    let body = &file[start..end];
                    let size = body.len();
                    // One data chunk followed by the terminating zero-length chunk: a
                    // complete chunked response.
                    let msg = format!("{header}\r\n{size:x}\r\n{body}\r\n0\r\n\r\n");
                    writer.write_all(msg.as_bytes()).await.unwrap();
                    writer.flush().await.unwrap();
                } else {
                    // No `Range` header: reply 200 but only send the first `chunk_size`
                    // bytes, leaving the chunked body unterminated.
                    let header = format!(
                        "HTTP/1.1 200 OK\r\n\
                        transfer-encoding: chunked\r\n\
                        connection: close\r\n\
                        content-type: application/octet-stream\r\n\
                        accept-ranges: bytes\r\n\
                        etag: \"{etag}\"\r\n"
                    );

                    let body = &file[0..chunk_size];
                    let size = body.len();
                    let msg = format!("{header}\r\n{size:x}\r\n{body}\r\n");
                    writer.write_all(msg.as_bytes()).await.unwrap();
                    writer.flush().await.unwrap();
                    // Connection drops here without sending final chunk
                }
            };
            request_count += 1;
            tokio::spawn(response_task);
        }
    });

    // The listener is already bound above; this pause only gives the spawned server task
    // time to start running before the client connects.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let tmpdir = TempDir::new().unwrap();
    let target_path = tmpdir.path().join("partial_download_etag");

    let downloader = Downloader::new(target_path, None, CloudHttpConfig::test_value());
    let url = DownloadInfo::new(&format!("http://localhost:{port}/"));

    downloader.download(&url).await.unwrap();
    let saved_file = std::fs::read_to_string(downloader.filename()).unwrap();
    // Should have the complete new file content since ETag changed
    assert_eq!(saved_file, file_v2);

    downloader.cleanup().await.unwrap();

    server_task.abort();
}

0 commit comments

Comments
 (0)