Skip to content

Commit 6bd1209

Browse files
committed
Add delays to network retries.
1 parent c38e050 commit 6bd1209

File tree

9 files changed

+527
-88
lines changed

9 files changed

+527
-88
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ os_info = "3.5.0"
5858
pasetors = { version = "0.6.4", features = ["v3", "paserk", "std", "serde"] }
5959
pathdiff = "0.2"
6060
pretty_env_logger = { version = "0.4", optional = true }
61+
rand = "0.8.5"
6162
rustfix = "0.6.0"
6263
semver = { version = "1.0.3", features = ["serde"] }
6364
serde = { version = "1.0.123", features = ["derive"] }

src/cargo/core/package.rs

Lines changed: 69 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ use crate::ops;
2828
use crate::util::config::PackageCacheLock;
2929
use crate::util::errors::{CargoResult, HttpNotSuccessful};
3030
use crate::util::interning::InternedString;
31-
use crate::util::network::retry::Retry;
31+
use crate::util::network::retry::{Retry, RetryResult};
32+
use crate::util::network::sleep::SleepTracker;
3233
use crate::util::{self, internal, Config, Progress, ProgressStyle};
3334

3435
pub const MANIFEST_PREAMBLE: &str = "\
@@ -319,6 +320,8 @@ pub struct Downloads<'a, 'cfg> {
319320
/// Set of packages currently being downloaded. This should stay in sync
320321
/// with `pending`.
321322
pending_ids: HashSet<PackageId>,
323+
/// Downloads that have failed and are waiting to retry again later.
324+
sleeping: SleepTracker<(Download<'cfg>, Easy)>,
322325
/// The final result of each download. A pair `(token, result)`. This is a
323326
/// temporary holding area, needed because curl can report multiple
324327
/// downloads at once, but the main loop (`wait`) is written to only
@@ -442,6 +445,7 @@ impl<'cfg> PackageSet<'cfg> {
442445
next: 0,
443446
pending: HashMap::new(),
444447
pending_ids: HashSet::new(),
448+
sleeping: SleepTracker::new(),
445449
results: Vec::new(),
446450
progress: RefCell::new(Some(Progress::with_style(
447451
"Downloading",
@@ -800,7 +804,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
800804

801805
/// Returns the number of crates that are still downloading, including
/// failed downloads that are waiting to be retried.
802806
pub fn remaining(&self) -> usize {
803-
self.pending.len()
807+
self.pending.len() + self.sleeping.len()
804808
}
805809

806810
/// Blocks the current thread waiting for a package to finish downloading.
@@ -831,51 +835,52 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
831835
let ret = {
832836
let timed_out = &dl.timed_out;
833837
let url = &dl.url;
834-
dl.retry
835-
.r#try(|| {
836-
if let Err(e) = result {
837-
// If this error is "aborted by callback" then that's
838-
// probably because our progress callback aborted due to
839-
// a timeout. We'll find out by looking at the
840-
// `timed_out` field, looking for a descriptive message.
841-
// If one is found we switch the error code (to ensure
842-
// it's flagged as spurious) and then attach our extra
843-
// information to the error.
844-
if !e.is_aborted_by_callback() {
845-
return Err(e.into());
846-
}
838+
dl.retry.r#try(|| {
839+
if let Err(e) = result {
840+
// If this error is "aborted by callback" then that's
841+
// probably because our progress callback aborted due to
842+
// a timeout. We'll find out by looking at the
843+
// `timed_out` field, looking for a descriptive message.
844+
// If one is found we switch the error code (to ensure
845+
// it's flagged as spurious) and then attach our extra
846+
// information to the error.
847+
if !e.is_aborted_by_callback() {
848+
return Err(e.into());
849+
}
847850

848-
return Err(match timed_out.replace(None) {
849-
Some(msg) => {
850-
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
851-
let mut err = curl::Error::new(code);
852-
err.set_extra(msg);
853-
err
854-
}
855-
None => e,
851+
return Err(match timed_out.replace(None) {
852+
Some(msg) => {
853+
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
854+
let mut err = curl::Error::new(code);
855+
err.set_extra(msg);
856+
err
856857
}
857-
.into());
858+
None => e,
858859
}
860+
.into());
861+
}
859862

860-
let code = handle.response_code()?;
861-
if code != 200 && code != 0 {
862-
let url = handle.effective_url()?.unwrap_or(url);
863-
return Err(HttpNotSuccessful {
864-
code,
865-
url: url.to_string(),
866-
body: data,
867-
}
868-
.into());
863+
let code = handle.response_code()?;
864+
if code != 200 && code != 0 {
865+
let url = handle.effective_url()?.unwrap_or(url);
866+
return Err(HttpNotSuccessful {
867+
code,
868+
url: url.to_string(),
869+
body: data,
869870
}
870-
Ok(data)
871-
})
872-
.with_context(|| format!("failed to download from `{}`", dl.url))?
871+
.into());
872+
}
873+
Ok(data)
874+
})
873875
};
874876
match ret {
875-
Some(data) => break (dl, data),
876-
None => {
877-
self.pending_ids.insert(dl.id);
878-
self.enqueue(dl, handle)?
877+
RetryResult::Success(data) => break (dl, data),
878+
RetryResult::Err(e) => {
879+
return Err(e.context(format!("failed to download from `{}`", dl.url)))
880+
}
881+
RetryResult::Retry(sleep) => {
882+
debug!("download retry {} for {sleep}ms", dl.url);
883+
self.sleeping.push(sleep, (dl, handle));
879884
}
880885
}
881886
};
@@ -963,6 +968,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
963968
// actually block waiting for I/O to happen, which we achieve with the
964969
// `wait` method on `multi`.
965970
loop {
971+
self.add_sleepers()?;
966972
let n = tls::set(self, || {
967973
self.set
968974
.multi
@@ -985,17 +991,31 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
985991
if let Some(pair) = results.pop() {
986992
break Ok(pair);
987993
}
988-
assert!(!self.pending.is_empty());
989-
let min_timeout = Duration::new(1, 0);
990-
let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
991-
let timeout = timeout.min(min_timeout);
992-
self.set
993-
.multi
994-
.wait(&mut [], timeout)
995-
.with_context(|| "failed to wait on curl `Multi`")?;
994+
assert_ne!(self.remaining(), 0);
995+
if self.pending.is_empty() {
996+
let delay = self.sleeping.time_to_next().unwrap();
997+
debug!("sleeping main thread for {delay:?}");
998+
std::thread::sleep(delay);
999+
} else {
1000+
let min_timeout = Duration::new(1, 0);
1001+
let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout);
1002+
let timeout = timeout.min(min_timeout);
1003+
self.set
1004+
.multi
1005+
.wait(&mut [], timeout)
1006+
.with_context(|| "failed to wait on curl `Multi`")?;
1007+
}
9961008
}
9971009
}
9981010

1011+
fn add_sleepers(&mut self) -> CargoResult<()> {
1012+
for (dl, handle) in self.sleeping.to_retry() {
1013+
self.pending_ids.insert(dl.id);
1014+
self.enqueue(dl, handle)?;
1015+
}
1016+
Ok(())
1017+
}
1018+
9991019
fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
10001020
let dl = &self.pending[&token].0;
10011021
dl.total.set(total);
@@ -1061,7 +1081,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
10611081
return Ok(());
10621082
}
10631083
}
1064-
let pending = self.pending.len();
1084+
let pending = self.remaining();
10651085
let mut msg = if pending == 1 {
10661086
format!("{} crate", pending)
10671087
} else {

src/cargo/sources/registry/http_remote.rs

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ use crate::sources::registry::download;
88
use crate::sources::registry::MaybeLock;
99
use crate::sources::registry::{LoadResponse, RegistryConfig, RegistryData};
1010
use crate::util::errors::{CargoResult, HttpNotSuccessful};
11-
use crate::util::network::retry::Retry;
11+
use crate::util::network::retry::{Retry, RetryResult};
12+
use crate::util::network::sleep::SleepTracker;
1213
use crate::util::{auth, Config, Filesystem, IntoUrl, Progress, ProgressStyle};
1314
use anyhow::Context;
1415
use cargo_util::paths;
15-
use curl::easy::{HttpVersion, List};
16+
use curl::easy::{Easy, HttpVersion, List};
1617
use curl::multi::{EasyHandle, Multi};
1718
use log::{debug, trace, warn};
1819
use std::cell::RefCell;
@@ -103,6 +104,8 @@ struct Downloads<'cfg> {
103104
/// Set of paths currently being downloaded.
104105
/// This should stay in sync with `pending`.
105106
pending_paths: HashSet<PathBuf>,
107+
/// Downloads that have failed and are waiting to retry again later.
108+
sleeping: SleepTracker<(Download<'cfg>, Easy)>,
106109
/// The final result of each download.
107110
results: HashMap<PathBuf, CargoResult<CompletedDownload>>,
108111
/// The next ID to use for creating a token (see `Download::token`).
@@ -184,6 +187,7 @@ impl<'cfg> HttpRegistry<'cfg> {
184187
next: 0,
185188
pending: HashMap::new(),
186189
pending_paths: HashSet::new(),
190+
sleeping: SleepTracker::new(),
187191
results: HashMap::new(),
188192
progress: RefCell::new(Some(Progress::with_style(
189193
"Fetch",
@@ -265,6 +269,7 @@ impl<'cfg> HttpRegistry<'cfg> {
265269
};
266270
for (token, result) in results {
267271
let (mut download, handle) = self.downloads.pending.remove(&token).unwrap();
272+
assert!(self.downloads.pending_paths.remove(&download.path));
268273
let mut handle = self.multi.remove(handle)?;
269274
let data = download.data.take();
270275
let url = self.full_url(&download.path);
@@ -289,21 +294,19 @@ impl<'cfg> HttpRegistry<'cfg> {
289294
};
290295
Ok((data, code))
291296
}) {
292-
Ok(Some((data, code))) => Ok(CompletedDownload {
297+
RetryResult::Success((data, code)) => Ok(CompletedDownload {
293298
response_code: code,
294299
data,
295300
header_map: download.header_map.take(),
296301
}),
297-
Ok(None) => {
298-
// retry the operation
299-
let handle = self.multi.add(handle)?;
300-
self.downloads.pending.insert(token, (download, handle));
302+
RetryResult::Err(e) => Err(e),
303+
RetryResult::Retry(sleep) => {
304+
debug!("download retry {:?} for {sleep}ms", download.path);
305+
self.downloads.sleeping.push(sleep, (download, handle));
301306
continue;
302307
}
303-
Err(e) => Err(e),
304308
};
305309

306-
assert!(self.downloads.pending_paths.remove(&download.path));
307310
self.downloads.results.insert(download.path, result);
308311
self.downloads.downloads_finished += 1;
309312
}
@@ -395,6 +398,25 @@ impl<'cfg> HttpRegistry<'cfg> {
395398
))),
396399
}
397400
}
401+
402+
fn add_sleepers(&mut self) -> CargoResult<()> {
403+
for (dl, handle) in self.downloads.sleeping.to_retry() {
404+
let mut handle = self.multi.add(handle)?;
405+
handle.set_token(dl.token)?;
406+
assert!(
407+
self.downloads.pending_paths.insert(dl.path.to_path_buf()),
408+
"path queued for download more than once"
409+
);
410+
assert!(
411+
self.downloads
412+
.pending
413+
.insert(dl.token, (dl, handle))
414+
.is_none(),
415+
"dl token queued more than once"
416+
);
417+
}
418+
Ok(())
419+
}
398420
}
399421

400422
impl<'cfg> RegistryData for HttpRegistry<'cfg> {
@@ -730,6 +752,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
730752

731753
loop {
732754
self.handle_completed_downloads()?;
755+
self.add_sleepers()?;
733756

734757
let remaining_in_multi = tls::set(&self.downloads, || {
735758
self.multi
@@ -738,19 +761,25 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
738761
})?;
739762
trace!("{} transfers remaining", remaining_in_multi);
740763

741-
if remaining_in_multi == 0 {
764+
if remaining_in_multi + self.downloads.sleeping.len() as u32 == 0 {
742765
return Ok(());
743766
}
744767

745-
// We have no more replies to provide the caller with,
746-
// so we need to wait until cURL has something new for us.
747-
let timeout = self
748-
.multi
749-
.get_timeout()?
750-
.unwrap_or_else(|| Duration::new(1, 0));
751-
self.multi
752-
.wait(&mut [], timeout)
753-
.with_context(|| "failed to wait on curl `Multi`")?;
768+
if self.downloads.pending.is_empty() {
769+
let delay = self.downloads.sleeping.time_to_next().unwrap();
770+
debug!("sleeping main thread for {delay:?}");
771+
std::thread::sleep(delay);
772+
} else {
773+
// We have no more replies to provide the caller with,
774+
// so we need to wait until cURL has something new for us.
775+
let timeout = self
776+
.multi
777+
.get_timeout()?
778+
.unwrap_or_else(|| Duration::new(1, 0));
779+
self.multi
780+
.wait(&mut [], timeout)
781+
.with_context(|| "failed to wait on curl `Multi`")?;
782+
}
754783
}
755784
}
756785
}
@@ -779,7 +808,7 @@ impl<'cfg> Downloads<'cfg> {
779808
&format!(
780809
" {} complete; {} pending",
781810
self.downloads_finished,
782-
self.pending.len()
811+
self.pending.len() + self.sleeping.len()
783812
),
784813
)
785814
}

src/cargo/util/network/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use std::task::Poll;
44

55
pub mod retry;
6+
pub mod sleep;
67

78
pub trait PollExt<T> {
89
fn expect(self, msg: &str) -> T;

0 commit comments

Comments
 (0)