Skip to content

Commit 6b3b3a4

Browse files
committed
fix(download): verify etags for resumed requests
Signed-off-by: Marcel Guzik <[email protected]>
1 parent 01701c9 commit 6b3b3a4

File tree

2 files changed

+145
-2
lines changed

2 files changed

+145
-2
lines changed

crates/common/download/src/download.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use anyhow::anyhow;
55
use backoff::future::retry_notify;
66
use backoff::ExponentialBackoff;
77
use certificate::CloudHttpConfig;
8+
use hyper::header;
89
use log::debug;
910
use log::info;
1011
use log::warn;
@@ -199,6 +200,12 @@ impl Downloader {
199200
loop {
200201
let offset = next_request_offset(&prev_response, file);
201202
let mut response = self.request_range_from(url, offset).await?;
203+
// etags in current and previous request must match to resume
204+
if !etags_matching(&prev_response, &response) {
205+
file.seek(SeekFrom::Start(0)).unwrap();
206+
prev_response = response;
207+
continue;
208+
}
202209
let offset = partial_response::response_range_start(&response)?;
203210

204211
if offset != 0 {
@@ -211,9 +218,7 @@ impl Downloader {
211218
Ok(()) => break,
212219

213220
Err(SaveChunksError::Network(err)) => {
214-
prev_response = response;
215221
warn!("Error while downloading response: {err}.\nRetrying...");
216-
continue;
217222
}
218223

219224
Err(SaveChunksError::Io(err)) => {
@@ -223,6 +228,7 @@ impl Downloader {
223228
})
224229
}
225230
}
231+
prev_response = response;
226232
}
227233

228234
Ok(())
@@ -352,6 +358,37 @@ impl Downloader {
352358
}
353359
}
354360

361+
fn etags_matching(prev_response: &Response, response: &Response) -> bool {
362+
if let Some(etag) = response
363+
.headers()
364+
.get(header::ETAG)
365+
.and_then(|h| h.to_str().ok())
366+
{
367+
// TODO: handle optional backslashes
368+
if etag.starts_with("W/") {
369+
// validator is weak, but in range requests tags must match using strong comparison
370+
// https://www.rfc-editor.org/rfc/rfc9110#section-13.1.5-12.1
371+
return false;
372+
}
373+
374+
let Some(prev_etag) = prev_response
375+
.headers()
376+
.get(header::ETAG)
377+
.and_then(|h| h.to_str().ok())
378+
else {
379+
// previous request didn't have etag and this does or vice versa, abort
380+
return false;
381+
};
382+
383+
if etag != prev_etag {
384+
// etags don't match, abort
385+
return false;
386+
}
387+
}
388+
// etags match
389+
true
390+
}
391+
355392
/// Decides whether HTTP request error is retryable.
356393
fn reqwest_err_to_backoff(err: reqwest::Error) -> backoff::Error<reqwest::Error> {
357394
if err.is_timeout() || err.is_connect() {

crates/common/download/src/download/tests/partial_response.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,109 @@ async fn resume_download_when_disconnected() {
102102

103103
server_task.abort();
104104
}
105+
106+
/// If after retrying ETag of the resource is different, we should download it
107+
/// from scratch again.
108+
#[tokio::test]
109+
async fn resume_download_with_etag_changed() {
110+
let file_v1 = "AAAABBBBCCCCDDDD";
111+
let file_v2 = "XXXXYYYYZZZZWWWW";
112+
let chunk_size = 4;
113+
114+
let listener = TcpListener::bind("localhost:0").await.unwrap();
115+
let port = listener.local_addr().unwrap().port();
116+
117+
// server should do 3 things:
118+
// - only send a portion of the first request
119+
// - after the first request update the resource
120+
// - serve 2nd request normally (but we expect it to be a range request)
121+
let server_task = tokio::spawn(async move {
122+
let mut request_count = 0;
123+
while let Ok((mut stream, _addr)) = listener.accept().await {
124+
let response_task = async move {
125+
let (reader, mut writer) = stream.split();
126+
let mut lines = BufReader::new(reader).lines();
127+
let mut range: Option<std::ops::Range<usize>> = None;
128+
129+
// We got an HTTP request, read the lines of the request
130+
while let Ok(Some(line)) = lines.next_line().await {
131+
if line.to_ascii_lowercase().contains("range:") {
132+
let (_, bytes) = line.split_once('=').unwrap();
133+
let (start, end) = bytes.split_once('-').unwrap();
134+
let start = start.parse().unwrap_or(0);
135+
let end = end.parse().unwrap_or(file_v2.len());
136+
range = Some(start..end)
137+
}
138+
// On `\r\n\r\n` (empty line) stop reading the request
139+
// and start responding
140+
if line.is_empty() {
141+
break;
142+
}
143+
}
144+
145+
let file = if request_count == 0 { file_v1 } else { file_v2 };
146+
let etag = if request_count == 0 {
147+
"v1-initial"
148+
} else {
149+
"v2-changed"
150+
};
151+
152+
if let Some(range) = range {
153+
// Return range for both first and subsequent requests
154+
let start = range.start;
155+
let end = range.end.min(file.len());
156+
let header = format!(
157+
"HTTP/1.1 206 Partial Content\r\n\
158+
transfer-encoding: chunked\r\n\
159+
connection: close\r\n\
160+
content-type: application/octet-stream\r\n\
161+
content-range: bytes {start}-{end}/*\r\n\
162+
accept-ranges: bytes\r\n\
163+
etag: \"{etag}\"\r\n"
164+
);
165+
let body = &file[start..end];
166+
let size = body.len();
167+
let msg = format!("{header}\r\n{size:x}\r\n{body}\r\n0\r\n\r\n");
168+
writer.write_all(msg.as_bytes()).await.unwrap();
169+
writer.flush().await.unwrap();
170+
} else {
171+
let header = format!(
172+
"HTTP/1.1 200 OK\r\n\
173+
transfer-encoding: chunked\r\n\
174+
connection: close\r\n\
175+
content-type: application/octet-stream\r\n\
176+
accept-ranges: bytes\r\n\
177+
etag: \"{etag}\"\r\n"
178+
);
179+
180+
let body = &file[0..chunk_size];
181+
let size = body.len();
182+
let msg = format!("{header}\r\n{size:x}\r\n{body}\r\n");
183+
writer.write_all(msg.as_bytes()).await.unwrap();
184+
writer.flush().await.unwrap();
185+
// Connection drops here without sending final chunk
186+
}
187+
};
188+
request_count += 1;
189+
tokio::spawn(response_task);
190+
}
191+
});
192+
193+
// Wait until task binds a listener on the TCP port
194+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
195+
196+
let tmpdir = TempDir::new().unwrap();
197+
let target_path = tmpdir.path().join("partial_download_etag");
198+
199+
let downloader = Downloader::new(target_path, None, CloudHttpConfig::test_value());
200+
let url = DownloadInfo::new(&format!("http://localhost:{port}/"));
201+
202+
downloader.download(&url).await.unwrap();
203+
let saved_file = std::fs::read_to_string(downloader.filename()).unwrap();
204+
// Should have the complete new file content since ETag changed
205+
assert_eq!(saved_file, file_v2);
206+
207+
downloader.cleanup().await.unwrap();
208+
209+
server_task.abort();
210+
}

0 commit comments

Comments
 (0)