Skip to content

Commit fafa9fc

Browse files
authored
Refactor: Extract new fn run_commands_in_parallel (#1549)
Simplify `compile_objects` function and prepare for parallel is_flags_supported checks
1 parent c8a378e commit fafa9fc

File tree

5 files changed

+191
-181
lines changed

5 files changed

+191
-181
lines changed

src/command_helpers.rs

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ impl StderrForwarder {
119119
}
120120
}
121121

122-
fn forward_available(&mut self) -> bool {
122+
pub(crate) fn forward_available(&mut self) -> bool {
123123
if let Some((stderr, buffer)) = self.inner.as_mut() {
124124
loop {
125125
// For non-blocking we check to see if there is data available, so we should try to
@@ -226,7 +226,7 @@ impl StderrForwarder {
226226
}
227227

228228
#[cfg(feature = "parallel")]
229-
fn forward_all(&mut self) {
229+
pub(crate) fn forward_all(&mut self) {
230230
while !self.forward_available() {}
231231
}
232232

@@ -424,38 +424,3 @@ pub(crate) fn command_add_output_file(cmd: &mut Command, dst: &Path, args: CmdAd
424424
cmd.arg("-o").arg(dst);
425425
}
426426
}
427-
428-
#[cfg(feature = "parallel")]
429-
pub(crate) fn try_wait_on_child(
430-
cmd: &Command,
431-
child: &mut Child,
432-
stdout: &mut dyn io::Write,
433-
stderr_forwarder: &mut StderrForwarder,
434-
) -> Result<Option<()>, Error> {
435-
stderr_forwarder.forward_available();
436-
437-
match child.try_wait() {
438-
Ok(Some(status)) => {
439-
stderr_forwarder.forward_all();
440-
441-
let _ = writeln!(stdout, "{}", status);
442-
443-
if status.success() {
444-
Ok(Some(()))
445-
} else {
446-
Err(Error::new(
447-
ErrorKind::ToolExecError,
448-
format!("command did not execute successfully (status code {status}): {cmd:?}"),
449-
))
450-
}
451-
}
452-
Ok(None) => Ok(None),
453-
Err(e) => {
454-
stderr_forwarder.forward_all();
455-
Err(Error::new(
456-
ErrorKind::ToolExecError,
457-
format!("failed to wait on spawned child process `{cmd:?}`: {e}"),
458-
))
459-
}
460-
}
461-
}

src/lib.rs

Lines changed: 8 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,6 @@ use std::fmt::{self, Display};
248248
use std::fs;
249249
use std::io::{self, Write};
250250
use std::path::{Component, Path, PathBuf};
251-
#[cfg(feature = "parallel")]
252-
use std::process::Child;
253251
use std::process::{Command, Stdio};
254252
use std::sync::{
255253
atomic::{AtomicU8, Ordering::Relaxed},
@@ -260,8 +258,10 @@ use shlex::Shlex;
260258

261259
#[cfg(feature = "parallel")]
262260
mod parallel;
261+
263262
mod target;
264263
use self::target::*;
264+
265265
/// A helper module to looking for windows-specific tools:
266266
/// 1. On Windows host, probe the Windows Registry if needed;
267267
/// 2. On non-Windows host, check specified environment variables.
@@ -1755,144 +1755,17 @@ impl Build {
17551755
Ok(objects.into_iter().map(|v| v.dst).collect())
17561756
}
17571757

1758-
#[cfg(feature = "parallel")]
17591758
fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
1760-
use std::cell::Cell;
1761-
1762-
use parallel::async_executor::{block_on, YieldOnce};
1763-
17641759
check_disabled()?;
17651760

1766-
if objs.len() <= 1 {
1767-
return self.compile_objects_sequential(objs);
1768-
}
1769-
1770-
// Limit our parallelism globally with a jobserver.
1771-
let mut tokens = parallel::job_token::ActiveJobTokenServer::new();
1772-
1773-
// When compiling objects in parallel we do a few dirty tricks to speed
1774-
// things up:
1775-
//
1776-
// * First is that we use the `jobserver` crate to limit the parallelism
1777-
// of this build script. The `jobserver` crate will use a jobserver
1778-
// configured by Cargo for build scripts to ensure that parallelism is
1779-
// coordinated across C compilations and Rust compilations. Before we
1780-
// compile anything we make sure to wait until we acquire a token.
1781-
//
1782-
// Note that this jobserver is cached globally so we only used one per
1783-
// process and only worry about creating it once.
1784-
//
1785-
// * Next we use spawn the process to actually compile objects in
1786-
// parallel after we've acquired a token to perform some work
1787-
//
1788-
// With all that in mind we compile all objects in a loop here, after we
1789-
// acquire the appropriate tokens, Once all objects have been compiled
1790-
// we wait on all the processes and propagate the results of compilation.
1791-
1792-
let pendings =
1793-
Cell::new(Vec::<(Command, KillOnDrop, parallel::job_token::JobToken)>::new());
1794-
let is_disconnected = Cell::new(false);
1795-
let has_made_progress = Cell::new(false);
1796-
1797-
let wait_future = async {
1798-
let mut error = None;
1799-
// Buffer the stdout
1800-
let mut stdout = io::BufWriter::with_capacity(128, io::stdout());
1801-
1802-
loop {
1803-
// If the other end of the pipe is already disconnected, then we're not gonna get any new jobs,
1804-
// so it doesn't make sense to reuse the tokens; in fact,
1805-
// releasing them as soon as possible (once we know that the other end is disconnected) is beneficial.
1806-
// Imagine that the last file built takes an hour to finish; in this scenario,
1807-
// by not releasing the tokens before that last file is done we would effectively block other processes from
1808-
// starting sooner - even though we only need one token for that last file, not N others that were acquired.
1809-
1810-
let mut pendings_is_empty = false;
1811-
1812-
cell_update(&pendings, |mut pendings| {
1813-
// Try waiting on them.
1814-
pendings.retain_mut(|(cmd, child, _token)| {
1815-
match try_wait_on_child(cmd, &mut child.0, &mut stdout, &mut child.1) {
1816-
Ok(Some(())) => {
1817-
// Task done, remove the entry
1818-
has_made_progress.set(true);
1819-
false
1820-
}
1821-
Ok(None) => true, // Task still not finished, keep the entry
1822-
Err(err) => {
1823-
// Task fail, remove the entry.
1824-
// Since we can only return one error, log the error to make
1825-
// sure users always see all the compilation failures.
1826-
has_made_progress.set(true);
1827-
1828-
if self.cargo_output.warnings {
1829-
let _ = writeln!(stdout, "cargo:warning={}", err);
1830-
}
1831-
error = Some(err);
1832-
1833-
false
1834-
}
1835-
}
1836-
});
1837-
pendings_is_empty = pendings.is_empty();
1838-
pendings
1839-
});
1840-
1841-
if pendings_is_empty && is_disconnected.get() {
1842-
break if let Some(err) = error {
1843-
Err(err)
1844-
} else {
1845-
Ok(())
1846-
};
1847-
}
1848-
1849-
YieldOnce::default().await;
1850-
}
1851-
};
1852-
let spawn_future = async {
1853-
for obj in objs {
1854-
let mut cmd = self.create_compile_object_cmd(obj)?;
1855-
let token = tokens.acquire().await?;
1856-
let mut child = spawn(&mut cmd, &self.cargo_output)?;
1857-
let mut stderr_forwarder = StderrForwarder::new(&mut child);
1858-
stderr_forwarder.set_non_blocking()?;
1859-
1860-
cell_update(&pendings, |mut pendings| {
1861-
pendings.push((cmd, KillOnDrop(child, stderr_forwarder), token));
1862-
pendings
1863-
});
1864-
1865-
has_made_progress.set(true);
1866-
}
1867-
is_disconnected.set(true);
1868-
1869-
Ok::<_, Error>(())
1870-
};
1871-
1872-
return block_on(wait_future, spawn_future, &has_made_progress);
1873-
1874-
struct KillOnDrop(Child, StderrForwarder);
1875-
1876-
impl Drop for KillOnDrop {
1877-
fn drop(&mut self) {
1878-
let child = &mut self.0;
1879-
1880-
child.kill().ok();
1881-
}
1882-
}
1883-
1884-
fn cell_update<T, F>(cell: &Cell<T>, f: F)
1885-
where
1886-
T: Default,
1887-
F: FnOnce(T) -> T,
1888-
{
1889-
let old = cell.take();
1890-
let new = f(old);
1891-
cell.set(new);
1761+
#[cfg(feature = "parallel")]
1762+
if objs.len() > 1 {
1763+
return parallel::run_commands_in_parallel(
1764+
&self.cargo_output,
1765+
&mut objs.iter().map(|obj| self.create_compile_object_cmd(obj)),
1766+
);
18921767
}
1893-
}
18941768

1895-
fn compile_objects_sequential(&self, objs: &[Object]) -> Result<(), Error> {
18961769
for obj in objs {
18971770
let mut cmd = self.create_compile_object_cmd(obj)?;
18981771
run(&mut cmd, &self.cargo_output)?;
@@ -1901,13 +1774,6 @@ impl Build {
19011774
Ok(())
19021775
}
19031776

1904-
#[cfg(not(feature = "parallel"))]
1905-
fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
1906-
check_disabled()?;
1907-
1908-
self.compile_objects_sequential(objs)
1909-
}
1910-
19111777
fn create_compile_object_cmd(&self, obj: &Object) -> Result<Command, Error> {
19121778
let asm_ext = AsmFileExt::from_path(&obj.src);
19131779
let is_asm = asm_ext.is_some();

0 commit comments

Comments
 (0)