Skip to content

Commit f19a5d0

Browse files
committed
fix(download): verify etags for resumed requests
Signed-off-by: Marcel Guzik <[email protected]>
1 parent 7d4a43d commit f19a5d0

File tree

5 files changed

+228
-19
lines changed

5 files changed

+228
-19
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/common/download/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ repository = { workspace = true }
1313
anyhow = { workspace = true, features = ["backtrace"] }
1414
backoff = { workspace = true }
1515
certificate = { workspace = true, features = ["reqwest"] }
16+
http = { workspace = true }
1617
hyper = { workspace = true }
1718
log = { workspace = true }
1819
nix = { workspace = true }

crates/common/download/src/download.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod partial_response;
2+
use crate::download::partial_response::PartialResponse;
23
use crate::error::DownloadError;
34
use crate::error::ErrContext;
45
use anyhow::anyhow;
@@ -187,7 +188,7 @@ impl Downloader {
187188

188189
/// If interrupted, continues ongoing download.
189190
///
190-
/// If the server supports is, a range request is used to download only the
191+
/// If the server supports it, a range request is used to download only the
191192
/// remaining range of the file. Otherwise, progress is restarted and we
192193
/// download full range of the file again.
193194
async fn download_continue(
@@ -197,9 +198,17 @@ impl Downloader {
197198
mut prev_response: Response,
198199
) -> Result<(), DownloadError> {
199200
loop {
200-
let offset = next_request_offset(&prev_response, file);
201-
let mut response = self.request_range_from(url, offset).await?;
202-
let offset = partial_response::response_range_start(&response)?;
201+
let request_offset = next_request_offset(&prev_response, file)?;
202+
let mut response = self.request_range_from(url, request_offset).await?;
203+
let offset = match partial_response::response_range_start(&response, &prev_response)? {
204+
PartialResponse::CompleteContent => 0,
205+
PartialResponse::PartialContent(pos) => pos,
206+
PartialResponse::ResourceModified => {
207+
file.seek(SeekFrom::Start(0))
208+
.context("failed to seek in file".to_string())?;
209+
continue;
210+
}
211+
};
203212

204213
if offset != 0 {
205214
info!("Resuming file download at position={offset}");
@@ -211,9 +220,7 @@ impl Downloader {
211220
Ok(()) => break,
212221

213222
Err(SaveChunksError::Network(err)) => {
214-
prev_response = response;
215223
warn!("Error while downloading response: {err}.\nRetrying...");
216-
continue;
217224
}
218225

219226
Err(SaveChunksError::Io(err)) => {
@@ -223,6 +230,7 @@ impl Downloader {
223230
})
224231
}
225232
}
233+
prev_response = response;
226234
}
227235

228236
Ok(())
@@ -424,20 +432,22 @@ pub fn try_pre_allocate_space(
424432
Ok(())
425433
}
426434

427-
fn next_request_offset(prev_response: &Response, file: &mut File) -> u64 {
435+
fn next_request_offset(prev_response: &Response, file: &mut File) -> Result<u64, DownloadError> {
428436
use hyper::header;
429437
use hyper::StatusCode;
430438
use std::io::Seek;
431-
let pos = file.stream_position().unwrap();
439+
let pos = file
440+
.stream_position()
441+
.context("failed to get cursor position".to_string())?;
432442
if prev_response.status() == StatusCode::PARTIAL_CONTENT
433443
|| prev_response
434444
.headers()
435445
.get(header::ACCEPT_RANGES)
436446
.is_some_and(|unit| unit == "bytes")
437447
{
438-
pos
448+
Ok(pos)
439449
} else {
440-
0
450+
Ok(0)
441451
}
442452
}
443453

crates/common/download/src/download/partial_response.rs

Lines changed: 100 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,98 @@
22
33
use reqwest::header;
44
use reqwest::header::HeaderValue;
5+
use reqwest::Response;
56
use reqwest::StatusCode;
67

8+
pub(super) enum PartialResponse {
9+
/// Server returned a partial content response starting at given position
10+
PartialContent(u64),
11+
12+
/// Server returned regular OK response, resume writing at 0
13+
CompleteContent,
14+
15+
/// Server returned partial content but resource was modified, request needs to be retried
16+
ResourceModified,
17+
}
18+
719
/// Returns the position of the partial response in the resource.
820
///
921
/// When making a partial request, a server can return a different range than
1022
/// the one we asked for, in which case we need to extract the position from the
1123
/// Content-Range header. The server could also just ignore the Range header and
1224
/// respond with 200 OK, in which case we need to download the entire resource
1325
/// all over again.
14-
pub fn response_range_start(response: &reqwest::Response) -> Result<u64, InvalidResponseError> {
15-
let chunk_pos = match response.status() {
26+
pub(super) fn response_range_start(
27+
response: &reqwest::Response,
28+
prev_response: &Response,
29+
) -> Result<PartialResponse, InvalidResponseError> {
30+
match response.status() {
1631
// Complete response, seek to the beginning of the file
17-
StatusCode::OK => 0,
32+
StatusCode::OK => Ok(PartialResponse::CompleteContent),
1833

1934
// Partial response, the range might be different from what we
2035
// requested, so we need to parse it. Because we only request a single
2136
// range from the current position to the end of the document, we can
2237
// ignore multipart/byteranges media type.
23-
StatusCode::PARTIAL_CONTENT => partial_response_start_range(response)?,
38+
StatusCode::PARTIAL_CONTENT => {
39+
if was_resource_modified(response, prev_response) {
40+
return Ok(PartialResponse::ResourceModified);
41+
}
42+
let pos = partial_response_start_range(response)?;
43+
Ok(PartialResponse::PartialContent(pos))
44+
}
2445

2546
// We don't expect to receive any other 200-299 status code, but if we
2647
// do, treat it the same as OK
27-
status_code if status_code.is_success() => 0,
48+
status_code if status_code.is_success() => Ok(PartialResponse::CompleteContent),
49+
50+
status_code => Err(InvalidResponseError::UnexpectedStatus(status_code)),
51+
}
52+
}
53+
54+
/// Checks if the resource was modified between the current and previous response.
55+
///
56+
/// If the resource was updated, we should restart download and request full range of the new
57+
/// resource. Otherwise, a partial request can be used to resume the download.
58+
fn was_resource_modified(response: &Response, prev_response: &Response) -> bool {
59+
if response.status() != hyper::StatusCode::PARTIAL_CONTENT {
60+
// not using a partial request, don't care if it's modified or not
61+
return false;
62+
}
63+
64+
// etags in current and previous request must match
65+
let etag = response
66+
.headers()
67+
.get(header::ETAG)
68+
.and_then(|h| h.to_str().ok());
69+
let prev_etag = prev_response
70+
.headers()
71+
.get(header::ETAG)
72+
.and_then(|h| h.to_str().ok());
2873

29-
status_code => {
30-
return Err(InvalidResponseError::UnexpectedStatus(status_code));
74+
match (etag, prev_etag) {
75+
(None, None) => {
76+
// no etags in either request, assume resource is unchanged
77+
false
3178
}
32-
};
33-
Ok(chunk_pos)
79+
(None, Some(_)) | (Some(_), None) => {
80+
// previous request didn't have etag and this does or vice versa, abort
81+
true
82+
}
83+
(Some(etag), Some(prev_etag)) => {
84+
// Examples:
85+
// ETag: "xyzzy"
86+
// ETag: W/"xyzzy"
87+
// ETag: ""
88+
if etag.starts_with("W/") {
89+
// validator is weak, but in range requests tags must match using strong comparison
90+
// https://www.rfc-editor.org/rfc/rfc9110#entity.tag.comparison
91+
return true;
92+
}
93+
94+
etag != prev_etag
95+
}
96+
}
3497
}
3598

3699
#[derive(Debug, thiserror::Error)]
@@ -88,3 +151,31 @@ pub struct ContentRangeParseError {
88151
reason: &'static str,
89152
value: header::HeaderValue,
90153
}
154+
155+
#[cfg(test)]
156+
mod tests {
157+
use super::*;
158+
159+
#[test_case::test_case(Some(r#""xyzzy""#), Some(r#""xyzzy""#), false)]
160+
#[test_case::test_case(Some(r#"W/"xyzzy""#), Some(r#""xyzzy""#), true)]
161+
#[test_case::test_case(Some(r#""xyzzy""#), Some(r#"W/"xyzzy""#), true)]
162+
#[test_case::test_case(Some(r#""xyzzy1""#), Some(r#""xyzzy2""#), true)]
163+
#[test_case::test_case(None, None, false)]
164+
#[test_case::test_case(Some(r#""xyzzy1""#), None, true)]
165+
#[test_case::test_case(None, Some(r#""xyzzy2""#), true)]
166+
fn verifies_etags(etag1: Option<&'static str>, etag2: Option<&'static str>, modified: bool) {
167+
let mut response1 = http::Response::builder().status(StatusCode::PARTIAL_CONTENT);
168+
if let Some(etag) = etag1 {
169+
response1 = response1.header(http::header::ETAG, etag);
170+
}
171+
let response1 = response1.body("").unwrap().into();
172+
173+
let mut response2 = http::Response::builder().status(StatusCode::PARTIAL_CONTENT);
174+
if let Some(etag) = etag2 {
175+
response2 = response2.header(http::header::ETAG, etag);
176+
}
177+
let response2 = response2.body("").unwrap().into();
178+
179+
assert_eq!(was_resource_modified(&response1, &response2), modified);
180+
}
181+
}

crates/common/download/src/download/tests/partial_response.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,109 @@ async fn resume_download_when_disconnected() {
102102

103103
server_task.abort();
104104
}
105+
106+
/// If after retrying ETag of the resource is different, we should download it
107+
/// from scratch again.
108+
#[tokio::test]
109+
async fn resume_download_with_etag_changed() {
110+
let file_v1 = "AAAABBBBCCCCDDDD";
111+
let file_v2 = "XXXXYYYYZZZZWWWW";
112+
let chunk_size = 4;
113+
114+
let listener = TcpListener::bind("localhost:0").await.unwrap();
115+
let port = listener.local_addr().unwrap().port();
116+
117+
// server should do 3 things:
118+
// - only send a portion of the first request
119+
// - after the first request update the resource
120+
// - serve 2nd request normally (but we expect it to be a range request)
121+
let server_task = tokio::spawn(async move {
122+
let mut request_count = 0;
123+
while let Ok((mut stream, _addr)) = listener.accept().await {
124+
let response_task = async move {
125+
let (reader, mut writer) = stream.split();
126+
let mut lines = BufReader::new(reader).lines();
127+
let mut range: Option<std::ops::Range<usize>> = None;
128+
129+
// We got an HTTP request, read the lines of the request
130+
while let Ok(Some(line)) = lines.next_line().await {
131+
if line.to_ascii_lowercase().contains("range:") {
132+
let (_, bytes) = line.split_once('=').unwrap();
133+
let (start, end) = bytes.split_once('-').unwrap();
134+
let start = start.parse().unwrap_or(0);
135+
let end = end.parse().unwrap_or(file_v2.len());
136+
range = Some(start..end)
137+
}
138+
// On `\r\n\r\n` (empty line) stop reading the request
139+
// and start responding
140+
if line.is_empty() {
141+
break;
142+
}
143+
}
144+
145+
let file = if request_count == 0 { file_v1 } else { file_v2 };
146+
let etag = if request_count == 0 {
147+
"v1-initial"
148+
} else {
149+
"v2-changed"
150+
};
151+
152+
if let Some(range) = range {
153+
// Return range for both first and subsequent requests
154+
let start = range.start;
155+
let end = range.end.min(file.len());
156+
let header = format!(
157+
"HTTP/1.1 206 Partial Content\r\n\
158+
transfer-encoding: chunked\r\n\
159+
connection: close\r\n\
160+
content-type: application/octet-stream\r\n\
161+
content-range: bytes {start}-{end}/*\r\n\
162+
accept-ranges: bytes\r\n\
163+
etag: \"{etag}\"\r\n"
164+
);
165+
let body = &file[start..end];
166+
let size = body.len();
167+
let msg = format!("{header}\r\n{size:x}\r\n{body}\r\n0\r\n\r\n");
168+
writer.write_all(msg.as_bytes()).await.unwrap();
169+
writer.flush().await.unwrap();
170+
} else {
171+
let header = format!(
172+
"HTTP/1.1 200 OK\r\n\
173+
transfer-encoding: chunked\r\n\
174+
connection: close\r\n\
175+
content-type: application/octet-stream\r\n\
176+
accept-ranges: bytes\r\n\
177+
etag: \"{etag}\"\r\n"
178+
);
179+
180+
let body = &file[0..chunk_size];
181+
let size = body.len();
182+
let msg = format!("{header}\r\n{size:x}\r\n{body}\r\n");
183+
writer.write_all(msg.as_bytes()).await.unwrap();
184+
writer.flush().await.unwrap();
185+
// Connection drops here without sending final chunk
186+
}
187+
};
188+
request_count += 1;
189+
tokio::spawn(response_task);
190+
}
191+
});
192+
193+
// Wait until task binds a listener on the TCP port
194+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
195+
196+
let tmpdir = TempDir::new().unwrap();
197+
let target_path = tmpdir.path().join("partial_download_etag");
198+
199+
let downloader = Downloader::new(target_path, None, CloudHttpConfig::test_value());
200+
let url = DownloadInfo::new(&format!("http://localhost:{port}/"));
201+
202+
downloader.download(&url).await.unwrap();
203+
let saved_file = std::fs::read_to_string(downloader.filename()).unwrap();
204+
// Should have the complete new file content since ETag changed
205+
assert_eq!(saved_file, file_v2);
206+
207+
downloader.cleanup().await.unwrap();
208+
209+
server_task.abort();
210+
}

0 commit comments

Comments
 (0)