Skip to content
This repository was archived by the owner on Jul 3, 2023. It is now read-only.

Commit 0834e04

Browse files
committed
Rudimentary profiler.
- `Circuit::map_node_recursive` - apply a function to all nodes in a circuit. - `Profiler` - ties together several existing components into a rudimentary profiler: - `TraceMonitor` - to capture circuit topology and dump it in graphviz format. - `map_node_recursive` - to extract node metadata from the circuit - `CPUProfiler` - to measure CPU usage - Modify `DBSPHandle` to create a `Profiler` isntance on startup. `DBSPHandle::enable_cpu_profiler` API to enable CPU profiling in all worker threads. `DBSPHandler::dump_profile` API to extract profiles from all worker threads and store them in a directory. - Add `--profile-path` switch to the Nexmark benchmark to enable profiling and dump profile after running each query.
1 parent ac61285 commit 0834e04

File tree

8 files changed

+273
-30
lines changed

8 files changed

+273
-30
lines changed

benches/nexmark/main.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use mimalloc::{AllocStats, MiMalloc};
2828
use num_format::{Locale, ToFormattedString};
2929
use size_of::HumanBytes;
3030
use std::{
31+
path::Path,
3132
sync::mpsc,
3233
thread::{self, JoinHandle},
3334
time::{Duration, Instant},
@@ -56,17 +57,29 @@ enum StepCompleted {
5657
}
5758

5859
fn spawn_dbsp_consumer(
60+
query: &str,
61+
profile_path: Option<&str>,
5962
mut dbsp: DBSPHandle,
6063
step_do_rx: mpsc::Receiver<()>,
6164
step_done_tx: mpsc::SyncSender<StepCompleted>,
6265
) -> JoinHandle<()> {
66+
let query = query.to_string();
67+
let profile_path = profile_path.map(ToString::to_string);
68+
6369
thread::Builder::new()
6470
.name("benchmark_consumer".into())
6571
.spawn(move || {
72+
if profile_path.is_some() {
73+
dbsp.enable_cpu_profiler().unwrap();
74+
}
6675
while let Ok(()) = step_do_rx.recv() {
6776
dbsp.step().unwrap();
6877
step_done_tx.send(StepCompleted::Dbsp).unwrap();
6978
}
79+
if let Some(profile_path) = profile_path {
80+
dbsp.dump_profile(<String as AsRef<Path>>::as_ref(&profile_path).join(query))
81+
.unwrap();
82+
}
7083
})
7184
.unwrap()
7285
}

benches/nexmark/run_queries.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ macro_rules! run_queries {
1515
// so. The DBSP processing happens in its own thread where the resource usage
1616
// calculation can also happen.
1717
let (dbsp_step_tx, dbsp_step_rx) = mpsc::sync_channel(1);
18-
let dbsp_join_handle = spawn_dbsp_consumer(dbsp, dbsp_step_rx, step_done_tx.clone());
18+
let dbsp_join_handle = spawn_dbsp_consumer(stringify!($query), $nexmark_config.profile_path.as_deref(), dbsp, dbsp_step_rx, step_done_tx.clone());
1919

2020
// Start the generator inputting the specified number of batches to the circuit
2121
// whenever it receives a message.

src/circuit/circuit_builder.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,8 @@ pub trait Node {
325325
fn metadata(&self, output: &mut OperatorMeta);
326326

327327
fn fixedpoint(&self, scope: Scope) -> bool;
328+
329+
fn map_nodes_recursive(&self, _f: &mut dyn FnMut(&dyn Node)) {}
328330
}
329331

330332
/// Id of an operator, guaranteed to be unique within a circuit.
@@ -957,6 +959,14 @@ impl<P> Circuit<P> {
957959
.collect()
958960
}
959961

962+
/// Recursively apply `f` to all nodes in `self` and its children.
963+
pub(crate) fn map_nodes_recursive(&self, f: &mut dyn FnMut(&dyn Node)) {
964+
for node in self.inner().nodes.iter() {
965+
f(node.as_ref());
966+
node.map_nodes_recursive(f);
967+
}
968+
}
969+
960970
/// Deliver `clock_start` notification to all nodes in the circuit.
961971
pub(super) fn clock_start(&self, scope: Scope) {
962972
for node in self.inner_mut().nodes.iter_mut() {
@@ -3065,6 +3075,10 @@ where
30653075
fn fixedpoint(&self, scope: Scope) -> bool {
30663076
self.circuit.inner().fixedpoint(scope + 1)
30673077
}
3078+
3079+
fn map_nodes_recursive(&self, f: &mut dyn FnMut(&dyn Node)) {
3080+
self.circuit.map_nodes_recursive(f);
3081+
}
30683082
}
30693083

30703084
/// Top-level circuit with executor.

src/circuit/dbsp_handle.rs

Lines changed: 120 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::{
2-
circuit::runtime::RuntimeHandle, Circuit, Error as DBSPError, Runtime, RuntimeError,
3-
SchedulerError,
2+
circuit::runtime::RuntimeHandle, profile::Profiler, Circuit, Error as DBSPError, Runtime,
3+
RuntimeError, SchedulerError,
44
};
55
use crossbeam::channel::{bounded, Receiver, Sender, TryRecvError};
6-
use std::thread::Result as ThreadResult;
6+
use std::{fs, fs::create_dir_all, path::Path, thread::Result as ThreadResult, time::Instant};
77

88
impl Runtime {
99
/// Instantiate a circuit in a multithreaded runtime.
@@ -55,12 +55,16 @@ impl Runtime {
5555
let status_sender = status_senders.into_iter().nth(worker_index).unwrap();
5656
let command_receiver = command_receivers.into_iter().nth(worker_index).unwrap();
5757

58-
let circuit = match Circuit::build(constructor) {
59-
Ok((circuit, res)) => {
58+
let (circuit, profiler) = match Circuit::build(|circuit| {
59+
let profiler = Profiler::new(circuit);
60+
let res = constructor(circuit);
61+
(res, profiler)
62+
}) {
63+
Ok((circuit, (res, profiler))) => {
6064
if init_sender.send(Ok(res)).is_err() {
6165
return;
6266
}
63-
circuit
67+
(circuit, profiler)
6468
}
6569
Err(e) => {
6670
let _ = init_sender.send(Err(e));
@@ -74,14 +78,29 @@ impl Runtime {
7478
while !Runtime::kill_in_progress() {
7579
// Wait for command.
7680
match command_receiver.try_recv() {
77-
Ok(()) => {
81+
Ok(Command::Step) => {
7882
//moregc = true;
79-
let status = circuit.step();
83+
let status = circuit.step().map(|_| Response::Unit);
8084
// Send response.
8185
if status_sender.send(status).is_err() {
8286
return;
8387
};
8488
}
89+
Ok(Command::EnableProfiler) => {
90+
profiler.enable_cpu_profiler();
91+
// Send response.
92+
if status_sender.send(Ok(Response::Unit)).is_err() {
93+
return;
94+
}
95+
}
96+
Ok(Command::DumpProfile) => {
97+
if status_sender
98+
.send(Ok(Response::Profile(profiler.dump_profile())))
99+
.is_err()
100+
{
101+
return;
102+
}
103+
}
85104
// Nothing to do: do some housekeeping and relinquish the CPU if there's none
86105
// left.
87106
Err(TryRecvError::Empty) => {
@@ -116,9 +135,13 @@ impl Runtime {
116135
}
117136

118137
// On error, kill the runtime.
119-
if let Some(error) = init_status.iter().find_map(|status| status.as_ref().err()) {
138+
if init_status.iter().any(Result::is_err) {
139+
let error = init_status
140+
.into_iter()
141+
.find_map(|status| status.err())
142+
.unwrap();
120143
let _ = runtime.kill();
121-
return Err(error.clone());
144+
return Err(error);
122145
}
123146

124147
let dbsp = DBSPHandle::new(runtime, command_senders, status_receivers);
@@ -129,25 +152,39 @@ impl Runtime {
129152
}
130153
}
131154

155+
#[derive(Clone)]
156+
enum Command {
157+
Step,
158+
EnableProfiler,
159+
DumpProfile,
160+
}
161+
162+
enum Response {
163+
Unit,
164+
Profile(String),
165+
}
166+
132167
/// A handle to control the execution of a circuit in a multithreaded runtime.
133168
#[derive(Debug)]
134169
pub struct DBSPHandle {
170+
// Time when the handle was created.
171+
start_time: Instant,
135172
runtime: Option<RuntimeHandle>,
136-
// Channels used to send commands to workers. Currently the only supported
137-
// command is 'step', so we can use `()` to represent commands.
138-
command_senders: Vec<Sender<()>>,
173+
// Channels used to send commands to workers.
174+
command_senders: Vec<Sender<Command>>,
139175
// Channels used to receive command completion status from
140176
// workers.
141-
status_receivers: Vec<Receiver<Result<(), SchedulerError>>>,
177+
status_receivers: Vec<Receiver<Result<Response, SchedulerError>>>,
142178
}
143179

144180
impl DBSPHandle {
145181
fn new(
146182
runtime: RuntimeHandle,
147-
command_senders: Vec<Sender<()>>,
148-
status_receivers: Vec<Receiver<Result<(), SchedulerError>>>,
183+
command_senders: Vec<Sender<Command>>,
184+
status_receivers: Vec<Receiver<Result<Response, SchedulerError>>>,
149185
) -> Self {
150186
Self {
187+
start_time: Instant::now(),
151188
runtime: Some(runtime),
152189
command_senders,
153190
status_receivers,
@@ -160,15 +197,17 @@ impl DBSPHandle {
160197
self.runtime.take().unwrap().kill()
161198
}
162199

163-
/// Evaluate the circuit for one clock cycle.
164-
pub fn step(&mut self) -> Result<(), DBSPError> {
200+
fn broadcast_command<F>(&mut self, command: Command, mut handler: F) -> Result<(), DBSPError>
201+
where
202+
F: FnMut(Response),
203+
{
165204
if self.runtime.is_none() {
166205
return Err(DBSPError::Runtime(RuntimeError::Killed));
167206
}
168207

169208
// Send command.
170209
for (worker, sender) in self.command_senders.iter().enumerate() {
171-
if matches!(sender.send(()), Err(_)) {
210+
if matches!(sender.send(command.clone()), Err(_)) {
172211
let _ = self.kill_inner();
173212
return Err(DBSPError::Runtime(RuntimeError::WorkerPanic(worker)));
174213
}
@@ -186,8 +225,55 @@ impl DBSPHandle {
186225
let _ = self.kill_inner();
187226
return Err(DBSPError::Scheduler(e));
188227
}
189-
Ok(Ok(_)) => {}
228+
Ok(Ok(resp)) => handler(resp),
229+
}
230+
}
231+
232+
Ok(())
233+
}
234+
235+
pub fn num_workers(&self) -> usize {
236+
self.status_receivers.len()
237+
}
238+
239+
/// Evaluate the circuit for one clock cycle.
240+
pub fn step(&mut self) -> Result<(), DBSPError> {
241+
self.broadcast_command(Command::Step, |_| {})
242+
}
243+
244+
/// Enable CPU profiler.
245+
///
246+
/// Enable recording of CPU usage info. When CPU profiling is enabled,
247+
/// [`Self::dump_profile`] outputs CPU usage info along with memory
248+
/// usage and other circuit metadata. CPU profiling introduces small
249+
/// runtime overhead.
250+
pub fn enable_cpu_profiler(&mut self) -> Result<(), DBSPError> {
251+
self.broadcast_command(Command::EnableProfiler, |_| {})
252+
}
253+
254+
/// Dump profiling information to the specified directory.
255+
///
256+
/// Creates `dir_path` if it doesn't exist. For each worker thread, creates
257+
/// `dir_path/<timestamp>/<worker>.dot` file containing worker profile in
258+
/// the graphviz format. If CPU profiling was enabled (see
259+
/// [`Self::enable_cpu_profiler`]), the profile will contain both CPU and
260+
/// memory usage information; otherwise only memory usage details are
261+
/// reported.
262+
pub fn dump_profile<P: AsRef<Path>>(&mut self, dir_path: P) -> Result<(), DBSPError> {
263+
let elapsed = self.start_time.elapsed().as_micros();
264+
let mut profiles = Vec::with_capacity(self.num_workers());
265+
266+
let dir_path = dir_path.as_ref().join(elapsed.to_string());
267+
create_dir_all(&dir_path)?;
268+
269+
self.broadcast_command(Command::DumpProfile, |resp| {
270+
if let Response::Profile(prof) = resp {
271+
profiles.push(prof);
190272
}
273+
})?;
274+
275+
for (worker, profile) in profiles.into_iter().enumerate() {
276+
fs::write(dir_path.join(format!("{worker}.dot")), profile)?;
191277
}
192278

193279
Ok(())
@@ -242,10 +328,11 @@ mod tests {
242328
circuit.add_source(Generator::new(|| 5usize));
243329
});
244330

245-
assert_eq!(
246-
res.unwrap_err(),
247-
DBSPError::Runtime(RuntimeError::WorkerPanic(0))
248-
);
331+
if let DBSPError::Runtime(err) = res.unwrap_err() {
332+
assert_eq!(err, RuntimeError::WorkerPanic(0));
333+
} else {
334+
panic!();
335+
}
249336
}
250337

251338
// TODO: initialization error in worker thread (the `constructor` closure
@@ -275,10 +362,11 @@ mod tests {
275362
})
276363
.unwrap();
277364

278-
assert_eq!(
279-
handle.step().unwrap_err(),
280-
DBSPError::Runtime(RuntimeError::WorkerPanic(0))
281-
);
365+
if let DBSPError::Runtime(err) = handle.step().unwrap_err() {
366+
assert_eq!(err, RuntimeError::WorkerPanic(0));
367+
} else {
368+
panic!();
369+
}
282370
}
283371

284372
// Kill the runtime.
@@ -298,7 +386,11 @@ mod tests {
298386
})
299387
.unwrap();
300388

389+
handle.enable_cpu_profiler().unwrap();
301390
handle.step().unwrap();
391+
handle
392+
.dump_profile(std::env::temp_dir().join("test_kill"))
393+
.unwrap();
302394
handle.kill().unwrap();
303395
}
304396

src/error.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,34 @@
11
use crate::{RuntimeError, SchedulerError};
2+
use std::io::Error as IOError;
23

3-
#[derive(Clone, Debug, Eq, PartialEq)]
4+
#[derive(Debug)]
45
pub enum Error {
56
Scheduler(SchedulerError),
67
Runtime(RuntimeError),
8+
IO(IOError),
79
Custom(String),
810
}
11+
12+
impl From<IOError> for Error {
13+
fn from(error: IOError) -> Self {
14+
Self::IO(error)
15+
}
16+
}
17+
18+
impl From<SchedulerError> for Error {
19+
fn from(error: SchedulerError) -> Self {
20+
Self::Scheduler(error)
21+
}
22+
}
23+
24+
impl From<RuntimeError> for Error {
25+
fn from(error: RuntimeError) -> Self {
26+
Self::Runtime(error)
27+
}
28+
}
29+
30+
impl From<String> for Error {
31+
fn from(error: String) -> Self {
32+
Self::Custom(error)
33+
}
34+
}

src/nexmark/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ pub struct Config {
9191
#[clap(long, default_value = "1", env = "NEXMARK_PERSON_PROPORTION")]
9292
pub person_proportion: usize,
9393

94+
/// Dump DBSP profiles for all executed queries to the specified directory.
95+
#[clap(long, env = "NEXMARK_PROFILE_PATH")]
96+
pub profile_path: Option<String>,
97+
9498
/// Queries to run, all by default.
9599
#[clap(long, env = "NEXMARK_QUERIES", value_enum)]
96100
pub query: Vec<Query>,
@@ -133,6 +137,7 @@ impl Default for Config {
133137
num_in_flight_auctions: 100,
134138
out_of_order_group_size: 1,
135139
person_proportion: 1,
140+
profile_path: None,
136141
query: Vec::new(),
137142
source_buffer_size: 10_000,
138143
input_batch_size: 40_000,

src/nexmark/queries/q16.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -793,9 +793,11 @@ mod tests {
793793
})
794794
.unwrap();
795795

796+
dbsp.enable_cpu_profiler().unwrap();
796797
for mut vec in input_vecs {
797798
input_handle.append(&mut vec);
798799
dbsp.step().unwrap();
799800
}
801+
dbsp.dump_profile(std::env::temp_dir().join("q16")).unwrap();
800802
}
801803
}

0 commit comments

Comments
 (0)