Skip to content

Commit e239cc8

Browse files
authored
[installinator] speed up HTTP downloads (#9900)
* Set the connect timeout to 5s per peer.
* Remember the last peer used to fetch an artifact.

Fixes #9617.
1 parent d315c91 commit e239cc8

File tree

5 files changed

+154
-17
lines changed

5 files changed

+154
-17
lines changed

installinator/src/artifact.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,19 +154,23 @@ impl ArtifactClient {
154154
slog::o!("component" => "ArtifactClient", "peer" => addr.to_string()),
155155
);
156156

157-
// Set a connect timeout of 15 seconds (the progenitor default), and a
158-
// total timeout of 5 minutes. The progenitor default for the total
159-
// timeout is 15 seconds, which can easily be exceeded for large
160-
// downloads. (Don't set the total timeout to be too long, though,
161-
// because we're fetching ~2GiB artifacts over a LAN which really should
162-
// take less than 5 minutes.)
157+
// Set a connect timeout of 5 seconds and a total timeout of 5 minutes.
158+
// The progenitor default for the total timeout is 15 seconds, which can
159+
// easily be exceeded for large downloads. (Don't set the total timeout
160+
// to be too long, though, because we're fetching ~2GiB artifacts over a
161+
// LAN which really should take less than 5 minutes.)
162+
//
163+
// The connect timeout is deliberately short: these are LAN connections
164+
// on the bootstrap network, and peers are tried serially. A long
165+
// connect timeout (the progenitor default of 15s) means unresponsive
166+
// peers block progress for a long time before the next peer is tried.
163167
//
164168
// Do not set a read timeout here -- instead, read timeouts are handled
165169
// by the fetch loop. (Why is the read timeout handled by the fetch
166170
// loop? So that it can also apply to the mock peer backend, and logic
167171
// shared across both.)
168172
let client = reqwest::ClientBuilder::new()
169-
.connect_timeout(Duration::from_secs(15))
173+
.connect_timeout(Duration::from_secs(5))
170174
.timeout(Duration::from_secs(5 * 60))
171175
.build()
172176
.expect("installinator artifact client created");

installinator/src/dispatch.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::{
2727
ArtifactWriter, MeasurementToWrite, WriteDestination,
2828
artifact::{ArtifactIdOpts, ArtifactsToDownload, MeasurementArtifact},
2929
fetch::{FetchArtifactBackend, FetchedArtifact, HttpFetchBackend},
30-
peers::DiscoveryMechanism,
30+
peers::{DiscoveryMechanism, LastKnownPeer},
3131
reporter::{HttpProgressBackend, ProgressReporter, ReportProgressBackend},
3232
};
3333

@@ -99,6 +99,7 @@ impl DebugDiscoverOpts {
9999
self.opts.mechanism.discover_peers(&log).await?,
100100
)),
101101
Duration::from_secs(10),
102+
None,
102103
);
103104
println!("discovered peers: {}", backend.peers().display());
104105
Ok(())
@@ -206,6 +207,8 @@ impl InstallOpts {
206207
);
207208
let progress_handle = progress_reporter.start();
208209
let discovery = &self.discover_opts.mechanism;
210+
let last_known_peer = LastKnownPeer::new();
211+
let last_known_peer = &last_known_peer;
209212

210213
let engine = UpdateEngine::new(log, event_sender);
211214

