Skip to content

Commit ed30cac

Browse files
aidanhs authored and luser committed
Compress inputs and outputs over the wire
1 parent 25684b7 commit ed30cac

File tree

6 files changed, with 80 additions and 33 deletions.

src/bin/sccache-dist/build.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ use libmount::Overlay;
1818
use lru_disk_cache::Error as LruError;
1919
use nix;
2020
use sccache::dist::{
21-
BuildResult, CompileCommand, InputsReader, TcCache, Toolchain,
21+
BuildResult, CompileCommand, InputsReader, OutputData, TcCache, Toolchain,
2222
BuilderIncoming,
2323
};
2424
use std::collections::HashMap;
2525
use std::fs;
26-
use std::io::{self, Read};
26+
use std::io;
2727
use std::iter;
2828
use std::path::{self, Path, PathBuf};
2929
use std::process::{Command, Output, Stdio};
@@ -208,8 +208,7 @@ impl OverlayBuilder {
208208
let abspath = join_suffix(&target_dir, cwd.join(&path)); // Resolve in case it's relative since we copy it from the root level
209209
match fs::File::open(abspath) {
210210
Ok(mut file) => {
211-
let mut output = vec![];
212-
file.read_to_end(&mut output).unwrap();
211+
let output = OutputData::from_reader(file);
213212
outputs.push((path, output))
214213
},
215214
Err(e) => {
@@ -508,7 +507,7 @@ impl DockerBuilder {
508507
// TODO: this isn't great, but cp gives it out as a tar
509508
let output = Command::new("docker").args(&["exec", cid, "/busybox", "cat"]).arg(abspath).output().unwrap();
510509
if output.status.success() {
511-
outputs.push((path, output.stdout))
510+
outputs.push((path, OutputData::from_reader(&*output.stdout)))
512511
} else {
513512
debug!("Missing output path {:?}", path)
514513
}

src/compiler/c.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ impl<I: CCompilerImpl> Compilation for CCompilation<I> {
298298
}
299299

300300
#[cfg(feature = "dist-client")]
301-
fn into_dist_inputs_creator(self: Box<Self>, path_transformer: &mut dist::PathTransformer) -> Result<(Box<FnMut(&mut io::Write)>, Box<pkg::CompilerPackager>)> {
301+
fn into_dist_inputs_creator(self: Box<Self>, path_transformer: &mut dist::PathTransformer) -> Result<(Box<FnMut(&mut io::Write) + Send>, Box<pkg::CompilerPackager>)> {
302302
let CCompilation { parsed_args, cwd, preprocessed_input, executable, .. } = *{self};
303303
trace!("Dist inputs: {:?}", parsed_args.input);
304304

src/compiler/compiler.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ fn dist_or_local_compile<T>(dist_client: Arc<dist::Client>,
338338
where T: CommandCreatorSync {
339339
use boxfnonce::BoxFnOnce;
340340
use futures::future;
341+
use std::io;
341342

342343
debug!("[{}]: Attempting distributed compilation", out_pretty);
343344
let compile_out_pretty = out_pretty.clone();
@@ -399,9 +400,12 @@ fn dist_or_local_compile<T>(dist_client: Arc<dist::Client>,
399400
dist::RunJobResult::Complete(jc) => jc,
400401
dist::RunJobResult::JobNotFound => panic!(),
401402
};
402-
info!("fetched {:?}", jc.outputs.iter().map(|&(ref p, ref bs)| (p, bs.len())).collect::<Vec<_>>());
403-
for (path, bytes) in jc.outputs {
404-
File::create(path_transformer.to_local(&path)).unwrap().write_all(&bytes).unwrap();
403+
info!("fetched {:?}", jc.outputs.iter().map(|&(ref p, ref bs)| (p, bs.lens().to_string())).collect::<Vec<_>>());
404+
for (path, output_data) in jc.outputs {
405+
let len = output_data.lens().actual;
406+
let mut file = File::create(path_transformer.to_local(&path)).unwrap();
407+
let count = io::copy(&mut output_data.into_reader(), &mut file).unwrap();
408+
assert!(count == len);
405409
}
406410
jc.output.into()
407411
})
@@ -432,7 +436,7 @@ pub trait Compilation {
432436
// TODO: It's more correct to have a FnBox or Box<FnOnce> here
433437
#[cfg(feature = "dist-client")]
434438
fn into_dist_inputs_creator(self: Box<Self>, _path_transformer: &mut dist::PathTransformer)
435-
-> Result<(Box<FnMut(&mut Write)>, Box<CompilerPackager>)> {
439+
-> Result<(Box<FnMut(&mut Write) + Send>, Box<CompilerPackager>)> {
436440

437441
bail!("distributed compilation not implemented")
438442
}

src/compiler/rust.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1011,7 +1011,7 @@ impl Compilation for RustCompilation {
10111011
}
10121012

10131013
#[cfg(feature = "dist-client")]
1014-
fn into_dist_inputs_creator(self: Box<Self>, path_transformer: &mut dist::PathTransformer) -> Result<(Box<FnMut(&mut io::Write)>, Box<pkg::CompilerPackager>)> {
1014+
fn into_dist_inputs_creator(self: Box<Self>, path_transformer: &mut dist::PathTransformer) -> Result<(Box<FnMut(&mut io::Write) + Send>, Box<pkg::CompilerPackager>)> {
10151015
let RustCompilation { inputs, crate_link_paths, sysroot, crate_types, .. } = *{self};
10161016
trace!("Dist inputs: inputs={:?} crate_link_paths={:?}", inputs, crate_link_paths);
10171017

src/dist/http.rs

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ mod common {
141141
mod server {
142142
use bincode;
143143
use byteorder::{BigEndian, ReadBytesExt};
144+
use flate2::read::ZlibDecoder as ZlibReadDecoder;
144145
use jwt;
145146
use num_cpus;
146147
use rand::{self, RngCore};
@@ -526,7 +527,7 @@ mod server {
526527
trace!("Req {}: run_job({}): {:?}", req_id, job_id, runjob);
527528
let RunJobHttpRequest { command, outputs } = runjob;
528529
let body = bincode_reader.into_inner();
529-
let inputs_rdr = InputsReader(Box::new(body));
530+
let inputs_rdr = InputsReader(Box::new(ZlibReadDecoder::new(body)));
530531
let outputs = outputs.into_iter().collect();
531532

532533
let res: RunJobResult = try_or_500_log!(req_id, handler.handle_run_job(&requester, job_id, command, outputs, inputs_rdr));
@@ -566,7 +567,9 @@ mod client {
566567
use boxfnonce::BoxFnOnce;
567568
use byteorder::{BigEndian, WriteBytesExt};
568569
use config;
569-
use futures::Future;
570+
use flate2::Compression;
571+
use flate2::write::ZlibEncoder as ZlibWriteEncoder;
572+
use futures::{Future, Stream};
570573
use futures_cpupool::CpuPool;
571574
use reqwest;
572575
use std::fs;
@@ -633,29 +636,37 @@ mod client {
633636
if let Some(toolchain_file) = self.tc_cache.get_toolchain(&tc) {
634637
let url = format!("http://{}/api/v1/distserver/submit_toolchain/{}", job_alloc.server_id.addr(), job_alloc.job_id);
635638
let mut req = self.client.post(&url);
636-
req.bearer_auth(job_alloc.auth.clone()).body(toolchain_file);
637639

638-
Box::new(self.pool.spawn_fn(move || bincode_req(&mut req)))
640+
Box::new(self.pool.spawn_fn(move || {
641+
req.bearer_auth(job_alloc.auth.clone()).body(toolchain_file);
642+
bincode_req(&mut req)
643+
}))
639644
} else {
640645
f_err("couldn't find toolchain locally")
641646
}
642647
}
643-
fn do_run_job(&self, job_alloc: JobAlloc, command: CompileCommand, outputs: Vec<String>, mut write_inputs: Box<FnMut(&mut Write)>) -> SFuture<RunJobResult> {
648+
fn do_run_job(&self, job_alloc: JobAlloc, command: CompileCommand, outputs: Vec<String>, mut write_inputs: Box<FnMut(&mut Write) + Send>) -> SFuture<RunJobResult> {
644649
let url = format!("http://{}/api/v1/distserver/run_job/{}", job_alloc.server_id.addr(), job_alloc.job_id);
645-
let bincode = bincode::serialize(&RunJobHttpRequest { command, outputs }).unwrap();
646-
let bincode_length = bincode.len();
647-
let mut inputs = vec![];
648-
write_inputs(&mut inputs);
650+
let mut req = self.client.post(&url);
649651

650-
let mut body = vec![];
651-
body.write_u32::<BigEndian>(bincode_length as u32).unwrap();
652-
body.write(&bincode).unwrap();
653-
body.write(&inputs).unwrap();
652+
Box::new(self.pool.spawn_fn(move || {
653+
let bincode = bincode::serialize(&RunJobHttpRequest { command, outputs }).unwrap();
654+
let bincode_length = bincode.len();
655+
656+
let mut body = vec![];
657+
body.write_u32::<BigEndian>(bincode_length as u32).unwrap();
658+
body.write(&bincode).unwrap();
659+
{
660+
let mut compressor = ZlibWriteEncoder::new(&mut body, Compression::fast());
661+
write_inputs(&mut compressor);
662+
compressor.flush().unwrap();
663+
trace!("Compressed inputs from {} -> {}", compressor.total_in(), compressor.total_out());
664+
compressor.finish().unwrap();
665+
}
654666

655-
let mut req = self.client.post(&url);
656-
req.bearer_auth(job_alloc.auth.clone()).bytes(body);
657-
trace!("Writing {} bytes of input archive", inputs.len());
658-
Box::new(self.pool.spawn_fn(move || bincode_req(&mut req)))
667+
req.bearer_auth(job_alloc.auth.clone()).bytes(body);
668+
bincode_req(&mut req)
669+
}))
659670
}
660671

661672
fn put_toolchain(&self, compiler_path: &Path, weak_key: &str, create: BoxFnOnce<(fs::File,), Result<()>>) -> Result<(Toolchain, Option<String>)> {

src/dist/mod.rs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use boxfnonce::BoxFnOnce;
1616
use compiler;
1717
use std::fmt;
1818
use std::fs;
19-
use std::io::{self, Read, Write};
19+
use std::io::{self, Cursor, Read, Write};
2020
use std::net::SocketAddr;
2121
use std::ffi::OsString;
2222
use std::path::Path;
@@ -271,6 +271,39 @@ impl From<ProcessOutput> for process::Output {
271271
}
272272
}
273273

274+
#[derive(Clone, Serialize, Deserialize)]
275+
#[serde(deny_unknown_fields)]
276+
pub struct OutputData(Vec<u8>, u64);
277+
impl OutputData {
278+
#[cfg(feature = "dist-server")]
279+
pub fn from_reader<R: Read>(r: R) -> Self {
280+
use flate2::Compression;
281+
use flate2::read::ZlibEncoder as ZlibReadEncoder;
282+
let mut compressor = ZlibReadEncoder::new(r, Compression::fast());
283+
let mut res = vec![];
284+
io::copy(&mut compressor, &mut res).unwrap();
285+
OutputData(res, compressor.total_in())
286+
}
287+
pub fn lens(&self) -> OutputDataLens {
288+
OutputDataLens { actual: self.1, compressed: self.0.len() as u64 }
289+
}
290+
#[cfg(feature = "dist-client")]
291+
pub fn into_reader(self) -> impl Read {
292+
use flate2::read::ZlibDecoder as ZlibReadDecoder;
293+
let decompressor = ZlibReadDecoder::new(Cursor::new(self.0));
294+
decompressor
295+
}
296+
}
297+
pub struct OutputDataLens {
298+
pub actual: u64,
299+
pub compressed: u64,
300+
}
301+
impl fmt::Display for OutputDataLens {
302+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
303+
write!(f, "Size: {}->{}", self.actual, self.compressed)
304+
}
305+
}
306+
274307
// TODO: standardise on compressed or not for inputs and toolchain
275308

276309
// TODO: make fields not public
@@ -328,7 +361,7 @@ pub enum RunJobResult {
328361
#[serde(deny_unknown_fields)]
329362
pub struct JobComplete {
330363
pub output: ProcessOutput,
331-
pub outputs: Vec<(String, Vec<u8>)>,
364+
pub outputs: Vec<(String, OutputData)>,
332365
}
333366

334367
// Status
@@ -355,7 +388,7 @@ pub enum SubmitToolchainResult {
355388

356389
pub struct BuildResult {
357390
pub output: ProcessOutput,
358-
pub outputs: Vec<(String, Vec<u8>)>,
391+
pub outputs: Vec<(String, OutputData)>,
359392
}
360393

361394
///////////////////
@@ -431,7 +464,7 @@ pub trait Client {
431464
// To Server
432465
// TODO: ideally Box<FnOnce or FnBox
433466
// BoxFnOnce library doesn't work due to incorrect lifetime inference - https://github.com/rust-lang/rust/issues/28796#issuecomment-410071058
434-
fn do_run_job(&self, job_alloc: JobAlloc, command: CompileCommand, outputs: Vec<String>, write_inputs: Box<FnMut(&mut Write)>) -> SFuture<RunJobResult>;
467+
fn do_run_job(&self, job_alloc: JobAlloc, command: CompileCommand, outputs: Vec<String>, write_inputs: Box<FnMut(&mut Write) + Send>) -> SFuture<RunJobResult>;
435468
fn put_toolchain(&self, compiler_path: &Path, weak_key: &str, create: BoxFnOnce<(fs::File,), Result<()>>) -> Result<(Toolchain, Option<String>)>;
436469
fn may_dist(&self) -> bool;
437470
}
@@ -447,7 +480,7 @@ impl Client for NoopClient {
447480
fn do_submit_toolchain(&self, _job_alloc: JobAlloc, _tc: Toolchain) -> SFuture<SubmitToolchainResult> {
448481
panic!("NoopClient");
449482
}
450-
fn do_run_job(&self, _job_alloc: JobAlloc, _command: CompileCommand, _outputs: Vec<String>, _write_inputs: Box<FnMut(&mut Write)>) -> SFuture<RunJobResult> {
483+
fn do_run_job(&self, _job_alloc: JobAlloc, _command: CompileCommand, _outputs: Vec<String>, _write_inputs: Box<FnMut(&mut Write) + Send>) -> SFuture<RunJobResult> {
451484
panic!("NoopClient");
452485
}
453486

0 commit comments

Comments
 (0)