Skip to content

Commit 458138b

Browse files
alexcrichton authored and ehuss committed
Replace std::sync::mpsc with a much simpler queue
We don't need the complexity of most channels since this is not a performance-sensitive part of Cargo, nor is it likely to be so any time soon. Coupled with recent bugs (#7840) that we believe are in `std::sync::mpsc`, let's just not use that and instead use a custom queue type locally, which should be amenable to a blocking push soon too.
1 parent 43aafb4 commit 458138b

File tree

4 files changed

+91
-38
lines changed

4 files changed

+91
-38
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ atty = "0.2"
2323
bytesize = "1.0"
2424
cargo-platform = { path = "crates/cargo-platform", version = "0.1.1" }
2525
crates-io = { path = "crates/crates-io", version = "0.31" }
26-
crossbeam-channel = "0.4"
2726
crossbeam-utils = "0.7"
2827
crypto-hash = "0.3.1"
2928
curl = { version = "0.4.23", features = ["http2"] }

src/cargo/core/compiler/job_queue.rs

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ use std::sync::Arc;
5858
use std::time::Duration;
5959

6060
use anyhow::format_err;
61-
use crossbeam_channel::{unbounded, Receiver, Sender};
6261
use crossbeam_utils::thread::Scope;
6362
use jobserver::{Acquired, Client, HelperThread};
6463
use log::{debug, info, trace};
@@ -73,6 +72,7 @@ use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
7372
use crate::core::{PackageId, TargetKind};
7473
use crate::util;
7574
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
75+
use crate::util::Queue;
7676
use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
7777
use crate::util::{Config, DependencyQueue};
7878
use crate::util::{Progress, ProgressStyle};
@@ -98,8 +98,7 @@ struct DrainState<'a, 'cfg> {
9898
total_units: usize,
9999

100100
queue: DependencyQueue<Unit<'a>, Artifact, Job>,
101-
tx: Sender<Message>,
102-
rx: Receiver<Message>,
101+
messages: Arc<Queue<Message>>,
103102
active: HashMap<JobId, Unit<'a>>,
104103
compiled: HashSet<PackageId>,
105104
documented: HashSet<PackageId>,
@@ -145,7 +144,7 @@ impl std::fmt::Display for JobId {
145144

146145
pub struct JobState<'a> {
147146
/// Channel back to the main thread to coordinate messages and such.
148-
tx: Sender<Message>,
147+
messages: Arc<Queue<Message>>,
149148

150149
/// The job id that this state is associated with, used when sending
151150
/// messages back to the main thread.
@@ -199,7 +198,7 @@ enum Message {
199198

200199
impl<'a> JobState<'a> {
201200
pub fn running(&self, cmd: &ProcessBuilder) {
202-
let _ = self.tx.send(Message::Run(self.id, cmd.to_string()));
201+
self.messages.push(Message::Run(self.id, cmd.to_string()));
203202
}
204203

205204
pub fn build_plan(
@@ -208,17 +207,16 @@ impl<'a> JobState<'a> {
208207
cmd: ProcessBuilder,
209208
filenames: Arc<Vec<OutputFile>>,
210209
) {
211-
let _ = self
212-
.tx
213-
.send(Message::BuildPlanMsg(module_name, cmd, filenames));
210+
self.messages
211+
.push(Message::BuildPlanMsg(module_name, cmd, filenames));
214212
}
215213

216214
pub fn stdout(&self, stdout: String) {
217-
drop(self.tx.send(Message::Stdout(stdout)));
215+
self.messages.push(Message::Stdout(stdout));
218216
}
219217

220218
pub fn stderr(&self, stderr: String) {
221-
drop(self.tx.send(Message::Stderr(stderr)));
219+
self.messages.push(Message::Stderr(stderr));
222220
}
223221

224222
/// A method used to signal to the coordinator thread that the rmeta file
@@ -228,9 +226,8 @@ impl<'a> JobState<'a> {
228226
/// produced once!
229227
pub fn rmeta_produced(&self) {
230228
self.rmeta_required.set(false);
231-
let _ = self
232-
.tx
233-
.send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
229+
self.messages
230+
.push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
234231
}
235232

236233
/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
@@ -239,14 +236,14 @@ impl<'a> JobState<'a> {
239236
/// This should arrange for the associated client to eventually get a token via
240237
/// `client.release_raw()`.
241238
pub fn will_acquire(&self) {
242-
let _ = self.tx.send(Message::NeedsToken(self.id));
239+
self.messages.push(Message::NeedsToken(self.id));
243240
}
244241

245242
/// The rustc underlying this Job is informing us that it is done with a jobserver token.
246243
///
247244
/// Note that it does *not* write that token back anywhere.
248245
pub fn release_token(&self) {
249-
let _ = self.tx.send(Message::ReleaseToken(self.id));
246+
self.messages.push(Message::ReleaseToken(self.id));
250247
}
251248
}
252249

@@ -340,21 +337,18 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
340337
let _p = profile::start("executing the job graph");
341338
self.queue.queue_finished();
342339

343-
let (tx, rx) = unbounded();
344340
let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
345341
let state = DrainState {
346342
total_units: self.queue.len(),
347343
queue: self.queue,
348-
tx,
349-
rx,
344+
messages: Arc::new(Queue::new()),
350345
active: HashMap::new(),
351346
compiled: HashSet::new(),
352347
documented: HashSet::new(),
353348
counts: self.counts,
354349
progress,
355350
next_id: 0,
356351
timings: self.timings,
357-
358352
tokens: Vec::new(),
359353
rustc_tokens: HashMap::new(),
360354
to_send_clients: BTreeMap::new(),
@@ -364,25 +358,25 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
364358
};
365359

366360
// Create a helper thread for acquiring jobserver tokens
367-
let tx = state.tx.clone();
361+
let messages = state.messages.clone();
368362
let helper = cx
369363
.jobserver
370364
.clone()
371365
.into_helper_thread(move |token| {
372-
drop(tx.send(Message::Token(token)));
366+
drop(messages.push(Message::Token(token)));
373367
})
374368
.chain_err(|| "failed to create helper thread for jobserver management")?;
375369

376370
// Create a helper thread to manage the diagnostics for rustfix if
377371
// necessary.
378-
let tx = state.tx.clone();
372+
let messages = state.messages.clone();
379373
let _diagnostic_server = cx
380374
.bcx
381375
.build_config
382376
.rustfix_diagnostic_server
383377
.borrow_mut()
384378
.take()
385-
.map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
379+
.map(move |srv| srv.start(move |msg| drop(messages.push(Message::FixDiagnostic(msg)))));
386380

387381
crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
388382
.expect("child threads shouldn't panic")
@@ -584,7 +578,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
584578
// to run above to calculate CPU usage over time. To do this we
585579
// listen for a message with a timeout, and on timeout we run the
586580
// previous parts of the loop again.
587-
let events: Vec<_> = self.rx.try_iter().collect();
581+
let mut events = Vec::new();
582+
while let Some(event) = self.messages.try_pop() {
583+
events.push(event);
584+
}
588585
info!(
589586
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
590587
self.tokens.len(),
@@ -602,14 +599,16 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
602599
loop {
603600
self.tick_progress();
604601
self.tokens.truncate(self.active.len() - 1);
605-
match self.rx.recv_timeout(Duration::from_millis(500)) {
606-
Ok(message) => break vec![message],
607-
Err(_) => continue,
602+
match self.messages.pop(Duration::from_millis(500)) {
603+
Some(message) => {
604+
events.push(message);
605+
break;
606+
}
607+
None => continue,
608608
}
609609
}
610-
} else {
611-
events
612610
}
611+
return events;
613612
}
614613

615614
fn drain_the_queue(
@@ -756,7 +755,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
756755
assert!(self.active.insert(id, *unit).is_none());
757756
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
758757

759-
let my_tx = self.tx.clone();
758+
let messages = self.messages.clone();
760759
let fresh = job.freshness();
761760
let rmeta_required = cx.rmeta_required(unit);
762761

@@ -768,13 +767,13 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
768767
let doit = move || {
769768
let state = JobState {
770769
id,
771-
tx: my_tx.clone(),
770+
messages: messages.clone(),
772771
rmeta_required: Cell::new(rmeta_required),
773772
_marker: marker::PhantomData,
774773
};
775774

776775
let mut sender = FinishOnDrop {
777-
tx: &my_tx,
776+
messages: &messages,
778777
id,
779778
result: Err(format_err!("worker panicked")),
780779
};
@@ -793,25 +792,24 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
793792
// we need to make sure that the metadata is flagged as produced so
794793
// send a synthetic message here.
795794
if state.rmeta_required.get() && sender.result.is_ok() {
796-
my_tx
797-
.send(Message::Finish(id, Artifact::Metadata, Ok(())))
798-
.unwrap();
795+
messages.push(Message::Finish(id, Artifact::Metadata, Ok(())));
799796
}
800797

801798
// Use a helper struct with a `Drop` implementation to guarantee
802799
// that a `Finish` message is sent even if our job panics. We
803800
// shouldn't panic unless there's a bug in Cargo, so we just need
804801
// to make sure nothing hangs by accident.
805802
struct FinishOnDrop<'a> {
806-
tx: &'a Sender<Message>,
803+
messages: &'a Queue<Message>,
807804
id: JobId,
808805
result: CargoResult<()>,
809806
}
810807

811808
impl Drop for FinishOnDrop<'_> {
812809
fn drop(&mut self) {
813810
let msg = mem::replace(&mut self.result, Ok(()));
814-
drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg)));
811+
self.messages
812+
.push(Message::Finish(self.id, Artifact::All, msg));
815813
}
816814
}
817815
};

src/cargo/util/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub use self::paths::{bytes2path, dylib_path, join_paths, path2bytes};
1818
pub use self::paths::{dylib_path_envvar, normalize_path};
1919
pub use self::process_builder::{process, ProcessBuilder};
2020
pub use self::progress::{Progress, ProgressStyle};
21+
pub use self::queue::Queue;
2122
pub use self::read2::read2;
2223
pub use self::restricted_names::validate_package_name;
2324
pub use self::rustc::Rustc;
@@ -51,6 +52,7 @@ pub mod paths;
5152
pub mod process_builder;
5253
pub mod profile;
5354
mod progress;
55+
mod queue;
5456
mod read2;
5557
pub mod restricted_names;
5658
pub mod rustc;

src/cargo/util/queue.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::collections::VecDeque;
2+
use std::sync::{Condvar, Mutex};
3+
use std::time::{Duration, Instant};
4+
5+
/// A simple, threadsafe, queue of items of type `T`
6+
///
7+
/// This is a sort of channel where any thread can push to a queue and any
8+
/// thread can pop from a queue. Currently queues have infinite capacity where
9+
/// `push` will never block but `pop` will block.
10+
pub struct Queue<T> {
11+
state: Mutex<State<T>>,
12+
condvar: Condvar,
13+
}
14+
15+
struct State<T> {
16+
items: VecDeque<T>,
17+
}
18+
19+
impl<T> Queue<T> {
20+
pub fn new() -> Queue<T> {
21+
Queue {
22+
state: Mutex::new(State {
23+
items: VecDeque::new(),
24+
}),
25+
condvar: Condvar::new(),
26+
}
27+
}
28+
29+
pub fn push(&self, item: T) {
30+
self.state.lock().unwrap().items.push_back(item);
31+
self.condvar.notify_one();
32+
}
33+
34+
pub fn pop(&self, timeout: Duration) -> Option<T> {
35+
let mut state = self.state.lock().unwrap();
36+
let now = Instant::now();
37+
while state.items.is_empty() {
38+
let elapsed = now.elapsed();
39+
if elapsed >= timeout {
40+
break;
41+
}
42+
let (lock, result) = self.condvar.wait_timeout(state, timeout - elapsed).unwrap();
43+
state = lock;
44+
if result.timed_out() {
45+
break;
46+
}
47+
}
48+
state.items.pop_front()
49+
}
50+
51+
pub fn try_pop(&self) -> Option<T> {
52+
self.state.lock().unwrap().items.pop_front()
53+
}
54+
}

0 commit comments

Comments
 (0)