@@ -223,6 +226,7 @@ impl InstallOpts {
223226
&cx,
224227
&installinator_doc_id,
225228
discovery,
229+
&last_known_peer,
226230
log,
227231
)
228232
.await?;
@@ -326,9 +330,14 @@ impl InstallOpts {
326330
let to_download = to_download.into_value(cx.token()).await;
327331
let host_phase_2_id = to_download.host_phase_2_id();
328332

329-
let host_phase_2_artifact =
330-
fetch_artifact(&cx, &host_phase_2_id, discovery, log)
331-
.await?;
333+
let host_phase_2_artifact = fetch_artifact(
334+
&cx,
335+
&host_phase_2_id,
336+
discovery,
337+
last_known_peer,
338+
log,
339+
)
340+
.await?;
332341

333342
// Check that the sha256 of the data we got from wicket
334343
// matches the data we asked for. We do not retry this for
@@ -364,9 +373,14 @@ impl InstallOpts {
364373
to_download_2.into_value(cx.token()).await;
365374
let control_plane_id = to_download.control_plane_id();
366375

367-
let control_plane_artifact =
368-
fetch_artifact(&cx, &control_plane_id, discovery, log)
369-
.await?;
376+
let control_plane_artifact = fetch_artifact(
377+
&cx,
378+
&control_plane_id,
379+
discovery,
380+
last_known_peer,
381+
log,
382+
)
383+
.await?;
370384

371385
// Check that the sha256 of the data we got from wicket
372386
// matches the data we asked for. We do not retry this for
@@ -420,7 +434,7 @@ impl InstallOpts {
420434
async move |cx2| {
421435
let measurement_artifact =
422436
fetch_artifact(
423-
&cx2, &c.hash, discovery, log,
437+
&cx2, &c.hash, discovery, last_known_peer, log,
424438
)
425439
.await?;
426440

@@ -675,6 +689,7 @@ async fn fetch_artifact(
675689
cx: &StepContext,
676690
id: &ArtifactHashId,
677691
discovery: &DiscoveryMechanism,
692+
last_known_peer: &LastKnownPeer,
678693
log: &slog::Logger,
679694
) -> Result<FetchedArtifact> {
680695
// TODO: Not sure why slog::o!("artifact" => ?id) isn't working, figure it
@@ -684,20 +699,24 @@ async fn fetch_artifact(
684699
cx,
685700
&log,
686701
|| async {
702+
let preferred = last_known_peer.get();
687703
Ok(FetchArtifactBackend::new(
688704
&log,
689705
Box::new(HttpFetchBackend::new(
690706
&log,
691707
discovery.discover_peers(&log).await?,
692708
)),
693709
Duration::from_secs(10),
710+
preferred,
694711
))
695712
},
696713
id,
697714
)
698715
.await
699716
.with_context(|| format!("error fetching image with id {id:?}"))?;
700717

718+
last_known_peer.set(artifact.peer);
719+
701720
slog::info!(
702721
log,
703722
"fetched {} bytes from {}",

installinator/src/fetch.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,16 +127,18 @@ pub(crate) struct FetchArtifactBackend {
127127
log: slog::Logger,
128128
imp: Box<dyn FetchArtifactImpl>,
129129
read_timeout: Duration,
130+
preferred_peer: Option<PeerAddress>,
130131
}
131132

132133
impl FetchArtifactBackend {
133134
pub(crate) fn new(
134135
log: &slog::Logger,
135136
imp: Box<dyn FetchArtifactImpl>,
136137
read_timeout: Duration,
138+
preferred_peer: Option<PeerAddress>,
137139
) -> Self {
138140
let log = log.new(slog::o!("component" => "Peers"));
139-
Self { log, imp, read_timeout }
141+
Self { log, imp, read_timeout, preferred_peer }
140142
}
141143

142144
pub(crate) async fn fetch_artifact(
@@ -154,7 +156,7 @@ impl FetchArtifactBackend {
154156

155157
slog::debug!(log, "start fetch from peers"; "remaining_peers" => remaining_peers);
156158

157-
for &peer in peers.peers() {
159+
for &peer in peers.iter_with_preferred(self.preferred_peer) {
158160
remaining_peers -= 1;
159161

160162
slog::debug!(

installinator/src/mock_peers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,6 +837,7 @@ mod tests {
837837
&log,
838838
Box::new(peers),
839839
read_timeout,
840+
None,
840841
)),
841842
Some(Err(error)) => {
842843
future::err(DiscoverPeersError::Retry(error))

installinator/src/peers.rs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
fmt,
88
net::{AddrParseError, IpAddr, SocketAddr},
99
str::FromStr,
10+
sync::Mutex,
1011
};
1112

1213
use anyhow::{Result, bail};
@@ -118,6 +119,21 @@ impl PeerAddresses {
118119
pub(crate) fn display(&self) -> impl fmt::Display + use<> {
119120
self.peers().iter().join(", ")
120121
}
122+
123+
/// Returns an iterator that yields the preferred peer first (if present in
124+
/// the set), then the remaining peers in BTreeSet order.
125+
pub(crate) fn iter_with_preferred(
126+
&self,
127+
preferred: Option<PeerAddress>,
128+
) -> impl Iterator<Item = &PeerAddress> {
129+
// Look up the preferred peer in the set to get a reference with the
130+
// right lifetime. (If it's not in the set, the correct behavior is to
131+
// skip it.)
132+
let preferred_ref = preferred.and_then(|p| self.peers.get(&p));
133+
preferred_ref
134+
.into_iter()
135+
.chain(self.peers.iter().filter(move |&p| Some(p) != preferred_ref))
136+
}
121137
}
122138

123139
impl FromIterator<PeerAddress> for PeerAddresses {
@@ -157,3 +173,98 @@ impl FromStr for PeerAddress {
157173
Ok(Self { address })
158174
}
159175
}
176+
177+
/// Tracks the last peer that successfully delivered an artifact.
178+
///
179+
/// Created once per install session and shared across all artifact fetches, so
180+
/// that a peer discovered while fetching an earlier artifact is tried first for
181+
/// subsequent artifacts.
182+
#[derive(Debug)]
183+
pub(crate) struct LastKnownPeer {
184+
inner: Mutex<Option<PeerAddress>>,
185+
}
186+
187+
impl LastKnownPeer {
188+
pub(crate) fn new() -> Self {
189+
Self { inner: Mutex::new(None) }
190+
}
191+
192+
pub(crate) fn get(&self) -> Option<PeerAddress> {
193+
*self.inner.lock().expect("last known peer lock poisoned")
194+
}
195+
196+
pub(crate) fn set(&self, peer: PeerAddress) {
197+
*self.inner.lock().expect("last known peer lock poisoned") = Some(peer);
198+
}
199+
}
200+
201+
#[cfg(test)]
202+
mod tests {
203+
use super::*;
204+
use std::net::{Ipv6Addr, SocketAddrV6};
205+
206+
/// Build a peer address from a distinguishing segment. Returns
207+
/// `[::n]:8000`.
208+
fn peer(last_segment: u16) -> PeerAddress {
209+
PeerAddress::new(SocketAddr::V6(SocketAddrV6::new(
210+
Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, last_segment),
211+
8000,
212+
0,
213+
0,
214+
)))
215+
}
216+
217+
fn addrs(ns: &[u16]) -> PeerAddresses {
218+
ns.iter().map(|&n| peer(n)).collect()
219+
}
220+
221+
/// Collect the last segment of each peer address for easy comparison.
222+
fn collect_last_segment(
223+
peers: &PeerAddresses,
224+
preferred: Option<PeerAddress>,
225+
) -> Vec<u16> {
226+
peers
227+
.iter_with_preferred(preferred)
228+
.map(|p| match p.address() {
229+
SocketAddr::V6(a) => a.ip().segments()[7],
230+
SocketAddr::V4(_) => {
231+
unreachable!("we only use IPv6 addresses in these tests")
232+
}
233+
})
234+
.collect()
235+
}
236+
237+
#[test]
238+
fn iter_no_preferred_preserves_btree_order() {
239+
let peers = addrs(&[3, 1, 2]);
240+
assert_eq!(collect_last_segment(&peers, None), vec![1, 2, 3]);
241+
}
242+
243+
#[test]
244+
fn last_known_peer_preferred() {
245+
let peers = addrs(&[3, 1, 2]);
246+
let lkp = LastKnownPeer::new();
247+
248+
// Initially no preferred peer: normal BTreeSet order.
249+
assert_eq!(collect_last_segment(&peers, lkp.get()), vec![1, 2, 3]);
250+
251+
// Simulate a successful fetch from peer ::3.
252+
lkp.set(peer(3));
253+
assert_eq!(collect_last_segment(&peers, lkp.get()), vec![3, 1, 2]);
254+
255+
// Subsequent fetch succeeds from peer ::1 instead.
256+
lkp.set(peer(1));
257+
assert_eq!(collect_last_segment(&peers, lkp.get()), vec![1, 2, 3]);
258+
}
259+
260+
#[test]
261+
fn last_known_peer_stale() {
262+
// The remembered peer may not appear in a later discovery round.
263+
let lkp = LastKnownPeer::new();
264+
lkp.set(peer(1));
265+
266+
let new_peers = addrs(&[2, 3]);
267+
// peer(1) is gone; iteration should fall back to normal order.
268+
assert_eq!(collect_last_segment(&new_peers, lkp.get()), vec![2, 3]);
269+
}
270+
}

0 commit comments

Comments
 (0)