From 07f2af84a785230522a548a78913a3f65925bb1b Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 6 Mar 2025 11:01:22 +0100 Subject: [PATCH 1/2] Pluggable subgraph builders Demo to show that we could plug the subgraph builder to allow custom wrappers. An example shows how it could be used to compare wall-clock time to scheduling time. Signed-off-by: Moritz Hoffmann --- timely/examples/event_driven_thread_stat.rs | 333 ++++++++++++++++++ .../src/dataflow/operators/core/enterleave.rs | 29 +- .../src/dataflow/operators/core/feedback.rs | 22 +- timely/src/dataflow/scopes/child.rs | 52 +-- timely/src/dataflow/scopes/mod.rs | 33 +- timely/src/execute.rs | 3 +- timely/src/progress/subgraph.rs | 134 ++++--- timely/src/worker.rs | 46 ++- 8 files changed, 556 insertions(+), 96 deletions(-) create mode 100644 timely/examples/event_driven_thread_stat.rs diff --git a/timely/examples/event_driven_thread_stat.rs b/timely/examples/event_driven_thread_stat.rs new file mode 100644 index 000000000..46a799a22 --- /dev/null +++ b/timely/examples/event_driven_thread_stat.rs @@ -0,0 +1,333 @@ +use std::cell::RefCell; +use std::rc::Rc; +use std::time::Duration; +use timely::dataflow::operators::{Input, Map, Probe}; +use timely::logging::{TimelyLogger, TimelySummaryLogger}; +use timely::progress::{Antichain, ChangeBatch, Operate, Source, SubgraphBuilder, Target, Timestamp}; +use timely::progress::operate::SharedProgress; +use timely::progress::subgraph::SubgraphBuilderT; +use timely::progress::timestamp::Refines; +use timely::scheduling::Schedule; +use timely::worker::AsWorker; + +struct ThreadStatSubgraphBuilder { + inner: SG, +} + +impl Schedule for ThreadStatSubgraphBuilder { + fn name(&self) -> &str { + self.inner.name() + } + + fn path(&self) -> &[usize] { + self.inner.path() + } + + fn schedule(&mut self) -> bool { + let start = std::time::Instant::now(); + let stats = stats::Stats::from_self(); + let done = self.inner.schedule(); + let elapsed = start.elapsed(); + if elapsed 
>= Duration::from_millis(10) { + let stats_after = stats::Stats::from_self(); + if let (Ok(stats), Ok(stats_after)) = (stats, stats_after) { + println!("schedule delta utime {}\tdelta stime {}\telapsed {elapsed:?}", + stats_after.utime - stats.utime, + stats_after.stime - stats.stime); + } + } + done + } +} + +impl> Operate for ThreadStatSubgraphBuilder { + fn local(&self) -> bool { + self.inner.local() + } + + fn inputs(&self) -> usize { + self.inner.inputs() + } + + fn outputs(&self) -> usize { + self.inner.outputs() + } + + fn get_internal_summary(&mut self) -> (Vec>>, Rc>>) { + self.inner.get_internal_summary() + } + + fn set_external_summary(&mut self) { + self.inner.set_external_summary(); + } + + fn notify_me(&self) -> bool { + self.inner.notify_me() + } +} + +impl SubgraphBuilderT for ThreadStatSubgraphBuilder +where + TOuter: Timestamp, + TInner: Timestamp, + SG: SubgraphBuilderT, +{ + type Subgraph = ThreadStatSubgraphBuilder; + + fn new_from(path: Rc<[usize]>, identifier: usize, logging: Option, summary_logging: Option>, name: &str) -> Self { + Self { inner: SG::new_from(path, identifier, logging, summary_logging, name)} + } + + fn build(self, worker: &mut A) -> Self::Subgraph { + ThreadStatSubgraphBuilder{ inner: self.inner.build(worker) } + } + + fn name(&self) -> &str { + self.inner.name() + } + + fn path(&self) -> Rc<[usize]> { + self.inner.path() + } + + fn connect(&mut self, source: Source, target: Target) { + self.inner.connect(source, target) + } + + fn add_child(&mut self, child: Box>, index: usize, identifier: usize) { + self.inner.add_child(child, index, identifier) + } + + fn allocate_child_id(&mut self) -> usize { + self.inner.allocate_child_id() + } + + fn new_input(&mut self, shared_counts: Rc>>) -> Target + where + TInner: Refines + { + self.inner.new_input(shared_counts) + } + + fn new_output(&mut self) -> Source + where + TInner: Refines + { + self.inner.new_output() + } +} + +pub mod stats { + use std::str::FromStr; + + /// based on 
https://elixir.bootlin.com/linux/v5.19.17/source/fs/proc/array.c#L567 + #[derive(Debug)] + pub struct Stats { + pub pid: usize, + pub name: String, + pub state: char, + pub ppid: isize, + pub pgid: isize, + pub psid: isize, + pub tty_nr: isize, + pub tty_grp: isize, + pub flags: usize, + pub min_flt: usize, + pub cmin_flt: usize, + pub maj_flt: usize, + pub cmaj_flt: usize, + pub utime: usize, + pub stime: usize, + pub cutime: isize, + pub cstime: isize, + pub priority: isize, + pub nice: isize, + pub num_threads: isize, + pub _zero0: usize, + pub start_time: usize, + pub vsize: usize, + pub rss: usize, + pub rsslim: usize, + pub start_code: usize, + pub end_code: usize, + pub start_stack: usize, + pub esp: usize, + pub eip: usize, + pub pending: usize, + pub blocked: usize, + pub sigign: usize, + pub sigcatch: usize, + pub wchan: usize, + pub _zero1: usize, + pub _zero2: usize, + pub exit_signal: isize, + pub task_cpu: isize, + pub rt_priority: isize, + pub policy: isize, + pub blkio_ticks: usize, + pub gtime: usize, + pub cgtime: isize, + pub start_data: usize, + pub end_data: usize, + pub start_brk: usize, + pub arg_start: usize, + pub arg_end: usize, + pub env_start: usize, + pub env_end: usize, + pub exit_code: isize, + } + + pub enum Error { + Underflow, + ParseIntError(std::num::ParseIntError), + IOError(std::io::Error), + } + + impl From> for Error { + fn from(_: Option<&str>) -> Self { + Error::Underflow + } + } + + impl From for Error { + fn from(e: std::num::ParseIntError) -> Self { + Error::ParseIntError(e) + } + } + + impl From for Error { + fn from(value: std::io::Error) -> Self { + Error::IOError(value) + } + } + + impl FromStr for Stats { + type Err = Error; + + fn from_str(s: &str) -> Result { + let mut split = s.split_whitespace(); + Ok(Self { + pid: split.next().ok_or(Error::Underflow)?.parse()?, + name: split.next().ok_or(Error::Underflow)?.to_string(), + state: split.next().ok_or(Error::Underflow)?.chars().next().ok_or(Error::Underflow)?, + 
ppid: split.next().ok_or(Error::Underflow)?.parse()?, + pgid: split.next().ok_or(Error::Underflow)?.parse()?, + psid: split.next().ok_or(Error::Underflow)?.parse()?, + tty_nr: split.next().ok_or(Error::Underflow)?.parse()?, + tty_grp: split.next().ok_or(Error::Underflow)?.parse()?, + flags: split.next().ok_or(Error::Underflow)?.parse()?, + min_flt: split.next().ok_or(Error::Underflow)?.parse()?, + cmin_flt: split.next().ok_or(Error::Underflow)?.parse()?, + maj_flt: split.next().ok_or(Error::Underflow)?.parse()?, + cmaj_flt: split.next().ok_or(Error::Underflow)?.parse()?, + utime: split.next().ok_or(Error::Underflow)?.parse()?, + stime: split.next().ok_or(Error::Underflow)?.parse()?, + cutime: split.next().ok_or(Error::Underflow)?.parse()?, + cstime: split.next().ok_or(Error::Underflow)?.parse()?, + priority: split.next().ok_or(Error::Underflow)?.parse()?, + nice: split.next().ok_or(Error::Underflow)?.parse()?, + num_threads: split.next().ok_or(Error::Underflow)?.parse()?, + _zero0: split.next().ok_or(Error::Underflow)?.parse()?, + // constant 0, + start_time: split.next().ok_or(Error::Underflow)?.parse()?, + vsize: split.next().ok_or(Error::Underflow)?.parse()?, + rss: split.next().ok_or(Error::Underflow)?.parse()?, + rsslim: split.next().ok_or(Error::Underflow)?.parse()?, + start_code: split.next().ok_or(Error::Underflow)?.parse()?, + end_code: split.next().ok_or(Error::Underflow)?.parse()?, + start_stack: split.next().ok_or(Error::Underflow)?.parse()?, + esp: split.next().ok_or(Error::Underflow)?.parse()?, + eip: split.next().ok_or(Error::Underflow)?.parse()?, + pending: split.next().ok_or(Error::Underflow)?.parse()?, + blocked: split.next().ok_or(Error::Underflow)?.parse()?, + sigign: split.next().ok_or(Error::Underflow)?.parse()?, + sigcatch: split.next().ok_or(Error::Underflow)?.parse()?, + wchan: split.next().ok_or(Error::Underflow)?.parse()?, + _zero1: split.next().ok_or(Error::Underflow)?.parse()?, + // constant 0, + _zero2: 
split.next().ok_or(Error::Underflow)?.parse()?, + // constant 0, + exit_signal: split.next().ok_or(Error::Underflow)?.parse()?, + task_cpu: split.next().ok_or(Error::Underflow)?.parse()?, + rt_priority: split.next().ok_or(Error::Underflow)?.parse()?, + policy: split.next().ok_or(Error::Underflow)?.parse()?, + blkio_ticks: split.next().ok_or(Error::Underflow)?.parse()?, + gtime: split.next().ok_or(Error::Underflow)?.parse()?, + cgtime: split.next().ok_or(Error::Underflow)?.parse()?, + start_data: split.next().ok_or(Error::Underflow)?.parse()?, + end_data: split.next().ok_or(Error::Underflow)?.parse()?, + start_brk: split.next().ok_or(Error::Underflow)?.parse()?, + arg_start: split.next().ok_or(Error::Underflow)?.parse()?, + arg_end: split.next().ok_or(Error::Underflow)?.parse()?, + env_start: split.next().ok_or(Error::Underflow)?.parse()?, + env_end: split.next().ok_or(Error::Underflow)?.parse()?, + exit_code: split.next().ok_or(Error::Underflow)?.parse()?, + }) + } + } + + impl Stats { + pub fn from_self() -> Result { + let mut buffer = String::new(); + use std::io::Read; + std::fs::File::open("/proc/thread-self/stat")?.read_to_string(&mut buffer)?; + buffer.parse() + } + } +} + +fn main() { + // initializes and runs a timely dataflow. + timely::execute_from_args(std::env::args(), |worker| { + + let timer = std::time::Instant::now(); + + let mut args = std::env::args(); + args.next(); + + let dataflows = args.next().unwrap().parse::().unwrap(); + let length = args.next().unwrap().parse::().unwrap(); + let record = args.next() == Some("record".to_string()); + + let mut inputs = Vec::new(); + let mut probes = Vec::new(); + + // create a new input, exchange data, and inspect its output + for _dataflow in 0 .. dataflows { + let logging = worker.log_register().get("timely").map(Into::into); + worker.dataflow_subgraph::<_, _, _, _, ThreadStatSubgraphBuilder>>("Dataflow", logging, (), |(), scope| { + let (input, mut stream) = scope.new_input(); + for _step in 0 .. 
length { + stream = stream.map(|x: ()| { + // Simluate CPU intensive task + for i in 0..1_000_000 { + std::hint::black_box(i); + } + // If we were to sleep here, `utime` would not increase. + x + }); + } + let probe = stream.probe(); + inputs.push(input); + probes.push(probe); + }); + } + + println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length); + + for round in 0 .. 10_000 { + let dataflow = round % dataflows; + if record { + inputs[dataflow].send(()); + } + inputs[dataflow].advance_to(round); + let mut steps = 0; + while probes[dataflow].less_than(&round) { + worker.step(); + steps += 1; + } + println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); + } + + }).unwrap(); +} diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index bc221c4a6..5c8bcced5 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -33,9 +33,16 @@ use crate::dataflow::channels::Message; use crate::worker::AsWorker; use crate::dataflow::{StreamCore, Scope}; use crate::dataflow::scopes::Child; +use crate::progress::subgraph::SubgraphBuilderT; /// Extension trait to move a `Stream` into a child of its current `Scope`. -pub trait Enter, C: Container> { +pub trait Enter +where + G: Scope, + T: Timestamp + Refines, + C: Container, + SG: SubgraphBuilderT, +{ /// Moves the `Stream` argument into a child of its current `Scope`. 
/// /// # Examples @@ -50,11 +57,17 @@ pub trait Enter, C: Container> { /// }); /// }); /// ``` - fn enter<'a>(&self, _: &Child<'a, G, T>) -> StreamCore, C>; + fn enter<'a>(&self, _: &Child<'a, G, T, SG>) -> StreamCore, C>; } -impl, C: Data+Container> Enter for StreamCore { - fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore, C> { +impl Enter for StreamCore +where + G: Scope, + T: Timestamp + Refines, + C: Data + Container, + SG: SubgraphBuilderT, +{ + fn enter<'a>(&self, scope: &Child<'a, G, T, SG>) -> StreamCore, C> { use crate::scheduling::Scheduler; @@ -103,7 +116,13 @@ pub trait Leave { fn leave(&self) -> StreamCore; } -impl> Leave for StreamCore, C> { +impl Leave for StreamCore, C> +where + G: Scope, + C: Container + Data, + T: Timestamp + Refines, + SG: SubgraphBuilderT, +{ fn leave(&self) -> StreamCore { let scope = self.scope(); diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index 8fdd4f708..f4d4c36aa 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -11,6 +11,7 @@ use crate::dataflow::{StreamCore, Scope}; use crate::order::Product; use crate::progress::frontier::Antichain; use crate::progress::{Timestamp, PathSummary}; +use crate::progress::subgraph::SubgraphBuilderT; /// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`. pub trait Feedback { @@ -40,7 +41,12 @@ pub trait Feedback { } /// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`. -pub trait LoopVariable<'a, G: Scope, T: Timestamp> { +pub trait LoopVariable<'a, G, T, SG> +where + G: Scope, + T: Timestamp, + SG: SubgraphBuilderT>, +{ /// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`. 
/// /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with @@ -51,10 +57,11 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> { /// ``` /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect, BranchWhen}; + /// use timely::progress::SubgraphBuilder; /// /// timely::example(|scope| { /// // circulate 0..10 for 100 iterations. - /// scope.iterative::(|inner| { + /// scope.iterative::>(|inner| { /// let (handle, cycle) = inner.loop_variable(1); /// (0..10).to_stream(inner) /// .concat(&cycle) @@ -64,7 +71,7 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> { /// }); /// }); /// ``` - fn loop_variable(&mut self, summary: T::Summary) -> (Handle, C>, StreamCore, C>); + fn loop_variable(&mut self, summary: T::Summary) -> (Handle, C>, StreamCore, C>); } impl Feedback for G { @@ -78,8 +85,13 @@ impl Feedback for G { } } -impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> { - fn loop_variable(&mut self, summary: T::Summary) -> (Handle, C>, StreamCore, C>) { +impl<'a, G, T, SG> LoopVariable<'a, G, T, SG> for Iterative<'a, G, T, SG> +where + G: Scope, + T: Timestamp, + SG: SubgraphBuilderT>, +{ + fn loop_variable(&mut self, summary: T::Summary) -> (Handle, C>, StreamCore, C>) { self.feedback(Product::new(Default::default(), summary)) } } diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index ee3cbb12d..4fc377c4f 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -7,28 +7,29 @@ use crate::communication::{Exchangeable, Push, Pull}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::scheduling::Scheduler; use crate::scheduling::activate::Activations; -use crate::progress::{Timestamp, Operate, SubgraphBuilder}; +use crate::progress::{Timestamp, Operate}; use crate::progress::{Source, Target}; use 
crate::progress::timestamp::Refines; use crate::order::Product; use crate::logging::TimelyLogger as Logger; use crate::logging::TimelyProgressLogger as ProgressLogger; +use crate::progress::subgraph::SubgraphBuilderT; use crate::worker::{AsWorker, Config}; use super::{ScopeParent, Scope}; /// Type alias for iterative child scope. -pub type Iterative<'a, G, T> = Child<'a, G, Product<::Timestamp, T>>; +pub type Iterative<'a, G, T, SG> = Child<'a, G, Product<::Timestamp, T>, SG>; /// A `Child` wraps a `Subgraph` and a parent `G: Scope`. It manages the addition /// of `Operate`s to a subgraph, and the connection of edges between them. -pub struct Child<'a, G, T> +pub struct Child<'a, G, T, SG> where G: ScopeParent, - T: Timestamp+Refines + T: Timestamp+Refines, { /// The subgraph under assembly. - pub subgraph: &'a RefCell>, + pub subgraph: &'a RefCell, /// A copy of the child's parent scope. pub parent: G, /// The log writer for this scope. @@ -37,10 +38,11 @@ where pub progress_logging: Option>, } -impl Child<'_, G, T> +impl Child<'_, G, T, SG> where G: ScopeParent, - T: Timestamp+Refines + T: Timestamp+Refines, + SG: SubgraphBuilderT, { /// This worker's unique identifier. 
/// @@ -50,10 +52,11 @@ where pub fn peers(&self) -> usize { self.parent.peers() } } -impl AsWorker for Child<'_, G, T> +impl AsWorker for Child<'_, G, T, SG> where G: ScopeParent, - T: Timestamp+Refines + T: Timestamp+Refines, + SG: SubgraphBuilderT, { fn config(&self) -> &Config { self.parent.config() } fn index(&self) -> usize { self.parent.index() } @@ -78,34 +81,37 @@ where } } -impl Scheduler for Child<'_, G, T> +impl Scheduler for Child<'_, G, T, SG> where G: ScopeParent, - T: Timestamp+Refines + T: Timestamp+Refines, + SG: SubgraphBuilderT, { fn activations(&self) -> Rc> { self.parent.activations() } } -impl ScopeParent for Child<'_, G, T> +impl ScopeParent for Child<'_, G, T, SG> where G: ScopeParent, - T: Timestamp+Refines + T: Timestamp+Refines, + SG: SubgraphBuilderT, { type Timestamp = T; } -impl Scope for Child<'_, G, T> +impl Scope for Child<'_, G, T, SG> where G: ScopeParent, T: Timestamp+Refines, + SG: SubgraphBuilderT, { - fn name(&self) -> String { self.subgraph.borrow().name.clone() } - fn addr(&self) -> Rc<[usize]> { Rc::clone(&self.subgraph.borrow().path) } + fn name(&self) -> String { self.subgraph.borrow().name().to_owned() } + fn addr(&self) -> Rc<[usize]> { Rc::clone(&self.subgraph.borrow().path()) } fn addr_for_child(&self, index: usize) -> Rc<[usize]> { - let path = &self.subgraph.borrow().path[..]; + let path = &self.subgraph.borrow().path()[..]; let mut addr = Vec::with_capacity(path.len() + 1); addr.extend_from_slice(path); addr.push(index); @@ -125,10 +131,11 @@ where } #[inline] - fn scoped(&mut self, name: &str, func: F) -> R + fn scoped(&mut self, name: &str, func: F) -> R where T2: Timestamp+Refines, - F: FnOnce(&mut Child) -> R, + F: FnOnce(&mut Child) -> R, + SSG: SubgraphBuilderT, { let index = self.subgraph.borrow_mut().allocate_child_id(); let identifier = self.new_identifier(); @@ -138,7 +145,7 @@ where let progress_logging = self.logger_for(&format!("timely/progress/{type_name}")); let summary_logging = 
self.logger_for(&format!("timely/summary/{type_name}")); - let subscope = RefCell::new(SubgraphBuilder::new_from(path, identifier, self.logging(), summary_logging, name)); + let subscope = RefCell::new(SSG::new_from(path, identifier, self.logging(), summary_logging, name)); let result = { let mut builder = Child { subgraph: &subscope, @@ -156,10 +163,11 @@ where } } -impl Clone for Child<'_, G, T> +impl Clone for Child<'_, G, T, SG> where G: ScopeParent, - T: Timestamp+Refines + T: Timestamp+Refines, + SG: SubgraphBuilderT, { fn clone(&self) -> Self { Child { diff --git a/timely/src/dataflow/scopes/mod.rs b/timely/src/dataflow/scopes/mod.rs index 511bfe492..2de8b001d 100644 --- a/timely/src/dataflow/scopes/mod.rs +++ b/timely/src/dataflow/scopes/mod.rs @@ -1,10 +1,11 @@ //! Hierarchical organization of timely dataflow graphs. use std::rc::Rc; -use crate::progress::{Timestamp, Operate, Source, Target}; +use crate::progress::{Timestamp, Operate, Source, Target, SubgraphBuilder}; use crate::order::Product; use crate::progress::timestamp::Refines; use crate::communication::Allocate; +use crate::progress::subgraph::SubgraphBuilderT; use crate::worker::AsWorker; pub mod child; @@ -84,22 +85,24 @@ pub trait Scope: ScopeParent { /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Input, Enter, Leave}; /// use timely::order::Product; + /// use timely::progress::SubgraphBuilder; /// /// timely::execute_from_args(std::env::args(), |worker| { /// // must specify types as nothing else drives inference. 
/// let input = worker.dataflow::(|child1| { /// let (input, stream) = child1.new_input::(); - /// let output = child1.scoped::,_,_>("ScopeName", |child2| { + /// let output = child1.scoped::,_,_,SubgraphBuilder<_,_>>("ScopeName", |child2| { /// stream.enter(child2).leave() /// }); /// input /// }); /// }); /// ``` - fn scoped(&mut self, name: &str, func: F) -> R + fn scoped(&mut self, name: &str, func: F) -> R where T: Timestamp+Refines<::Timestamp>, - F: FnOnce(&mut Child) -> R; + F: FnOnce(&mut Child) -> R, + SG: SubgraphBuilderT; /// Creates a iterative dataflow subgraph. /// @@ -111,24 +114,26 @@ pub trait Scope: ScopeParent { /// ``` /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Input, Enter, Leave}; + /// use timely::progress::SubgraphBuilder; /// /// timely::execute_from_args(std::env::args(), |worker| { /// // must specify types as nothing else drives inference. /// let input = worker.dataflow::(|child1| { /// let (input, stream) = child1.new_input::(); - /// let output = child1.iterative::(|child2| { + /// let output = child1.iterative::>(|child2| { /// stream.enter(child2).leave() /// }); /// input /// }); /// }); /// ``` - fn iterative(&mut self, func: F) -> R + fn iterative(&mut self, func: F) -> R where T: Timestamp, - F: FnOnce(&mut Child::Timestamp, T>>) -> R, + F: FnOnce(&mut Child::Timestamp, T>, SG>) -> R, + SG: SubgraphBuilderT>, { - self.scoped::::Timestamp, T>,R,F>("Iterative", func) + self.scoped::::Timestamp, T>,R,F,_>("Iterative", func) } /// Creates a dataflow region with the same timestamp. 
@@ -155,7 +160,7 @@ pub trait Scope: ScopeParent { /// ``` fn region(&mut self, func: F) -> R where - F: FnOnce(&mut Child::Timestamp>) -> R, + F: FnOnce(&mut Child::Timestamp, SubgraphBuilder>) -> R, { self.region_named("Region", func) } @@ -173,23 +178,25 @@ pub trait Scope: ScopeParent { /// ``` /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Input, Enter, Leave}; + /// use timely::progress::SubgraphBuilder; /// /// timely::execute_from_args(std::env::args(), |worker| { /// // must specify types as nothing else drives inference. /// let input = worker.dataflow::(|child1| { /// let (input, stream) = child1.new_input::(); - /// let output = child1.region_named("region", |child2| { + /// let output = child1.region_named::<_,_,SubgraphBuilder<_,_>>("region", |child2| { /// stream.enter(child2).leave() /// }); /// input /// }); /// }); /// ``` - fn region_named(&mut self, name: &str, func: F) -> R + fn region_named(&mut self, name: &str, func: F) -> R where - F: FnOnce(&mut Child::Timestamp>) -> R, + F: FnOnce(&mut Child::Timestamp, SG>) -> R, + SG: SubgraphBuilderT, { - self.scoped::<::Timestamp,R,F>(name, func) + self.scoped::<::Timestamp,R,F,_>(name, func) } } diff --git a/timely/src/execute.rs b/timely/src/execute.rs index b9673a98e..95f57a182 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -4,6 +4,7 @@ use crate::communication::{initialize_from, Allocator, allocator::AllocateBuilde use crate::dataflow::scopes::Child; use crate::worker::Worker; use crate::{CommunicationConfig, WorkerConfig}; +use crate::progress::SubgraphBuilder; /// Configures the execution of a timely dataflow computation. 
#[derive(Clone, Debug)] @@ -122,7 +123,7 @@ impl Config { pub fn example(func: F) -> T where T: Send+'static, - F: FnOnce(&mut Child,u64>)->T+Send+Sync+'static + F: FnOnce(&mut Child,u64,SubgraphBuilder<(),u64>>)->T+Send+Sync+'static { crate::execute::execute_directly(|worker| worker.dataflow(|scope| func(scope))) } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 868fd10d9..5e9d7d1ac 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -31,6 +31,53 @@ use crate::worker::ProgressMode; // the Subgraph itself. An identifier greater than zero corresponds to an actual child, which can // be found at position (id - 1) in the `children` field of the Subgraph. +/// Behavior of subgraph builders +pub trait SubgraphBuilderT +where + TOuter: Timestamp, + TInner: Timestamp, +{ + /// The subgraph type produced by this builder. + type Subgraph: Operate + 'static; + + /// Creates a `SubgraphBuilder` from a path of indexes from the dataflow root to the subgraph, + /// terminating with the local index of the new subgraph itself. + fn new_from( + path: Rc<[usize]>, + identifier: usize, + logging: Option, + summary_logging: Option>, + name: &str, + ) -> Self; + + /// Now that initialization is complete, actually build a subgraph. + fn build(self, worker: &mut A) -> Self::Subgraph; + + /// The name of this subgraph. + fn name(&self) -> &str; + + /// A sequence of integers uniquely identifying the subgraph. + fn path(&self) -> Rc<[usize]>; + + /// Introduces a dependence from the source to the target. + /// + /// This method does not effect data movement, but rather reveals to the progress tracking infrastructure + /// that messages produced by `source` should be expected to be consumed at `target`. + fn connect(&mut self, source: Source, target: Target); + + /// Adds a new child to the subgraph. 
+ fn add_child(&mut self, child: Box>, index: usize, identifier: usize); + + /// Allocates a new child identifier, for later use. + fn allocate_child_id(&mut self) -> usize; + + /// Allocates a new input to the subgraph and returns the target to that input in the outer graph. + fn new_input(&mut self, shared_counts: Rc>>) -> Target where TInner: Refines; + + /// Allocates a new output from the subgraph and returns the source of that output in the outer graph. + fn new_output(&mut self) -> Source where TInner: Refines; +} + /// A builder for interactively initializing a `Subgraph`. /// /// This collects all the information necessary to get a `Subgraph` up and @@ -71,47 +118,58 @@ where summary_logging: Option>, } -impl SubgraphBuilder +impl SubgraphBuilderT for SubgraphBuilder where TOuter: Timestamp, TInner: Timestamp+Refines, { - /// Allocates a new input to the subgraph and returns the target to that input in the outer graph. - pub fn new_input(&mut self, shared_counts: Rc>>) -> Target { - self.input_messages.push(shared_counts); - Target::new(self.index, self.input_messages.len() - 1) + type Subgraph = Subgraph; + + fn name(&self) -> &str { + &self.name } - /// Allocates a new output from the subgraph and returns the source of that output in the outer graph. - pub fn new_output(&mut self) -> Source { - self.output_capabilities.push(MutableAntichain::new()); - Source::new(self.index, self.output_capabilities.len() - 1) + fn path(&self) -> Rc<[usize]> { + Rc::clone(&self.path) } - /// Introduces a dependence from the source to the target. - /// - /// This method does not effect data movement, but rather reveals to the progress tracking infrastructure - /// that messages produced by `source` should be expected to be consumed at `target`. 
- pub fn connect(&mut self, source: Source, target: Target) { + fn connect(&mut self, source: Source, target: Target) { self.edge_stash.push((source, target)); } - /// Creates a `SubgraphBuilder` from a path of indexes from the dataflow root to the subgraph, - /// terminating with the local index of the new subgraph itself. - pub fn new_from( + fn add_child(&mut self, child: Box>, index: usize, identifier: usize) { + if let Some(l) = &mut self.logging { + let mut child_path = Vec::with_capacity(self.path.len() + 1); + child_path.extend_from_slice(&self.path[..]); + child_path.push(index); + + l.log(crate::logging::OperatesEvent { + id: identifier, + addr: child_path, + name: child.name().to_owned(), + }); + } + self.children.push(PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging)); + } + + fn allocate_child_id(&mut self) -> usize { + self.child_count += 1; + self.child_count - 1 + } + + fn new_from( path: Rc<[usize]>, identifier: usize, logging: Option, summary_logging: Option>, name: &str, - ) - -> SubgraphBuilder + ) -> Self { // Put an empty placeholder for "outer scope" representative. let children = vec![PerOperatorState::empty(0, 0)]; let index = path[path.len() - 1]; - SubgraphBuilder { + Self { name: name.to_owned(), path, index, @@ -126,30 +184,7 @@ where } } - /// Allocates a new child identifier, for later use. - pub fn allocate_child_id(&mut self) -> usize { - self.child_count += 1; - self.child_count - 1 - } - - /// Adds a new child to the subgraph. 
- pub fn add_child(&mut self, child: Box>, index: usize, identifier: usize) { - if let Some(l) = &mut self.logging { - let mut child_path = Vec::with_capacity(self.path.len() + 1); - child_path.extend_from_slice(&self.path[..]); - child_path.push(index); - - l.log(crate::logging::OperatesEvent { - id: identifier, - addr: child_path, - name: child.name().to_owned(), - }); - } - self.children.push(PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging)); - } - - /// Now that initialization is complete, actually build a subgraph. - pub fn build(mut self, worker: &mut A) -> Subgraph { + fn build(mut self, worker: &mut A) -> Subgraph { // at this point, the subgraph is frozen. we should initialize any internal state which // may have been determined after construction (e.g. the numbers of inputs and outputs). // we also need to determine what to return as a summary and initial capabilities, which @@ -222,8 +257,17 @@ where progress_mode: worker.config().progress_mode, } } -} + fn new_input(&mut self, shared_counts: Rc>>) -> Target where TInner: Refines { + self.input_messages.push(shared_counts); + Target::new(self.index, self.input_messages.len() - 1) + } + + fn new_output(&mut self) -> Source where TInner: Refines { + self.output_capabilities.push(MutableAntichain::new()); + Source::new(self.index, self.output_capabilities.len() - 1) + } +} /// A dataflow subgraph. /// diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 999754ecd..f93993674 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -17,6 +17,7 @@ use crate::progress::SubgraphBuilder; use crate::progress::operate::Operate; use crate::dataflow::scopes::Child; use crate::logging::TimelyLogger; +use crate::progress::subgraph::SubgraphBuilderT; /// Different ways in which timely's progress tracking can work. 
/// @@ -583,7 +584,7 @@ impl Worker { pub fn dataflow(&mut self, func: F) -> R where T: Refines<()>, - F: FnOnce(&mut Child)->R, + F: FnOnce(&mut Child>)->R, { self.dataflow_core("Dataflow", self.logging(), Box::new(()), |_, child| func(child)) } @@ -606,7 +607,7 @@ impl Worker { pub fn dataflow_named(&mut self, name: &str, func: F) -> R where T: Refines<()>, - F: FnOnce(&mut Child)->R, + F: FnOnce(&mut Child>)->R, { self.dataflow_core(name, self.logging(), Box::new(()), |_, child| func(child)) } @@ -636,11 +637,46 @@ impl Worker { /// ); /// }); /// ``` - pub fn dataflow_core(&mut self, name: &str, mut logging: Option, mut resources: V, func: F) -> R + pub fn dataflow_core(&mut self, name: &str, logging: Option, resources: V, func: F) -> R where T: Refines<()>, - F: FnOnce(&mut V, &mut Child)->R, + F: FnOnce(&mut V, &mut Child>)->R, V: Any+'static, + { + self.dataflow_subgraph::<_, _, _, _, SubgraphBuilder<_, _>>(name, logging, resources, func) + } + + /// Construct a new dataflow with specific configurations. + /// + /// This method constructs a new dataflow, using a name, logger, and additional + /// resources specified as argument. The name is cosmetic, the logger is used to + /// handle events generated by the dataflow, and the additional resources are kept + /// alive for as long as the dataflow is alive (use case: shared library bindings). + /// + /// # Examples + /// ``` + /// timely::execute_from_args(::std::env::args(), |worker| { + /// + /// // We must supply the timestamp type here, although + /// // it would generally be determined by type inference. 
+ /// worker.dataflow_core::( + /// "dataflow X", // Dataflow name + /// None, // Optional logger + /// 37, // Any resources + /// |resources, scope| { // Closure + /// + /// // uses of `resources`, `scope` to build dataflow + /// + /// } + /// ); + /// }); + /// ``` + pub fn dataflow_subgraph(&mut self, name: &str, mut logging: Option, mut resources: V, func: F) -> R + where + T: Refines<()>, + F: FnOnce(&mut V, &mut Child)->R, + V: Any+'static, + SG: SubgraphBuilderT<(), T>, { let dataflow_index = self.allocate_dataflow_index(); let addr = vec![dataflow_index].into(); @@ -649,7 +685,7 @@ impl Worker { let type_name = std::any::type_name::(); let progress_logging = self.logger_for(&format!("timely/progress/{}", type_name)); let summary_logging = self.logger_for(&format!("timely/summary/{}", type_name)); - let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name); + let subscope = SG::new_from(addr, identifier, logging.clone(), summary_logging, name); let subscope = RefCell::new(subscope); let result = { From 9bb8e92fa775d1ce781b3c6dcea619db227831c0 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 14 Mar 2025 21:09:19 +0100 Subject: [PATCH 2/2] Fix mdbook Signed-off-by: Moritz Hoffmann --- mdbook/src/chapter_4/chapter_4_1.md | 9 ++++++--- mdbook/src/chapter_4/chapter_4_2.md | 3 ++- timely/examples/event_driven_thread_stat.rs | 10 +++++----- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/mdbook/src/chapter_4/chapter_4_1.md b/mdbook/src/chapter_4/chapter_4_1.md index 963e690a2..4dd7d9973 100644 --- a/mdbook/src/chapter_4/chapter_4_1.md +++ b/mdbook/src/chapter_4/chapter_4_1.md @@ -18,12 +18,13 @@ You can create a new scope in any other scope by invoking the `scoped` method: extern crate timely; use timely::dataflow::Scope; +use timely::progress::SubgraphBuilder; fn main() { timely::example(|scope| { // Create a new scope with the same (u64) timestamp. 
- scope.scoped::("SubScope", |subscope| { + scope.scoped::>("SubScope", |subscope| { // probably want something here }) @@ -46,6 +47,7 @@ extern crate timely; use timely::dataflow::Scope; use timely::dataflow::operators::*; +use timely::progress::SubgraphBuilder; fn main() { timely::example(|scope| { @@ -53,7 +55,7 @@ fn main() { let stream = (0 .. 10).to_stream(scope); // Create a new scope with the same (u64) timestamp. - let result = scope.scoped::("SubScope", |subscope| { + let result = scope.scoped::>("SubScope", |subscope| { stream.enter(subscope) .inspect_batch(|t, xs| println!("{:?}, {:?}", t, xs)) .leave() @@ -108,6 +110,7 @@ extern crate timely; use timely::dataflow::Scope; use timely::dataflow::operators::*; +use timely::progress::SubgraphBuilder; fn main() { timely::example(|scope| { @@ -115,7 +118,7 @@ fn main() { let stream = (0 .. 10).to_stream(scope); // Create a new scope with a (u64, u32) timestamp. - let result = scope.iterative::(|subscope| { + let result = scope.iterative::>(|subscope| { stream.enter(subscope) .inspect_batch(|t, xs| println!("{:?}, {:?}", t, xs)) .leave() diff --git a/mdbook/src/chapter_4/chapter_4_2.md b/mdbook/src/chapter_4/chapter_4_2.md index 7c057da38..e44d6b46c 100644 --- a/mdbook/src/chapter_4/chapter_4_2.md +++ b/mdbook/src/chapter_4/chapter_4_2.md @@ -89,6 +89,7 @@ extern crate timely; use timely::dataflow::operators::*; use timely::dataflow::Scope; +use timely::progress::SubgraphBuilder; fn main() { timely::example(|scope| { @@ -97,7 +98,7 @@ fn main() { // Create a nested iterative scope. // Rust needs help understanding the iteration counter type. 
- scope.iterative::(|subscope| { + scope.iterative::>(|subscope| { let (handle, stream) = subscope.loop_variable(1); diff --git a/timely/examples/event_driven_thread_stat.rs b/timely/examples/event_driven_thread_stat.rs index 46a799a22..d5681e8d9 100644 --- a/timely/examples/event_driven_thread_stat.rs +++ b/timely/examples/event_driven_thread_stat.rs @@ -3,8 +3,8 @@ use std::rc::Rc; use std::time::Duration; use timely::dataflow::operators::{Input, Map, Probe}; use timely::logging::{TimelyLogger, TimelySummaryLogger}; -use timely::progress::{Antichain, ChangeBatch, Operate, Source, SubgraphBuilder, Target, Timestamp}; -use timely::progress::operate::SharedProgress; +use timely::progress::{ChangeBatch, Operate, Source, SubgraphBuilder, Target, Timestamp}; +use timely::progress::operate::{Connectivity, SharedProgress}; use timely::progress::subgraph::SubgraphBuilderT; use timely::progress::timestamp::Refines; use timely::scheduling::Schedule; @@ -53,7 +53,7 @@ impl> Operate for ThreadStatSubgr self.inner.outputs() } - fn get_internal_summary(&mut self) -> (Vec>>, Rc>>) { + fn get_internal_summary(&mut self) -> (Connectivity, Rc>>) { self.inner.get_internal_summary() } @@ -294,12 +294,12 @@ fn main() { // create a new input, exchange data, and inspect its output for _dataflow in 0 .. dataflows { - let logging = worker.log_register().get("timely").map(Into::into); + let logging = worker.logging(); worker.dataflow_subgraph::<_, _, _, _, ThreadStatSubgraphBuilder>>("Dataflow", logging, (), |(), scope| { let (input, mut stream) = scope.new_input(); for _step in 0 .. length { stream = stream.map(|x: ()| { - // Simluate CPU intensive task + // Simulate CPU intensive task for i in 0..1_000_000 { std::hint::black_box(i); }