Skip to content

Commit 11f31ae

Browse files
bors[bot]matklad
andauthored
Merge #5114
5114: Cleanup cargo process handling in flycheck r=matklad a=matklad bors r+ 🤖 Co-authored-by: Aleksey Kladov <[email protected]>
2 parents f3cd82c + 5cdd8d4 commit 11f31ae

File tree

3 files changed

+128
-94
lines changed

3 files changed

+128
-94
lines changed

crates/flycheck/src/lib.rs

Lines changed: 119 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
use std::{
66
fmt,
77
io::{self, BufReader},
8+
ops,
89
path::PathBuf,
9-
process::{Command, Stdio},
10+
process::{self, Command, Stdio},
1011
time::Duration,
1112
};
1213

@@ -49,8 +50,8 @@ impl fmt::Display for FlycheckConfig {
4950
#[derive(Debug)]
5051
pub struct FlycheckHandle {
5152
// XXX: drop order is significant
52-
cmd_send: Sender<Restart>,
53-
handle: jod_thread::JoinHandle,
53+
sender: Sender<Restart>,
54+
thread: jod_thread::JoinHandle,
5455
}
5556

5657
impl FlycheckHandle {
@@ -59,16 +60,15 @@ impl FlycheckHandle {
5960
config: FlycheckConfig,
6061
workspace_root: PathBuf,
6162
) -> FlycheckHandle {
62-
let (cmd_send, cmd_recv) = unbounded::<Restart>();
63-
let handle = jod_thread::spawn(move || {
64-
FlycheckActor::new(sender, config, workspace_root).run(cmd_recv);
65-
});
66-
FlycheckHandle { cmd_send, handle }
63+
let actor = FlycheckActor::new(sender, config, workspace_root);
64+
let (sender, receiver) = unbounded::<Restart>();
65+
let thread = jod_thread::spawn(move || actor.run(receiver));
66+
FlycheckHandle { sender, thread }
6767
}
6868

6969
/// Schedule a re-start of the cargo check worker.
7070
pub fn update(&self) {
71-
self.cmd_send.send(Restart).unwrap();
71+
self.sender.send(Restart).unwrap();
7272
}
7373
}
7474

@@ -85,7 +85,7 @@ pub enum Message {
8585
pub enum Progress {
8686
DidStart,
8787
DidCheckCrate(String),
88-
DidFinish,
88+
DidFinish(io::Result<()>),
8989
DidCancel,
9090
}
9191

@@ -100,8 +100,7 @@ struct FlycheckActor {
100100
/// doesn't provide a way to read sub-process output without blocking, so we
101101
/// have to wrap sub-processes output handling in a thread and pass messages
102102
/// back over a channel.
103-
// XXX: drop order is significant
104-
check_process: Option<(Receiver<cargo_metadata::Message>, jod_thread::JoinHandle)>,
103+
cargo_handle: Option<CargoHandle>,
105104
}
106105

107106
enum Event {
@@ -115,29 +114,36 @@ impl FlycheckActor {
115114
config: FlycheckConfig,
116115
workspace_root: PathBuf,
117116
) -> FlycheckActor {
118-
FlycheckActor { sender, config, workspace_root, check_process: None }
117+
FlycheckActor { sender, config, workspace_root, cargo_handle: None }
119118
}
120119
fn next_event(&self, inbox: &Receiver<Restart>) -> Option<Event> {
121-
let check_chan = self.check_process.as_ref().map(|(chan, _thread)| chan);
120+
let check_chan = self.cargo_handle.as_ref().map(|cargo| &cargo.receiver);
122121
select! {
123122
recv(inbox) -> msg => msg.ok().map(Event::Restart),
124123
recv(check_chan.unwrap_or(&never())) -> msg => Some(Event::CheckEvent(msg.ok())),
125124
}
126125
}
127-
fn run(&mut self, inbox: Receiver<Restart>) {
126+
fn run(mut self, inbox: Receiver<Restart>) {
128127
while let Some(event) = self.next_event(&inbox) {
129128
match event {
130129
Event::Restart(Restart) => {
131130
while let Ok(Restart) = inbox.recv_timeout(Duration::from_millis(50)) {}
131+
132132
self.cancel_check_process();
133-
self.check_process = Some(self.start_check_process());
134-
self.send(Message::Progress(Progress::DidStart));
133+
134+
let mut command = self.check_command();
135+
command.stdout(Stdio::piped()).stderr(Stdio::null()).stdin(Stdio::null());
136+
if let Ok(child) = command.spawn().map(JodChild) {
137+
self.cargo_handle = Some(CargoHandle::spawn(child));
138+
self.send(Message::Progress(Progress::DidStart));
139+
}
135140
}
136141
Event::CheckEvent(None) => {
137142
// Watcher finished, replace it with a never channel to
138143
// avoid busy-waiting.
139-
assert!(self.check_process.take().is_some());
140-
self.send(Message::Progress(Progress::DidFinish));
144+
let cargo_handle = self.cargo_handle.take().unwrap();
145+
let res = cargo_handle.join();
146+
self.send(Message::Progress(Progress::DidFinish(res)));
141147
}
142148
Event::CheckEvent(Some(message)) => match message {
143149
cargo_metadata::Message::CompilerArtifact(msg) => {
@@ -162,11 +168,11 @@ impl FlycheckActor {
162168
self.cancel_check_process();
163169
}
164170
fn cancel_check_process(&mut self) {
165-
if self.check_process.take().is_some() {
171+
if self.cargo_handle.take().is_some() {
166172
self.send(Message::Progress(Progress::DidCancel));
167173
}
168174
}
169-
fn start_check_process(&self) -> (Receiver<cargo_metadata::Message>, jod_thread::JoinHandle) {
175+
fn check_command(&self) -> Command {
170176
let mut cmd = match &self.config {
171177
FlycheckConfig::CargoCommand {
172178
command,
@@ -198,88 +204,112 @@ impl FlycheckActor {
198204
}
199205
};
200206
cmd.current_dir(&self.workspace_root);
201-
202-
let (message_send, message_recv) = unbounded();
203-
let thread = jod_thread::spawn(move || {
204-
// If we trigger an error here, we will do so in the loop instead,
205-
// which will break out of the loop, and continue the shutdown
206-
let res = run_cargo(cmd, &mut |message| {
207-
// Skip certain kinds of messages to only spend time on what's useful
208-
match &message {
209-
cargo_metadata::Message::CompilerArtifact(artifact) if artifact.fresh => {
210-
return true
211-
}
212-
cargo_metadata::Message::BuildScriptExecuted(_)
213-
| cargo_metadata::Message::Unknown => return true,
214-
_ => {}
215-
}
216-
217-
// if the send channel was closed, we want to shutdown
218-
message_send.send(message).is_ok()
219-
});
220-
221-
if let Err(err) = res {
222-
// FIXME: make the `message_send` to be `Sender<Result<CheckEvent, CargoError>>`
223-
// to display user-caused misconfiguration errors instead of just logging them here
224-
log::error!("Cargo watcher failed {:?}", err);
225-
}
226-
});
227-
(message_recv, thread)
207+
cmd
228208
}
229209

230210
fn send(&self, check_task: Message) {
231211
(self.sender)(check_task)
232212
}
233213
}
234214

235-
fn run_cargo(
236-
mut command: Command,
237-
on_message: &mut dyn FnMut(cargo_metadata::Message) -> bool,
238-
) -> io::Result<()> {
239-
let mut child =
240-
command.stdout(Stdio::piped()).stderr(Stdio::null()).stdin(Stdio::null()).spawn()?;
215+
struct CargoHandle {
216+
child: JodChild,
217+
#[allow(unused)]
218+
thread: jod_thread::JoinHandle<io::Result<bool>>,
219+
receiver: Receiver<cargo_metadata::Message>,
220+
}
241221

242-
// We manually read a line at a time, instead of using serde's
243-
// stream deserializers, because the deserializer cannot recover
244-
// from an error, resulting in it getting stuck, because we try to
245-
// be resillient against failures.
246-
//
247-
// Because cargo only outputs one JSON object per line, we can
248-
// simply skip a line if it doesn't parse, which just ignores any
249-
// erroneus output.
250-
let stdout = BufReader::new(child.stdout.take().unwrap());
251-
let mut read_at_least_one_message = false;
252-
for message in cargo_metadata::Message::parse_stream(stdout) {
253-
let message = match message {
254-
Ok(message) => message,
255-
Err(err) => {
256-
log::error!("Invalid json from cargo check, ignoring ({})", err);
257-
continue;
258-
}
259-
};
222+
impl CargoHandle {
223+
fn spawn(mut child: JodChild) -> CargoHandle {
224+
let child_stdout = child.stdout.take().unwrap();
225+
let (sender, receiver) = unbounded();
226+
let actor = CargoActor::new(child_stdout, sender);
227+
let thread = jod_thread::spawn(move || actor.run());
228+
CargoHandle { child, thread, receiver }
229+
}
230+
fn join(mut self) -> io::Result<()> {
231+
// It is okay to ignore the result, as it only errors if the process is already dead
232+
let _ = self.child.kill();
233+
let exit_status = self.child.wait()?;
234+
let read_at_least_one_message = self.thread.join()?;
235+
if !exit_status.success() && !read_at_least_one_message {
236+
// FIXME: Read the stderr to display the reason, see `read2()` reference in PR comment:
237+
// https://github.com/rust-analyzer/rust-analyzer/pull/3632#discussion_r395605298
238+
return Err(io::Error::new(
239+
io::ErrorKind::Other,
240+
format!(
241+
"Cargo watcher failed,the command produced no valid metadata (exit code: {:?})",
242+
exit_status
243+
),
244+
));
245+
}
246+
Ok(())
247+
}
248+
}
249+
250+
struct CargoActor {
251+
child_stdout: process::ChildStdout,
252+
sender: Sender<cargo_metadata::Message>,
253+
}
254+
255+
impl CargoActor {
256+
fn new(
257+
child_stdout: process::ChildStdout,
258+
sender: Sender<cargo_metadata::Message>,
259+
) -> CargoActor {
260+
CargoActor { child_stdout, sender }
261+
}
262+
fn run(self) -> io::Result<bool> {
263+
// We manually read a line at a time, instead of using serde's
264+
// stream deserializers, because the deserializer cannot recover
265+
// from an error, resulting in it getting stuck, because we try to
266+
// be resilient against failures.
267+
//
268+
// Because cargo only outputs one JSON object per line, we can
269+
// simply skip a line if it doesn't parse, which just ignores any
270+
// erroneus output.
271+
let stdout = BufReader::new(self.child_stdout);
272+
let mut read_at_least_one_message = false;
273+
for message in cargo_metadata::Message::parse_stream(stdout) {
274+
let message = match message {
275+
Ok(message) => message,
276+
Err(err) => {
277+
log::error!("Invalid json from cargo check, ignoring ({})", err);
278+
continue;
279+
}
280+
};
260281

261-
read_at_least_one_message = true;
282+
read_at_least_one_message = true;
262283

263-
if !on_message(message) {
264-
break;
284+
// Skip certain kinds of messages to only spend time on what's useful
285+
match &message {
286+
cargo_metadata::Message::CompilerArtifact(artifact) if artifact.fresh => (),
287+
cargo_metadata::Message::BuildScriptExecuted(_)
288+
| cargo_metadata::Message::Unknown => (),
289+
_ => self.sender.send(message).unwrap(),
290+
}
265291
}
292+
Ok(read_at_least_one_message)
266293
}
294+
}
267295

268-
// It is okay to ignore the result, as it only errors if the process is already dead
269-
let _ = child.kill();
296+
struct JodChild(process::Child);
270297

271-
let exit_status = child.wait()?;
272-
if !exit_status.success() && !read_at_least_one_message {
273-
// FIXME: Read the stderr to display the reason, see `read2()` reference in PR comment:
274-
// https://github.com/rust-analyzer/rust-analyzer/pull/3632#discussion_r395605298
275-
return Err(io::Error::new(
276-
io::ErrorKind::Other,
277-
format!(
278-
"the command produced no valid metadata (exit code: {:?}): {:?}",
279-
exit_status, command
280-
),
281-
));
298+
impl ops::Deref for JodChild {
299+
type Target = process::Child;
300+
fn deref(&self) -> &process::Child {
301+
&self.0
282302
}
303+
}
283304

284-
Ok(())
305+
impl ops::DerefMut for JodChild {
306+
fn deref_mut(&mut self) -> &mut process::Child {
307+
&mut self.0
308+
}
309+
}
310+
311+
impl Drop for JodChild {
312+
fn drop(&mut self) {
313+
let _ = self.0.kill();
314+
}
285315
}

crates/rust-analyzer/src/main_loop.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,11 @@ impl GlobalState {
216216
flycheck::Progress::DidCheckCrate(target) => {
217217
(Progress::Report, Some(target))
218218
}
219-
flycheck::Progress::DidFinish | flycheck::Progress::DidCancel => {
219+
flycheck::Progress::DidCancel => (Progress::End, None),
220+
flycheck::Progress::DidFinish(result) => {
221+
if let Err(err) = result {
222+
log::error!("cargo check failed: {}", err)
223+
}
220224
(Progress::End, None)
221225
}
222226
};

crates/vfs-notify/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ mod include;
1010

1111
use std::convert::{TryFrom, TryInto};
1212

13-
use crossbeam_channel::{select, unbounded, Receiver};
13+
use crossbeam_channel::{select, unbounded, Receiver, Sender};
1414
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
1515
use paths::{AbsPath, AbsPathBuf};
1616
use rustc_hash::FxHashSet;
@@ -22,8 +22,8 @@ use crate::include::Include;
2222
#[derive(Debug)]
2323
pub struct NotifyHandle {
2424
// Relative order of fields below is significant.
25-
sender: crossbeam_channel::Sender<Message>,
26-
_thread: jod_thread::JoinHandle,
25+
sender: Sender<Message>,
26+
thread: jod_thread::JoinHandle,
2727
}
2828

2929
#[derive(Debug)]
@@ -37,7 +37,7 @@ impl loader::Handle for NotifyHandle {
3737
let actor = NotifyActor::new(sender);
3838
let (sender, receiver) = unbounded::<Message>();
3939
let thread = jod_thread::spawn(move || actor.run(receiver));
40-
NotifyHandle { sender, _thread: thread }
40+
NotifyHandle { sender, thread }
4141
}
4242
fn set_config(&mut self, config: loader::Config) {
4343
self.sender.send(Message::Config(config)).unwrap()

0 commit comments

Comments
 (0)