diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fbf40b2ca9..64a9f1ae2f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,10 +12,10 @@ jobs: os: - ubuntu - macos - - windows + # - windows toolchain: - stable - - 1.78 + - 1.79 name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }} runs-on: ${{ matrix.os }}-latest steps: diff --git a/advent_of_code_2017/src/bin/day_09.rs b/advent_of_code_2017/src/bin/day_09.rs index 7642750b25..058ef7291b 100644 --- a/advent_of_code_2017/src/bin/day_09.rs +++ b/advent_of_code_2017/src/bin/day_09.rs @@ -97,8 +97,7 @@ fn main() { /// Accumulate data in `collection` into all powers-of-two intervals containing them. fn pp_aggregate(collection: Collection, combine: F) -> Collection where - G: Scope, - G::Timestamp: Lattice, + G: Scope, D: Data, F: Fn(D, &D) -> D + 'static, { @@ -132,8 +131,7 @@ fn pp_broadcast( zero: D, combine: F) -> Collection where - G: Scope, - G::Timestamp: Lattice+Ord+::std::fmt::Debug, + G: Scope, D: Data, B: Data+::std::hash::Hash, F: Fn(&B, &D) -> B + 'static, diff --git a/differential-dataflow/examples/bfs.rs b/differential-dataflow/examples/bfs.rs index e023d01bb7..037ace6ac0 100644 --- a/differential-dataflow/examples/bfs.rs +++ b/differential-dataflow/examples/bfs.rs @@ -91,9 +91,10 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: &Collection, roots: &Collection) -> Collection -where G::Timestamp: Lattice+Ord { - +fn bfs(edges: &Collection, roots: &Collection) -> Collection +where + G: Scope, +{ // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 3ab0376662..026c7f2c06 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -139,7 +139,7 @@ mod container { fn default() -> Self { Self::Typed(Default::default()) } } - impl Clone for Column where C::Container: Clone { + impl> Clone for Column { fn clone(&self) -> Self { match self { Column::Typed(t) => Column::Typed(t.clone()), @@ -213,7 +213,7 @@ mod container { } use timely::container::PushInto; - impl PushInto for Column where C::Container: columnar::Push { + impl>> PushInto for Column { #[inline] fn push_into(&mut self, item: T) { use columnar::Push; @@ -286,7 +286,7 @@ mod builder { } use timely::container::PushInto; - impl PushInto for ColumnBuilder where C::Container: columnar::Push { + impl>> PushInto for ColumnBuilder { #[inline] fn push_into(&mut self, item: T) { self.current.push(item); @@ -314,7 +314,7 @@ mod builder { } use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder}; - impl ContainerBuilder for ColumnBuilder where C::Container: Clone { + impl> ContainerBuilder for ColumnBuilder { type Container = Column; #[inline] @@ -342,7 +342,7 @@ mod builder { } } - impl LengthPreservingContainerBuilder for ColumnBuilder where C::Container: Clone { } + impl> LengthPreservingContainerBuilder for ColumnBuilder { } } @@ -396,12 +396,9 @@ pub mod batcher { impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker where - D: Columnar, - for<'b> D::Ref<'b>: Ord + Copy, - T: Columnar, - for<'b> T::Ref<'b>: Ord + Copy, - R: Columnar + Semigroup + for<'b> Semigroup>, - for<'b> R::Ref<'b>: Ord, + D: for<'b> Columnar: Ord>, + T: for<'b> Columnar: Ord>, + R: for<'b> Columnar: Ord> + for<'b> Semigroup>, C2: Container + for<'b> PushInto<&'b (D, T, R)>, { fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) { diff --git a/differential-dataflow/examples/dynamic.rs b/differential-dataflow/examples/dynamic.rs index cfa01461e2..e83de369f4 100644 --- a/differential-dataflow/examples/dynamic.rs +++ b/differential-dataflow/examples/dynamic.rs @@ -91,9 +91,10 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: &Collection, roots: &Collection) -> Collection -where G::Timestamp: Lattice+Ord { - +fn bfs(edges: &Collection, roots: &Collection) -> Collection +where + G: Scope, +{ use timely::order::Product; use iterate::Variable; use differential_dataflow::dynamic::{feedback_summary, pointstamp::PointStamp}; diff --git a/differential-dataflow/examples/graspan.rs b/differential-dataflow/examples/graspan.rs index 3d70096d00..5d1e4f95e3 100644 --- a/differential-dataflow/examples/graspan.rs +++ b/differential-dataflow/examples/graspan.rs @@ -82,14 +82,14 @@ type Arrange = Arranged::Tim /// /// An edge variable provides arranged representations of its contents, even before they are /// completely defined, in support of recursively defined productions. -pub struct EdgeVariable where G::Timestamp : Lattice { +pub struct EdgeVariable> { variable: Variable, current: Collection, forward: Option>, reverse: Option>, } -impl EdgeVariable where G::Timestamp : Lattice { +impl> EdgeVariable { /// Creates a new variable initialized with `source`. pub fn from(source: &Collection, step: ::Summary) -> Self { let variable = Variable::new(&mut source.scope(), step); @@ -152,9 +152,10 @@ impl Query { } /// Creates a dataflow implementing the query, and returns input and trace handles. - pub fn render_in(&self, scope: &mut G) -> IndexMap> - where G::Timestamp: Lattice+::timely::order::TotalOrder { - + pub fn render_in(&self, scope: &mut G) -> IndexMap> + where + G: Scope, + { // Create new input (handle, stream) pairs let mut input_map = IndexMap::new(); for production in self.productions.iter() { diff --git a/differential-dataflow/examples/interpreted.rs b/differential-dataflow/examples/interpreted.rs index 3a04edaeb6..646ea6c2e2 100644 --- a/differential-dataflow/examples/interpreted.rs +++ b/differential-dataflow/examples/interpreted.rs @@ -41,8 +41,10 @@ fn main() { }).unwrap(); } -fn interpret(edges: &Collection, relations: &[(usize, usize)]) -> Collection> -where G::Timestamp: Lattice+Hash+Ord { +fn interpret(edges: &Collection, relations: &[(usize, usize)]) -> Collection> +where + G: Scope, +{ // arrange the edge relation three ways. let as_self = edges.arrange_by_self(); diff --git a/differential-dataflow/examples/itembased_cf.rs b/differential-dataflow/examples/itembased_cf.rs index 814abd4c30..90384fa56b 100644 --- a/differential-dataflow/examples/itembased_cf.rs +++ b/differential-dataflow/examples/itembased_cf.rs @@ -144,7 +144,7 @@ impl CRP { CRP { alpha, discount, weight: 0.0, weights: Vec::new() } } - fn sample(&mut self, rng: &mut R) -> u32 where R: Rng { + fn sample(&mut self, rng: &mut R) -> u32 { let mut u = rng.gen::() * (self.alpha + self.weight); for j in 0 .. self.weights.len() { if u < self.weights[j] - self.discount { @@ -164,7 +164,7 @@ impl CRP { } // Generate synthetic interactions with a skewed distribution -fn generate_interactions(how_many: usize, rng: &mut R) -> Vec<(u32,u32)> where R: Rng { +fn generate_interactions(how_many: usize, rng: &mut R) -> Vec<(u32,u32)> { let mut interactions = Vec::with_capacity(how_many); let mut user_sampler = CRP::new(6000.0, 0.35); diff --git a/differential-dataflow/examples/monoid-bfs.rs b/differential-dataflow/examples/monoid-bfs.rs index 59eddc6084..394b14e69f 100644 --- a/differential-dataflow/examples/monoid-bfs.rs +++ b/differential-dataflow/examples/monoid-bfs.rs @@ -123,9 +123,10 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: &Collection, roots: &Collection) -> Collection -where G::Timestamp: Lattice+Ord { - +fn bfs(edges: &Collection, roots: &Collection) -> Collection +where + G: Scope, +{ // repeatedly update minimal distances each node can be reached from each root roots.scope().iterative::(|scope| { diff --git a/differential-dataflow/examples/pagerank.rs b/differential-dataflow/examples/pagerank.rs index 417c5c3f9d..107e340923 100644 --- a/differential-dataflow/examples/pagerank.rs +++ b/differential-dataflow/examples/pagerank.rs @@ -79,8 +79,7 @@ fn main() { // to its PageRank in the input graph `edges`. fn pagerank(iters: Iter, edges: &Collection) -> Collection where - G: Scope, - G::Timestamp: Lattice, + G: Scope, { // initialize many surfers at each node. let nodes = diff --git a/differential-dataflow/examples/progress.rs b/differential-dataflow/examples/progress.rs index 3bfbec76c0..8ee1d6f291 100644 --- a/differential-dataflow/examples/progress.rs +++ b/differential-dataflow/examples/progress.rs @@ -114,14 +114,14 @@ fn main() { /// /// The computation to determine this, and to maintain it as times change, is an iterative /// computation that propagates times and maintains the minimal elements at each location. -fn frontier( +fn frontier( nodes: Collection, edges: Collection, times: Collection, ) -> Collection where - G::Timestamp: Lattice+Ord, - T::Summary: differential_dataflow::ExchangeData, + G: Scope, + T: Timestamp, { // Translate node and edge transitions into a common Location to Location edge with an associated Summary. let nodes = nodes.map(|(target, source, summary)| (Location::from(target), (Location::from(source), summary))); @@ -148,13 +148,13 @@ where } /// Summary paths from locations to operator zero inputs. -fn summarize( +fn summarize( nodes: Collection, edges: Collection, ) -> Collection where - G::Timestamp: Lattice+Ord, - T::Summary: differential_dataflow::ExchangeData+std::hash::Hash, + G: Scope, + T: Timestamp, { // Start from trivial reachability from each input to itself. let zero_inputs = @@ -196,8 +196,8 @@ fn find_cycles( edges: Collection, ) -> Collection where - G::Timestamp: Lattice+Ord, - T::Summary: differential_dataflow::ExchangeData, + G: Scope, + T: Timestamp, { // Retain node connections along "default" timestamp summaries. let nodes = nodes.flat_map(|(target, source, summary)| { diff --git a/differential-dataflow/examples/stackoverflow.rs b/differential-dataflow/examples/stackoverflow.rs index b2489c0d87..ab64c51a01 100644 --- a/differential-dataflow/examples/stackoverflow.rs +++ b/differential-dataflow/examples/stackoverflow.rs @@ -105,9 +105,10 @@ fn main() { } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: &Collection, roots: &Collection) -> Collection -where G::Timestamp: Lattice+Ord { - +fn bfs(edges: &Collection, roots: &Collection) -> Collection +where + G: Scope, +{ // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); diff --git a/differential-dataflow/src/algorithms/graphs/bfs.rs b/differential-dataflow/src/algorithms/graphs/bfs.rs index 2211a52769..9e792e0821 100644 --- a/differential-dataflow/src/algorithms/graphs/bfs.rs +++ b/differential-dataflow/src/algorithms/graphs/bfs.rs @@ -11,8 +11,7 @@ use crate::lattice::Lattice; /// Returns pairs (node, dist) indicating distance of each node from a root. pub fn bfs(edges: &Collection, roots: &Collection) -> Collection where - G: Scope, - G::Timestamp: Lattice+Ord, + G: Scope, N: ExchangeData+Hash, { use crate::operators::arrange::arrangement::ArrangeByKey; diff --git a/differential-dataflow/src/algorithms/graphs/bijkstra.rs b/differential-dataflow/src/algorithms/graphs/bijkstra.rs index 7dc11c1d94..171563c4e7 100644 --- a/differential-dataflow/src/algorithms/graphs/bijkstra.rs +++ b/differential-dataflow/src/algorithms/graphs/bijkstra.rs @@ -22,8 +22,7 @@ use crate::operators::iterate::Variable; /// could be good insurance here. pub fn bidijkstra(edges: &Collection, goals: &Collection) -> Collection where - G: Scope, - G::Timestamp: Lattice+Ord, + G: Scope, N: ExchangeData+Hash, { use crate::operators::arrange::arrangement::ArrangeByKey; diff --git a/differential-dataflow/src/algorithms/graphs/propagate.rs b/differential-dataflow/src/algorithms/graphs/propagate.rs index aeb57b9223..1ba7ad746f 100644 --- a/differential-dataflow/src/algorithms/graphs/propagate.rs +++ b/differential-dataflow/src/algorithms/graphs/propagate.rs @@ -16,8 +16,7 @@ use crate::operators::arrange::arrangement::ArrangeByKey; /// method to limit the introduction of labels. pub fn propagate(edges: &Collection, nodes: &Collection) -> Collection where - G: Scope, - G::Timestamp: Lattice+Ord, + G: Scope, N: ExchangeData+Hash, R: ExchangeData+Abelian, R: Multiply, @@ -34,8 +33,7 @@ where /// method to limit the introduction of labels. pub fn propagate_at(edges: &Collection, nodes: &Collection, logic: F) -> Collection where - G: Scope, - G::Timestamp: Lattice+Ord, + G: Scope, N: ExchangeData+Hash, R: ExchangeData+Abelian, R: Multiply, diff --git a/differential-dataflow/src/algorithms/graphs/scc.rs b/differential-dataflow/src/algorithms/graphs/scc.rs index 3c4453cd7f..3c784060c0 100644 --- a/differential-dataflow/src/algorithms/graphs/scc.rs +++ b/differential-dataflow/src/algorithms/graphs/scc.rs @@ -15,9 +15,8 @@ use super::propagate::propagate; /// Iteratively removes nodes with no in-edges. pub fn trim(graph: &Collection) -> Collection where - G: Scope, - G::Timestamp: Lattice+Ord, - N: ExchangeData+Hash, + G: Scope, + N: ExchangeData + Hash, R: ExchangeData + Abelian, R: Multiply, R: From, @@ -36,9 +35,8 @@ where /// Returns the subset of edges in the same strongly connected component. pub fn strongly_connected(graph: &Collection) -> Collection where - G: Scope, - G::Timestamp: Lattice+Ord, - N: ExchangeData+Hash, + G: Scope, + N: ExchangeData + Hash, R: ExchangeData + Abelian, R: Multiply, R: From @@ -53,9 +51,8 @@ where fn trim_edges(cycle: &Collection, edges: &Collection) -> Collection where - G: Scope, - G::Timestamp: Lattice+Ord, - N: ExchangeData+Hash, + G: Scope, + N: ExchangeData + Hash, R: ExchangeData + Abelian, R: Multiply, R: From diff --git a/differential-dataflow/src/algorithms/graphs/sequential.rs b/differential-dataflow/src/algorithms/graphs/sequential.rs index f5a730638c..9ee8d52adf 100644 --- a/differential-dataflow/src/algorithms/graphs/sequential.rs +++ b/differential-dataflow/src/algorithms/graphs/sequential.rs @@ -11,8 +11,7 @@ use crate::hashable::Hashable; fn _color(edges: &Collection) -> Collection)> where - G: Scope, - G::Timestamp: Lattice+Ord, + G: Scope, N: ExchangeData+Hash, { // need some bogus initial values. @@ -45,8 +44,7 @@ pub fn sequence( edges: &Collection, logic: F) -> Collection)> where - G: Scope, - G::Timestamp: Lattice+Hash+Ord, + G: Scope, N: ExchangeData+Hashable, V: ExchangeData, F: Fn(&N, &[(&V, isize)])->V+'static diff --git a/differential-dataflow/src/algorithms/identifiers.rs b/differential-dataflow/src/algorithms/identifiers.rs index 91e18647ca..f5f17522e6 100644 --- a/differential-dataflow/src/algorithms/identifiers.rs +++ b/differential-dataflow/src/algorithms/identifiers.rs @@ -33,10 +33,9 @@ pub trait Identifiers { impl Identifiers for Collection where - G: Scope, - G::Timestamp: Lattice, - D: ExchangeData+::std::hash::Hash, - R: ExchangeData+Abelian, + G: Scope, + D: ExchangeData + ::std::hash::Hash, + R: ExchangeData + Abelian, { fn identifiers(&self) -> Collection { diff --git a/differential-dataflow/src/algorithms/prefix_sum.rs b/differential-dataflow/src/algorithms/prefix_sum.rs index 8664c5db2f..57f8d1e7b2 100644 --- a/differential-dataflow/src/algorithms/prefix_sum.rs +++ b/differential-dataflow/src/algorithms/prefix_sum.rs @@ -21,10 +21,9 @@ pub trait PrefixSum { impl PrefixSum for Collection where - G: Scope, - G::Timestamp: Lattice, - K: ExchangeData+::std::hash::Hash, - D: ExchangeData+::std::hash::Hash, + G: Scope, + K: ExchangeData + ::std::hash::Hash, + D: ExchangeData + ::std::hash::Hash, { fn prefix_sum(&self, zero: D, combine: F) -> Self where F: Fn(&K,&D,&D)->D + 'static { self.prefix_sum_at(self.map(|(x,_)| x), zero, combine) @@ -43,10 +42,9 @@ where /// Accumulate data in `collection` into all powers-of-two intervals containing them. pub fn aggregate(collection: Collection, combine: F) -> Collection where - G: Scope, - G::Timestamp: Lattice, - K: ExchangeData+::std::hash::Hash, - D: ExchangeData+::std::hash::Hash, + G: Scope, + K: ExchangeData + ::std::hash::Hash, + D: ExchangeData + ::std::hash::Hash, F: Fn(&K,&D,&D)->D + 'static, { // initial ranges are at each index, and with width 2^0. @@ -79,10 +77,9 @@ pub fn broadcast( zero: D, combine: F) -> Collection where - G: Scope, - G::Timestamp: Lattice+Ord+::std::fmt::Debug, - K: ExchangeData+::std::hash::Hash, - D: ExchangeData+::std::hash::Hash, + G: Scope, + K: ExchangeData + ::std::hash::Hash, + D: ExchangeData + ::std::hash::Hash, F: Fn(&K,&D,&D)->D + 'static, { diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index 5952aed654..e901fc747b 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -153,7 +153,9 @@ impl Collection { /// }); /// ``` pub fn inspect_container(&self, func: F) -> Self - where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static { + where + F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static, + { self.inner .inspect_container(func) .as_collection() @@ -205,7 +207,10 @@ impl Collection { /// ``` // TODO: Removing this function is possible, but breaks existing callers of `negate` who expect // an inherent method on `Collection`. - pub fn negate(&self) -> Collection where StreamCore: crate::operators::Negate { + pub fn negate(&self) -> Collection + where + StreamCore: crate::operators::Negate + { crate::operators::Negate::negate(&self.inner).as_collection() } } @@ -226,8 +231,9 @@ impl Collection { /// }); /// ``` pub fn map(&self, mut logic: L) -> Collection - where D2: Data, - L: FnMut(D) -> D2 + 'static + where + D2: Data, + L: FnMut(D) -> D2 + 'static, { self.inner .map(move |(data, time, delta)| (logic(data), time, delta)) @@ -252,7 +258,9 @@ impl Collection { /// }); /// ``` pub fn map_in_place(&self, mut logic: L) -> Collection - where L: FnMut(&mut D) + 'static { + where + L: FnMut(&mut D) + 'static, + { self.inner .map_in_place(move |&mut (ref mut data, _, _)| logic(data)) .as_collection() @@ -274,10 +282,11 @@ impl Collection { /// }); /// ``` pub fn flat_map(&self, mut logic: L) -> Collection - where G::Timestamp: Clone, - I: IntoIterator, - I::Item: Data, - L: FnMut(D) -> I + 'static { + where + G::Timestamp: Clone, + I: IntoIterator, + L: FnMut(D) -> I + 'static, + { self.inner .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone()))) .as_collection() @@ -297,7 +306,9 @@ impl Collection { /// }); /// ``` pub fn filter(&self, mut logic: L) -> Collection - where L: FnMut(&D) -> bool + 'static { + where + L: FnMut(&D) -> bool + 'static, + { self.inner .filter(move |(data, _, _)| logic(data)) .as_collection() @@ -323,11 +334,11 @@ impl Collection { /// }); /// ``` pub fn explode(&self, mut logic: L) -> Collection>::Output> - where D2: Data, - R2: Semigroup+Multiply, - >::Output: Semigroup+'static, - I: IntoIterator, - L: FnMut(D)->I+'static, + where + D2: Data, + R2: Semigroup+Multiply, + I: IntoIterator, + L: FnMut(D)->I+'static, { self.inner .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d)))) @@ -357,12 +368,12 @@ impl Collection { /// }); /// ``` pub fn join_function(&self, mut logic: L) -> Collection>::Output> - where G::Timestamp: Lattice, - D2: Data, - R2: Semigroup+Multiply, - >::Output: Semigroup+'static, - I: IntoIterator, - L: FnMut(D)->I+'static, + where + G::Timestamp: Lattice, + D2: Data, + R2: Semigroup+Multiply, + I: IntoIterator, + L: FnMut(D)->I+'static, { self.inner .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d)))) @@ -444,8 +455,9 @@ impl Collection { /// is because we advance the timely capability with the same logic, and it must remain `less_equal` /// to all of the data timestamps). pub fn delay(&self, func: F) -> Collection - where F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static { - + where + F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static, + { let mut func1 = func.clone(); let mut func2 = func.clone(); @@ -479,7 +491,9 @@ impl Collection { /// }); /// ``` pub fn inspect(&self, func: F) -> Collection - where F: FnMut(&(D, G::Timestamp, R))+'static { + where + F: FnMut(&(D, G::Timestamp, R))+'static, + { self.inner .inspect(func) .as_collection() @@ -503,7 +517,9 @@ impl Collection { /// }); /// ``` pub fn inspect_batch(&self, mut func: F) -> Collection - where F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static { + where + F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static, + { self.inner .inspect_batch(move |time, data| func(time, data)) .as_collection() @@ -529,9 +545,10 @@ impl Collection { /// }); /// ``` pub fn assert_empty(&self) - where D: crate::ExchangeData+Hashable, - R: crate::ExchangeData+Hashable + Semigroup, - G::Timestamp: Lattice+Ord, + where + D: crate::ExchangeData+Hashable, + R: crate::ExchangeData+Hashable + Semigroup, + G::Timestamp: Lattice+Ord, { self.consolidate() .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x)); @@ -589,7 +606,7 @@ impl Collection, D } /// Methods requiring an Abelian difference, to support negation. -impl Collection where G::Timestamp: Data { +impl, D: Clone+'static, R: Abelian+'static> Collection { /// Assert if the collections are ever different. /// /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation @@ -614,9 +631,10 @@ impl Collection where G /// }); /// ``` pub fn assert_eq(&self, other: &Self) - where D: crate::ExchangeData+Hashable, - R: crate::ExchangeData+Hashable, - G::Timestamp: Lattice+Ord + where + D: crate::ExchangeData+Hashable, + R: crate::ExchangeData+Hashable, + G::Timestamp: Lattice+Ord, { self.negate() .concat(other) @@ -665,7 +683,7 @@ pub fn concatenate(scope: &mut G, iterator: I) -> Collection>, { diff --git a/differential-dataflow/src/consolidation.rs b/differential-dataflow/src/consolidation.rs index 9c1ed008e9..1fbb283b13 100644 --- a/differential-dataflow/src/consolidation.rs +++ b/differential-dataflow/src/consolidation.rs @@ -293,7 +293,7 @@ impl ConsolidateLayout for Vec<(D, T, R)> where D: Ord + Clone + 'static, T: Ord + Clone + 'static, - for<'a> R: Semigroup + IntoOwned<'a, Owned = R> + Clone + 'static, + R: Semigroup + for<'a> IntoOwned<'a, Owned = R> + Clone + 'static, { type Key<'a> = (D, T) where Self: 'a; type Diff<'a> = R where Self: 'a; diff --git a/differential-dataflow/src/containers.rs b/differential-dataflow/src/containers.rs index 84d10017f6..9d974104c1 100644 --- a/differential-dataflow/src/containers.rs +++ b/differential-dataflow/src/containers.rs @@ -38,7 +38,7 @@ impl TimelyStack { #[inline(always)] pub fn reserve_items<'a, I>(&mut self, items: I) where - I: Iterator+Clone, + I: Iterator + Clone, T: 'a, { self.local.reserve(items.clone().count()); @@ -53,7 +53,7 @@ impl TimelyStack { pub fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, - I: Iterator+Clone, + I: Iterator + Clone, { self.local.reserve(regions.clone().map(|cs| cs.local.len()).sum()); self.inner.reserve_regions(regions.map(|cs| &cs.inner)); diff --git a/differential-dataflow/src/dynamic/pointstamp.rs b/differential-dataflow/src/dynamic/pointstamp.rs index 5928da185c..abc2c574da 100644 --- a/differential-dataflow/src/dynamic/pointstamp.rs +++ b/differential-dataflow/src/dynamic/pointstamp.rs @@ -263,24 +263,16 @@ mod columnation { } /// Stack for PointStamp. Part of Columnation implementation. - pub struct PointStampStack( as Columnation>::InnerRegion) - where - ::Item: Columnation; - - impl Default for PointStampStack - where - ::Item: Columnation - { + pub struct PointStampStack>( as Columnation>::InnerRegion); + + impl> Default for PointStampStack { #[inline] fn default() -> Self { Self(Default::default()) } } - impl Region for PointStampStack - where - ::Item: Columnation - { + impl> Region for PointStampStack { type Item = PointStamp; #[inline] diff --git a/differential-dataflow/src/input.rs b/differential-dataflow/src/input.rs index 5a73b14a9a..f39402fc3e 100644 --- a/differential-dataflow/src/input.rs +++ b/differential-dataflow/src/input.rs @@ -68,7 +68,7 @@ pub trait Input : TimelyInput { /// }).unwrap(); /// ``` fn new_collection_from(&mut self, data: I) -> (InputSession<::Timestamp, I::Item, isize>, Collection) - where I: IntoIterator+'static, I::Item: Data; + where I: IntoIterator + 'static; /// Create a new collection and input handle from initial data. /// /// # Examples @@ -100,7 +100,9 @@ pub trait Input : TimelyInput { use crate::lattice::Lattice; impl Input for G where ::Timestamp: Lattice { fn new_collection(&mut self) -> (InputSession<::Timestamp, D, R>, Collection) - where D: Data, R: Semigroup+'static{ + where + D: Data, R: Semigroup+'static, + { let (handle, stream) = self.new_input(); (InputSession::from(handle), stream.as_collection()) } diff --git a/differential-dataflow/src/logging.rs b/differential-dataflow/src/logging.rs index bf6e6bbd77..4fb6ef4300 100644 --- a/differential-dataflow/src/logging.rs +++ b/differential-dataflow/src/logging.rs @@ -13,7 +13,7 @@ pub type Logger = ::timely::logging_core::TypedLogger(worker: &mut timely::worker::Worker, writer: W) -> Option> where A: timely::communication::Allocate, - W: std::io::Write+'static, + W: std::io::Write + 'static, { worker.log_register().and_then(|mut log_register| { let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer); diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index bc74e82cd4..336fe6c81f 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -10,7 +10,7 @@ use timely::progress::Timestamp; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::dataflow::operators::CapabilitySet; -use crate::trace::{Trace, TraceReader, Batch, BatchReader}; +use crate::trace::{Trace, TraceReader, BatchReader}; use crate::trace::wrappers::rc::TraceBox; use timely::scheduling::Activator; @@ -25,10 +25,7 @@ use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier}; /// /// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted /// from the dataflow in which it was defined, and imported into other dataflows. -pub struct TraceAgent -where - Tr: TraceReader, -{ +pub struct TraceAgent { trace: Rc>>, queues: Weak>>>, logical_compaction: Antichain, @@ -39,10 +36,7 @@ where logging: Option, } -impl TraceReader for TraceAgent -where - Tr: TraceReader, -{ +impl TraceReader for TraceAgent { type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; @@ -87,7 +81,6 @@ impl TraceAgent { pub fn new(trace: Tr, operator: OperatorInfo, logging: Option) -> (Self, TraceWriter) where Tr: Trace, - Tr::Batch: Batch, { let trace = Rc::new(RefCell::new(TraceBox::new(trace))); let queues = Rc::new(RefCell::new(Vec::new())); @@ -167,10 +160,7 @@ impl TraceAgent { } } -impl TraceAgent -where - Tr: TraceReader+'static, -{ +impl TraceAgent { /// Copies an existing collection into the supplied scope. /// /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange` @@ -525,10 +515,7 @@ impl Drop for ShutdownDeadmans { } } -impl Clone for TraceAgent -where - Tr: TraceReader, -{ +impl Clone for TraceAgent { fn clone(&self) -> Self { if let Some(logging) = &self.logging { @@ -554,10 +541,7 @@ where } } -impl Drop for TraceAgent -where - Tr: TraceReader, -{ +impl Drop for TraceAgent { fn drop(&mut self) { if let Some(logging) = &self.logging { diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index d1387c742f..189cf7dd26 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -29,7 +29,7 @@ use timely::dataflow::operators::Capability; use crate::{Data, ExchangeData, Collection, AsCollection, Hashable, IntoOwned}; use crate::difference::Semigroup; use crate::lattice::Lattice; -use crate::trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor}; +use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine}; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; @@ -43,9 +43,9 @@ use super::TraceAgent; /// /// An `Arranged` allows multiple differential operators to share the resources (communication, /// computation, memory) required to produce and maintain an indexed representation of a collection. -pub struct Arranged +pub struct Arranged where - G::Timestamp: Lattice+Ord, + G: Scope, Tr: TraceReader+Clone, { /// A stream containing arranged updates. @@ -187,8 +187,7 @@ where /// filtering or flat mapping as part of the extraction. pub fn flat_map_ref(&self, logic: L) -> Collection where - I: IntoIterator, - I::Item: Data, + I: IntoIterator, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, { Self::flat_map_batches(&self.stream, logic) @@ -203,8 +202,7 @@ where /// If you have the arrangement, its `flat_map_ref` method is equivalent to this. pub fn flat_map_batches(stream: &Stream, mut logic: L) -> Collection where - I: IntoIterator, - I::Item: Data, + I: IntoIterator, L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static, { stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| { @@ -243,10 +241,8 @@ where pub fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> where T2: for<'a> TraceReader=T1::Key<'a>,Time=T1::Time>+Clone+'static, - T1::Diff: Multiply, - >::Output: Semigroup+'static, - I: IntoIterator, - I::Item: Data, + T1::Diff: Multiply, + I: IntoIterator, L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static { let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| { @@ -290,14 +286,15 @@ where pub fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, - T2: for<'a> Trace= T1::Key<'a>, Time=T1::Time>+'static, + T2: for<'a> Trace< + Key<'a>= T1::Key<'a>, + Val<'a> : IntoOwned<'a, Owned = V>, + Time=T1::Time, + Diff: Abelian, + >+'static, K: Ord + 'static, V: Data, - for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, - T2::Diff: Abelian, - T2::Batch: Batch, - Bu: Builder, - Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, + Bu: Builder>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,K,V,Bu,T2>(name, move |key, input, output, change| { @@ -313,13 +310,14 @@ where pub fn reduce_core(&self, name: &str, logic: L) -> Arranged> where for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, - T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time>+'static, + T2: for<'a> Trace< + Key<'a>=T1::Key<'a>, + Val<'a> : IntoOwned<'a, Owned = V>, + Time=T1::Time, + >+'static, K: Ord + 'static, V: Data, - for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, - T2::Batch: Batch, - Bu: Builder, - Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, + Bu: Builder>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; @@ -349,8 +347,7 @@ where /// A type that can be arranged as if a collection of updates. pub trait Arrange where - G: Scope, - G::Timestamp: Lattice, + G: Scope, { /// Arranges updates into a shared trace. fn arrange(&self) -> Arranged> @@ -358,7 +355,6 @@ where Ba: Batcher + 'static, Bu: Builder, Tr: Trace + 'static, - Tr::Batch: Batch, { self.arrange_named::("Arrange") } @@ -369,14 +365,12 @@ where Ba: Batcher + 'static, Bu: Builder, Tr: Trace + 'static, - Tr::Batch: Batch, ; } impl Arrange> for Collection where - G: Scope, - G::Timestamp: Lattice, + G: Scope, K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData + Semigroup, @@ -386,7 +380,6 @@ where Ba: Batcher, Time=G::Timestamp> + 'static, Bu: Builder, Tr: Trace + 'static, - Tr::Batch: Batch, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name) @@ -400,14 +393,11 @@ where /// be consistently by key (though this is the most common). pub fn arrange_core(stream: &StreamCore, pact: P, name: &str) -> Arranged> where - G: Scope, - G::Timestamp: Lattice, + G: Scope, P: ParallelizationContract, - Ba: Batcher + 'static, - Ba::Input: Container + Clone + 'static, + Ba: Batcher + 'static, Bu: Builder, Tr: Trace+'static, - Tr::Batch: Batch, { // The `Arrange` operator is tasked with reacting to an advancing input // frontier by producing the sequence of batches whose lower and upper @@ -555,16 +545,15 @@ where Arranged { stream, trace: reader.unwrap() } } -impl Arrange> for Collection +impl Arrange> for Collection where - G::Timestamp: Lattice+Ord, + G: Scope, { fn arrange_named(&self, name: &str) -> Arranged> where Ba: Batcher, Time=G::Timestamp> + 'static, Bu: Builder, Tr: Trace + 'static, - Tr::Batch: Batch, { let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name) @@ -577,7 +566,9 @@ where /// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the /// pair `(u64, K)` of hash value and key. pub trait ArrangeByKey -where G::Timestamp: Lattice+Ord { +where + G: Scope, +{ /// Arranges a collection of `(Key, Val)` records by `Key`. /// /// This operator arranges a stream of values into a shared trace, whose contents it maintains. @@ -589,9 +580,9 @@ where G::Timestamp: Lattice+Ord { fn arrange_by_key_named(&self, name: &str) -> Arranged>>; } -impl ArrangeByKey for Collection +impl ArrangeByKey for Collection where - G::Timestamp: Lattice+Ord + G: Scope, { fn arrange_by_key(&self) -> Arranged>> { self.arrange_by_key_named("ArrangeByKey") @@ -607,9 +598,9 @@ where /// This arrangement requires `Key: Hashable`, and uses the `hashed()` method to place keys in a hashed /// map. This can result in many hash calls, and in some cases it may help to first transform `K` to the /// pair `(u64, K)` of hash value and key. -pub trait ArrangeBySelf +pub trait ArrangeBySelf where - G::Timestamp: Lattice+Ord + G: Scope, { /// Arranges a collection of `Key` records by `Key`. /// @@ -623,9 +614,9 @@ where } -impl ArrangeBySelf for Collection +impl ArrangeBySelf for Collection where - G::Timestamp: Lattice+Ord + G: Scope, { fn arrange_by_self(&self) -> Arranged>> { self.arrange_by_self_named("ArrangeBySelf") diff --git a/differential-dataflow/src/operators/arrange/mod.rs b/differential-dataflow/src/operators/arrange/mod.rs index 2ea9319ca3..2b7f4ba9ae 100644 --- a/differential-dataflow/src/operators/arrange/mod.rs +++ b/differential-dataflow/src/operators/arrange/mod.rs @@ -48,10 +48,7 @@ use timely::progress::Antichain; use crate::trace::TraceReader; /// Operating instructions on how to replay a trace. -pub enum TraceReplayInstruction -where - Tr: TraceReader, -{ +pub enum TraceReplayInstruction { /// Describes a frontier advance. Frontier(Antichain), /// Describes a batch of data and a capability hint. diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index 5a48e2ebdd..725321edf0 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -110,7 +110,7 @@ use timely::dataflow::operators::Capability; use crate::operators::arrange::arrangement::Arranged; use crate::trace::{Builder, Description}; -use crate::trace::{self, Trace, TraceReader, Batch, Cursor}; +use crate::trace::{self, Trace, TraceReader, Cursor}; use crate::{ExchangeData, Hashable, IntoOwned}; use super::TraceAgent; @@ -131,13 +131,14 @@ pub fn arrange_from_upsert( ) -> Arranged> where G: Scope, - Tr: Trace+TraceReader+'static, - for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, + Tr: Trace + for<'a> TraceReader< + Key<'a> : IntoOwned<'a, Owned = K>, + Val<'a> : IntoOwned<'a, Owned = V>, + Time: TotalOrder+ExchangeData, + Diff=isize, + >+'static, K: ExchangeData+Hashable+std::hash::Hash, V: ExchangeData, - for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>, - Tr::Time: TotalOrder+ExchangeData, - Tr::Batch: Batch, Bu: Builder, Output = Tr::Batch>, { let mut reader: Option> = None; diff --git a/differential-dataflow/src/operators/arrange/writer.rs b/differential-dataflow/src/operators/arrange/writer.rs index 5a2f720039..8df11690c2 100644 --- a/differential-dataflow/src/operators/arrange/writer.rs +++ b/differential-dataflow/src/operators/arrange/writer.rs @@ -19,11 +19,7 @@ use super::TraceReplayInstruction; /// /// A `TraceWriter` accepts a sequence of batches and distributes them /// to both a shared trace and to a sequence of private queues. -pub struct TraceWriter -where - Tr: Trace, - Tr::Batch: Batch, -{ +pub struct TraceWriter { /// Current upper limit. upper: Antichain, /// Shared trace, possibly absent (due to weakness). @@ -32,11 +28,7 @@ where queues: Rc>>>, } -impl TraceWriter -where - Tr: Trace, - Tr::Batch: Batch, -{ +impl TraceWriter { /// Creates a new `TraceWriter`. pub fn new( upper: Vec, @@ -98,11 +90,7 @@ where } } -impl Drop for TraceWriter -where - Tr: Trace, - Tr::Batch: Batch, -{ +impl Drop for TraceWriter { fn drop(&mut self) { self.seal(Antichain::new()) } diff --git a/differential-dataflow/src/operators/consolidate.rs b/differential-dataflow/src/operators/consolidate.rs index 602f58f872..c70498b717 100644 --- a/differential-dataflow/src/operators/consolidate.rs +++ b/differential-dataflow/src/operators/consolidate.rs @@ -19,8 +19,7 @@ use crate::trace::{Batcher, Builder}; /// Methods which require data be arrangeable. impl Collection where - G: Scope, - G::Timestamp: Data+Lattice, + G: Scope, D: ExchangeData+Hashable, R: Semigroup+ExchangeData, { @@ -53,9 +52,7 @@ where pub fn consolidate_named(&self, name: &str) -> Self where Ba: Batcher, Time=G::Timestamp> + 'static, - Tr: crate::trace::Trace+'static, - for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = D>, - Tr::Batch: crate::trace::Batch, + Tr: for<'a> crate::trace::Trace: IntoOwned<'a, Owned = D>,Time=G::Timestamp,Diff=R>+'static, Bu: Builder, { use crate::operators::arrange::arrangement::Arrange; diff --git a/differential-dataflow/src/operators/count.rs b/differential-dataflow/src/operators/count.rs index 15d06b42b5..53098979c8 100644 --- a/differential-dataflow/src/operators/count.rs +++ b/differential-dataflow/src/operators/count.rs @@ -14,7 +14,7 @@ use crate::operators::arrange::{Arranged, ArrangeBySelf}; use crate::trace::{BatchReader, Cursor, TraceReader}; /// Extension trait for the `count` differential dataflow method. -pub trait CountTotal where G::Timestamp: TotalOrder+Lattice+Ord { +pub trait CountTotal, K: ExchangeData, R: Semigroup> { /// Counts the number of occurrences of each element. /// /// # Examples @@ -42,8 +42,10 @@ pub trait CountTotal where G::Timestamp fn count_total_core + 'static>(&self) -> Collection; } -impl CountTotal for Collection -where G::Timestamp: TotalOrder+Lattice+Ord { +impl CountTotal for Collection +where + G: Scope, +{ fn count_total_core + 'static>(&self) -> Collection { self.arrange_by_self_named("Arrange: CountTotal") .count_total_core() @@ -53,12 +55,13 @@ where G::Timestamp: TotalOrder+Lattice+Ord { impl CountTotal for Arranged where G: Scope, - T1: for<'a> TraceReader=&'a ()>+Clone+'static, - for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, - for<'a> T1::Diff : Semigroup>, + T1: for<'a> TraceReader< + Key<'a>: IntoOwned<'a, Owned = K>, + Val<'a>=&'a (), + Time: TotalOrder, + Diff: ExchangeData+Semigroup> + >+Clone+'static, K: ExchangeData, - T1::Time: TotalOrder, - T1::Diff: ExchangeData, { fn count_total_core + 'static>(&self) -> Collection { diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index 6056ad8cdb..85bec781b2 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -47,7 +47,7 @@ use crate::difference::{Semigroup, Abelian}; use crate::lattice::Lattice; /// An extension trait for the `iterate` method. -pub trait Iterate { +pub trait Iterate, D: Data, R: Semigroup> { /// Iteratively apply `logic` to the source collection until convergence. /// /// Importantly, this method does not automatically consolidate results. @@ -72,16 +72,15 @@ pub trait Iterate { /// }); /// ``` fn iterate(&self, logic: F) -> Collection - where - G::Timestamp: Lattice, - for<'a> F: FnOnce(&Collection, D, R>)->Collection, D, R>; + where + for<'a> F: FnOnce(&Collection, D, R>)->Collection, D, R>; } -impl Iterate for Collection { +impl, D: Ord+Data+Debug, R: Abelian+'static> Iterate for Collection { fn iterate(&self, logic: F) -> Collection - where G::Timestamp: Lattice, - for<'a> F: FnOnce(&Collection, D, R>)->Collection, D, R> { - + where + for<'a> F: FnOnce(&Collection, D, R>)->Collection, D, R>, + { self.inner.scope().scoped("Iterate", |subgraph| { // create a new variable, apply logic, bind variable, return. // @@ -97,11 +96,11 @@ impl Iterate for Colle } } -impl Iterate for G { +impl, D: Ord+Data+Debug, R: Semigroup+'static> Iterate for G { fn iterate(&self, logic: F) -> Collection - where G::Timestamp: Lattice, - for<'a> F: FnOnce(&Collection, D, R>)->Collection, D, R> { - + where + for<'a> F: FnOnce(&Collection, D, R>)->Collection, D, R>, + { // TODO: This makes me think we have the wrong ownership pattern here. let mut clone = self.clone(); clone @@ -154,8 +153,7 @@ impl Iterate for G { /// ``` pub struct Variable::Timestamp, R)>> where - G: Scope, - G::Timestamp: Lattice, + G: Scope, D: Data, R: Abelian + 'static, C: Container + Clone + 'static, @@ -166,9 +164,9 @@ where step: ::Summary, } -impl Variable +impl Variable where - G::Timestamp: Lattice, + G: Scope, StreamCore: crate::operators::Negate + ResultsIn, { /// Creates a new initially empty `Variable`. @@ -220,7 +218,7 @@ where } } -impl Deref for Variable where G::Timestamp: Lattice { +impl, D: Data, R: Abelian, C: Container + Clone + 'static> Deref for Variable { type Target = Collection; fn deref(&self) -> &Self::Target { &self.collection @@ -235,8 +233,7 @@ impl Deref for Va /// negation. pub struct SemigroupVariable::Timestamp, R)>> where - G::Timestamp: Lattice, - G: Scope, + G: Scope, D: Data, R: Semigroup + 'static, C: Container + Clone + 'static, @@ -246,9 +243,9 @@ where step: ::Summary, } -impl SemigroupVariable +impl SemigroupVariable where - G::Timestamp: Lattice, + G: Scope, StreamCore: ResultsIn, { /// Creates a new initially empty `SemigroupVariable`. diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index 68da44d5a8..25be349c63 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -55,8 +55,7 @@ pub trait Join { K: ExchangeData, V2: ExchangeData, R2: ExchangeData+Semigroup, - R: Multiply, - >::Output: Semigroup+'static + R: Multiply, { self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone()))) } @@ -80,7 +79,7 @@ pub trait Join { /// }); /// ``` fn join_map(&self, other: &Collection, logic: L) -> Collection>::Output> - where K: ExchangeData, V2: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply, >::Output: Semigroup+'static, D: Data, L: FnMut(&K, &V, &V2)->D+'static; + where K: ExchangeData, V2: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply, D: Data, L: FnMut(&K, &V, &V2)->D+'static; /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied. /// @@ -105,7 +104,7 @@ pub trait Join { /// }); /// ``` fn semijoin(&self, other: &Collection) -> Collection>::Output> - where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply, >::Output: Semigroup+'static; + where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply; /// Subtracts the semijoin with `other` from `self`. /// @@ -139,21 +138,20 @@ pub trait Join { impl Join for Collection where - G: Scope, + G: Scope, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup, - G::Timestamp: Lattice+Ord, { fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> - where R: Multiply, >::Output: Semigroup+'static, L: FnMut(&K, &V, &V2)->D+'static { + where R: Multiply, L: FnMut(&K, &V, &V2)->D+'static { let arranged1 = self.arrange_by_key(); let arranged2 = other.arrange_by_key(); arranged1.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2))) } fn semijoin(&self, other: &Collection) -> Collection>::Output> - where R: Multiply, >::Output: Semigroup+'static { + where R: Multiply { let arranged1 = self.arrange_by_key(); let arranged2 = other.arrange_by_self(); arranged1.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone()))) @@ -174,8 +172,7 @@ where { fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> where - Tr::Diff: Multiply, - >::Output: Semigroup+'static, + Tr::Diff: Multiply, L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static, { let arranged2 = other.arrange_by_key(); @@ -183,7 +180,7 @@ where } fn semijoin(&self, other: &Collection) -> Collection>::Output> - where Tr::Diff: Multiply, >::Output: Semigroup+'static { + where Tr::Diff: Multiply { let arranged2 = other.arrange_by_self(); self.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone()))) } @@ -200,7 +197,7 @@ where /// This method is used by the various `join` implementations, but it can also be used /// directly in the event that one has a handle to an `Arranged`, perhaps because /// the arrangement is available for re-use, or from the output of a `reduce` operator. -pub trait JoinCore where G::Timestamp: Lattice+Ord { +pub trait JoinCore, K: 'static + ?Sized, V: 'static + ?Sized, R: Semigroup> { /// Joins two arranged collections with the same key type. /// @@ -235,10 +232,8 @@ pub trait JoinCore (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, - R: Multiply, - >::Output: Semigroup+'static, - I: IntoIterator, - I::Item: Data, + R: Multiply, + I: IntoIterator, L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static, ; @@ -288,19 +283,16 @@ pub trait JoinCore JoinCore for Collection where - G: Scope, + G: Scope, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup, - G::Timestamp: Lattice+Ord, { fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, - R: Multiply, - >::Output: Semigroup+'static, - I: IntoIterator, - I::Item: Data, + R: Multiply, + I: IntoIterator, L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static, { self.arrange_by_key() diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 6a22ab2eab..2267268ff4 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -21,14 +21,14 @@ use timely::dataflow::operators::Capability; use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent}; use crate::lattice::Lattice; -use crate::trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; +use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; use crate::trace::implementations::{KeySpine, KeyBuilder, ValSpine, ValBuilder}; use crate::trace::TraceReader; /// Extension trait for the `reduce` differential dataflow method. -pub trait Reduce where G::Timestamp: Lattice+Ord { +pub trait Reduce, K: Data, V: Data, R: Semigroup> { /// Applies a reduction function on records grouped by key. /// /// Input data must be structured as `(key, val)` pairs. @@ -69,8 +69,7 @@ pub trait Reduce where G::Timestamp: L impl Reduce for Collection where - G: Scope, - G::Timestamp: Lattice+Ord, + G: Scope, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup, @@ -86,8 +85,6 @@ impl Reduce for A where G: Scope, T1: for<'a> TraceReader=&'a K, Val<'a>=&'a V, Diff=R>+Clone+'static, - for<'a> T1::Key<'a> : IntoOwned<'a, Owned = K>, - for<'a> T1::Val<'a> : IntoOwned<'a, Owned = V>, { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { @@ -97,7 +94,7 @@ where } /// Extension trait for the `threshold` and `distinct` differential dataflow methods. -pub trait Threshold where G::Timestamp: Lattice+Ord { +pub trait Threshold, K: Data, R1: Semigroup> { /// Transforms the multiplicity of records. /// /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at @@ -153,8 +150,7 @@ pub trait Threshold where G::Timestamp: Lattic } } -impl Threshold for Collection -where G::Timestamp: Lattice+Ord { +impl, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold for Collection { fn threshold_namedR2+'static>(&self, name: &str, thresh: F) -> Collection { self.arrange_by_self_named(&format!("Arrange: {}", name)) .threshold_named(name, thresh) @@ -165,7 +161,6 @@ impl Threshold for Arranged where G: Scope, T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Diff=R1>+Clone+'static, - for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { self.reduce_abelian::<_,K,(),KeyBuilder,KeySpine>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) @@ -174,7 +169,7 @@ where } /// Extension trait for the `count` differential dataflow method. -pub trait Count where G::Timestamp: Lattice+Ord { +pub trait Count, K: Data, R: Semigroup> { /// Counts the number of occurrences of each element. /// /// # Examples @@ -202,10 +197,7 @@ pub trait Count where G::Timestamp: Lattice+Ord fn count_core + 'static>(&self) -> Collection; } -impl Count for Collection -where - G::Timestamp: Lattice+Ord, -{ +impl, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count for Collection { fn count_core + 'static>(&self) -> Collection { self.arrange_by_self_named("Arrange: Count") .count_core() @@ -216,7 +208,6 @@ impl Count for Arranged where G: Scope, T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Diff=R>+Clone+'static, - for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>, { fn count_core + 'static>(&self) -> Collection { self.reduce_abelian::<_,K,R,ValBuilder,ValSpine>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) @@ -225,7 +216,7 @@ where } /// Extension trait for the `reduce_core` differential dataflow method. -pub trait ReduceCore where G::Timestamp: Lattice+Ord { +pub trait ReduceCore, K: ToOwned + ?Sized, V: Data, R: Semigroup> { /// Applies `reduce` to arranged data, and returns an arrangement of output data. /// /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although @@ -253,10 +244,12 @@ pub trait ReduceCore where /// ``` fn reduce_abelian(&self, name: &str, mut logic: L) -> Arranged> where - T2: for<'a> Trace= &'a K, Time=G::Timestamp>+'static, - for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, - T2::Diff: Abelian, - T2::Batch: Batch, + T2: for<'a> Trace< + Key<'a>= &'a K, + Val<'a> : IntoOwned<'a, Owned = V>, + Time=G::Timestamp, + Diff: Abelian, + >+'static, Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { @@ -276,9 +269,11 @@ pub trait ReduceCore where /// At least one of the two collections will be non-empty. fn reduce_core(&self, name: &str, logic: L) -> Arranged> where - T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, - for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, - T2::Batch: Batch, + T2: for<'a> Trace< + Key<'a>=&'a K, + Val<'a> : IntoOwned<'a, Owned = V>, + Time=G::Timestamp, + >+'static, Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, ; @@ -295,9 +290,11 @@ where fn reduce_core(&self, name: &str, logic: L) -> Arranged> where V: Data, - T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, - for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, - T2::Batch: Batch, + T2: for<'a> Trace< + Key<'a>=&'a K, + Val<'a> : IntoOwned<'a, Owned = V>, + Time=G::Timestamp, + >+'static, Bu: Builder, Output = T2::Batch>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { @@ -312,15 +309,11 @@ where pub fn reduce_trace(trace: &Arranged, name: &str, mut logic: L) -> Arranged> where G: Scope, - T1: TraceReader + Clone + 'static, - for<'a> T1::Key<'a> : IntoOwned<'a, Owned = K>, - T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time> + 'static, + T1: for<'a> TraceReader : IntoOwned<'a, Owned = K>> + Clone + 'static, + T2: for<'a> Trace=T1::Key<'a>, Val<'a> : IntoOwned<'a, Owned = V>, Time=T1::Time> + 'static, K: Ord + 'static, V: Data, - for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>, - T2::Batch: Batch, - Bu: Builder, - Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>, + Bu: Builder>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -633,10 +626,9 @@ fn sort_dedup(list: &mut Vec) { trait PerKeyCompute<'a, C1, C2, C3, V> where C1: Cursor, - C2: Cursor = C1::Key<'a>, Time = C1::Time>, + C2: for<'b> Cursor = C1::Key<'a>, Val<'b> : IntoOwned<'b, Owned = V>, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, V: Clone + Ord, - for<'b> C2::Val<'b> : IntoOwned<'b, Owned = V>, { fn new() -> Self; fn compute( @@ -698,10 +690,9 @@ mod history_replay { impl<'a, C1, C2, C3, V> PerKeyCompute<'a, C1, C2, C3, V> for HistoryReplayer<'a, C1, C2, C3, V> where C1: Cursor, - C2: Cursor = C1::Key<'a>, Time = C1::Time>, + C2: for<'b> Cursor = C1::Key<'a>, Val<'b> : IntoOwned<'b, Owned = V>, Time = C1::Time>, C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, V: Clone + Ord, - for<'b> C2::Val<'b> : IntoOwned<'b, Owned = V>, { fn new() -> Self { HistoryReplayer { diff --git a/differential-dataflow/src/operators/threshold.rs b/differential-dataflow/src/operators/threshold.rs index 187a94704a..6476b19b98 100644 --- a/differential-dataflow/src/operators/threshold.rs +++ b/differential-dataflow/src/operators/threshold.rs @@ -17,7 +17,7 @@ use crate::operators::arrange::{Arranged, ArrangeBySelf}; use crate::trace::{BatchReader, Cursor, TraceReader}; /// Extension trait for the `distinct` differential dataflow method. -pub trait ThresholdTotal where G::Timestamp: TotalOrder+Lattice+Ord { +pub trait ThresholdTotal, K: ExchangeData, R: ExchangeData+Semigroup> { /// Reduces the collection to one occurrence of each distinct element. fn threshold_semigroup(&self, thresh: F) -> Collection where @@ -85,7 +85,9 @@ pub trait ThresholdTotal w } impl ThresholdTotal for Collection -where G::Timestamp: TotalOrder+Lattice+Ord { +where + G: Scope, +{ fn threshold_semigroup(&self, thresh: F) -> Collection where R2: Semigroup+'static, @@ -99,11 +101,13 @@ where G::Timestamp: TotalOrder+Lattice+Ord { impl ThresholdTotal for Arranged where G: Scope, - T1: for<'a> TraceReader=&'a K, Val<'a>=&'a ()>+Clone+'static, - for<'a> T1::Diff : Semigroup>, + T1: for<'a> TraceReader< + Key<'a>=&'a K, + Val<'a>=&'a (), + Time: TotalOrder, + Diff : ExchangeData + Semigroup>, + >+Clone+'static, K: ExchangeData, - T1::Time: TotalOrder, - T1::Diff: ExchangeData, { fn threshold_semigroup(&self, mut thresh: F) -> Collection where diff --git a/differential-dataflow/src/trace/implementations/chunker.rs b/differential-dataflow/src/trace/implementations/chunker.rs index 0eef4d7797..c47bd95cd1 100644 --- a/differential-dataflow/src/trace/implementations/chunker.rs +++ b/differential-dataflow/src/trace/implementations/chunker.rs @@ -242,10 +242,7 @@ pub struct ContainerChunker { empty: Output, } -impl Default for ContainerChunker -where - Output: Default, -{ +impl Default for ContainerChunker { fn default() -> Self { Self { pending: Output::default(), diff --git a/differential-dataflow/src/trace/implementations/huffman_container.rs b/differential-dataflow/src/trace/implementations/huffman_container.rs index cd8bbbb099..c5d08e6bb8 100644 --- a/differential-dataflow/src/trace/implementations/huffman_container.rs +++ b/differential-dataflow/src/trace/implementations/huffman_container.rs @@ -21,10 +21,7 @@ pub struct HuffmanContainer { stats: BTreeMap } -impl HuffmanContainer -where - B: Ord + Clone, -{ +impl HuffmanContainer { /// Prints statistics about encoded containers. pub fn print(&self) { if let Ok((_huff, bytes)) = &self.inner { @@ -439,10 +436,7 @@ mod huffman { } } - impl<'a, T, I> Iterator for Decoder<'a, T, I> - where - I: Iterator, - { + impl<'a, T, I: Iterator> Iterator for Decoder<'a, T, I> { type Item = &'a T; fn next(&mut self) -> Option<&'a T> { // We must navigate `self.decode`, restocking bits whenever possible. diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 4de9435941..47c26e3c7f 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -50,8 +50,7 @@ pub struct MergeBatcher { impl Batcher for MergeBatcher where C: ContainerBuilder + Default + for<'a> PushInto<&'a mut Input>, - M: Merger, - M::Time: Timestamp, + M: Merger, { type Input = Input; type Time = M::Time; @@ -127,10 +126,7 @@ where } } -impl MergeBatcher -where - M: Merger, -{ +impl MergeBatcher { /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered /// by decreasing length. fn insert_chain(&mut self, chain: Vec) { @@ -194,10 +190,7 @@ where } } -impl Drop for MergeBatcher -where - M: Merger, -{ +impl Drop for MergeBatcher { fn drop(&mut self) { // Cleanup chain to retract accounting information. while self.chain_pop().is_some() {} @@ -330,8 +323,7 @@ pub mod container { impl Merger for ContainerMerger where - for<'a> MC: MergerChunk + Clone + PushInto<::Item<'a>> + 'static, - for<'a> MC::TimeOwned: Ord + PartialOrder + Data, + for<'a> MC: MergerChunk + Clone + PushInto<::Item<'a>> + 'static, CQ: ContainerQueue, { type Time = MC::TimeOwned; diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index 717c9df682..1c9ec7da60 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -114,10 +114,7 @@ pub struct Vector { phantom: std::marker::PhantomData, } -impl Layout for Vector -where - U::Diff: Ord, -{ +impl> Layout for Vector { type Target = U; type KeyContainer = Vec; type ValContainer = Vec; @@ -131,12 +128,14 @@ pub struct TStack { phantom: std::marker::PhantomData, } -impl Layout for TStack +impl Layout for TStack where - U::Key: Columnation, - U::Val: Columnation, - U::Time: Columnation, - U::Diff: Columnation + Ord, + U: Update< + Key: Columnation, + Val: Columnation, + Time: Columnation, + Diff: Columnation + Ord, + >, { type Target = U; type KeyContainer = TimelyStack; @@ -169,10 +168,8 @@ pub struct Preferred { impl Update for Preferred where - K: ToOwned + ?Sized, - K::Owned: Ord+Clone+'static, - V: ToOwned + ?Sized, - V::Owned: Ord+Clone+'static, + K: ToOwned + ?Sized, + V: ToOwned + ?Sized, T: Ord+Clone+Lattice+timely::progress::Timestamp, R: Ord+Clone+Semigroup+'static, { @@ -184,10 +181,8 @@ where impl Layout for Preferred where - K: Ord+ToOwned+PreferredContainer + ?Sized, - K::Owned: Ord+Clone+'static, - V: Ord+ToOwned+PreferredContainer + ?Sized, - V::Owned: Ord+Clone+'static, + K: Ord+ToOwned+PreferredContainer + ?Sized, + V: Ord+ToOwned+PreferredContainer + ?Sized, T: Ord+Clone+Lattice+timely::progress::Timestamp, D: Ord+Clone+Semigroup+'static, { @@ -345,11 +340,9 @@ pub trait BuilderInput: Container { impl BuilderInput for Vec<((K, V), T, R)> where K: Ord + Clone + 'static, - KBC: BatchContainer, - for<'a> KBC::ReadItem<'a>: PartialEq<&'a K>, + KBC: for<'a> BatchContainer: PartialEq<&'a K>>, V: Ord + Clone + 'static, - VBC: BatchContainer, - for<'a> VBC::ReadItem<'a>: PartialEq<&'a V>, + VBC: for<'a> BatchContainer: PartialEq<&'a V>>, T: Timestamp + Lattice + Clone + 'static, R: Ord + Semigroup + 'static, { @@ -398,12 +391,14 @@ where impl BuilderInput for TimelyStack<((K::Owned, V::Owned), T, R)> where - K: BatchContainer, - for<'a> K::ReadItem<'a>: PartialEq<&'a K::Owned>, - K::Owned: Ord + Columnation + Clone + 'static, - V: BatchContainer, - for<'a> V::ReadItem<'a>: PartialEq<&'a V::Owned>, - V::Owned: Ord + Columnation + Clone + 'static, + K: for<'a> BatchContainer< + ReadItem<'a>: PartialEq<&'a K::Owned>, + Owned: Ord + Columnation + Clone + 'static, + >, + V: for<'a> BatchContainer< + ReadItem<'a>: PartialEq<&'a V::Owned>, + Owned: Ord + Columnation + Clone + 'static, + >, T: Timestamp + Lattice + Columnation + Clone + 'static, R: Ord + Clone + Semigroup + Columnation + 'static, { diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 5035a229c4..c987ddc6cf 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -593,10 +593,11 @@ pub mod val_batch { impl Builder for OrdValBuilder where - L: Layout, + L: for<'a> Layout< + KeyContainer: PushInto>, + ValContainer: PushInto>, + >, CI: for<'a> BuilderInput::Time, Diff=::Diff>, - for<'a> L::KeyContainer: PushInto>, - for<'a> L::ValContainer: PushInto>, for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Diff>, { @@ -1095,11 +1096,10 @@ mod key_batch { impl Builder for OrdKeyBuilder where - L: Layout, - CI: for<'a> BuilderInput::Time, Diff=::Diff>, - for<'a> L::KeyContainer: PushInto>, + L: for<'a> Layout>>, for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Diff>, + CI: BuilderInput::Time, Diff=::Diff>, { type Input = CI; diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index 48ce98899f..911697502c 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -53,8 +53,7 @@ pub struct HashWrapper { pub inner: T } -impl PartialOrd for HashWrapper -where ::Output: PartialOrd { +impl> PartialOrd for HashWrapper { fn partial_cmp(&self, other: &Self) -> Option { let this_hash = self.inner.hashed(); let that_hash = other.inner.hashed(); @@ -62,8 +61,7 @@ where ::Output: PartialOrd { } } -impl Ord for HashWrapper -where ::Output: PartialOrd { +impl> Ord for HashWrapper { fn cmp(&self, other: &Self) -> Ordering { self.partial_cmp(other).unwrap() } diff --git a/differential-dataflow/src/trace/implementations/spine_fueled.rs b/differential-dataflow/src/trace/implementations/spine_fueled.rs index 6b1c02d5c2..ea83f1f7c0 100644 --- a/differential-dataflow/src/trace/implementations/spine_fueled.rs +++ b/differential-dataflow/src/trace/implementations/spine_fueled.rs @@ -99,10 +99,7 @@ pub struct Spine { exert_logic: Option, } -impl TraceReader for Spine -where - B: Batch+Clone+'static, -{ +impl TraceReader for Spine { type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; @@ -245,10 +242,7 @@ where // A trace implementation for any key type that can be borrowed from or converted into `Key`. // TODO: Almost all this implementation seems to be generic with respect to the trace and batch types. -impl Trace for Spine -where - B: Batch+Clone+'static, -{ +impl Trace for Spine { fn new( info: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option, @@ -765,7 +759,7 @@ enum MergeState { Double(MergeVariant), } -impl MergeState where B::Time: Eq { +impl> MergeState { /// The number of actual updates contained in the level. fn len(&self) -> usize { diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 198f99bb62..86e6c8503a 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -179,8 +179,7 @@ pub trait TraceReader { /// /// The trace must be constructable from, and navigable by the `Key`, `Val`, `Time` types, but does not need /// to return them. -pub trait Trace : TraceReader -where ::Batch: Batch { +pub trait Trace : TraceReader { /// Allocates a new empty trace. fn new( @@ -222,10 +221,7 @@ where ::Batch: Batch { /// but do not expose ways to construct the batches. This trait is appropriate for views of the batch, and is /// especially useful for views derived from other sources in ways that prevent the construction of batches /// from the type of data in the view (for example, filtered views, or views with extended time coordinates). -pub trait BatchReader -where - Self: ::std::marker::Sized, -{ +pub trait BatchReader : Sized { /// Key by which updates are indexed. type Key<'a>: Copy + Clone + Ord; /// Values associated with keys. @@ -257,7 +253,7 @@ where } /// An immutable collection of updates. -pub trait Batch : BatchReader where Self: ::std::marker::Sized { +pub trait Batch : BatchReader + Sized { /// A type used to progressively merge batches. type Merger: Merger; diff --git a/differential-dataflow/src/trace/wrappers/enter.rs b/differential-dataflow/src/trace/wrappers/enter.rs index 418b3e5735..6c80f380dc 100644 --- a/differential-dataflow/src/trace/wrappers/enter.rs +++ b/differential-dataflow/src/trace/wrappers/enter.rs @@ -27,8 +27,7 @@ impl Clone for TraceEnter { impl TraceReader for TraceEnter where - Tr: TraceReader, - Tr::Batch: Clone, + Tr: TraceReader, TInner: Refines+Lattice, { type Key<'a> = Tr::Key<'a>; diff --git a/differential-dataflow/src/trace/wrappers/enter_at.rs b/differential-dataflow/src/trace/wrappers/enter_at.rs index 2f27fb70e9..b085d80be1 100644 --- a/differential-dataflow/src/trace/wrappers/enter_at.rs +++ b/differential-dataflow/src/trace/wrappers/enter_at.rs @@ -42,8 +42,7 @@ where impl TraceReader for TraceEnter where - Tr: TraceReader, - Tr::Batch: Clone, + Tr: TraceReader, TInner: Refines+Lattice, F: 'static, F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone, diff --git a/differential-dataflow/src/trace/wrappers/filter.rs b/differential-dataflow/src/trace/wrappers/filter.rs index 1fe95b7ffc..a2877402d0 100644 --- a/differential-dataflow/src/trace/wrappers/filter.rs +++ b/differential-dataflow/src/trace/wrappers/filter.rs @@ -26,8 +26,7 @@ where impl TraceReader for TraceFilter where - Tr: TraceReader, - Tr::Batch: Clone, + Tr: TraceReader, F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static, { type Key<'a> = Tr::Key<'a>; @@ -58,10 +57,7 @@ where } } -impl TraceFilter -where - Tr: TraceReader, -{ +impl TraceFilter { /// Makes a new trace wrapper pub fn make_from(trace: Tr, logic: F) -> Self { TraceFilter { @@ -100,10 +96,7 @@ where fn description(&self) -> &Description { self.batch.description() } } -impl BatchFilter -where - B: BatchReader, -{ +impl BatchFilter { /// Makes a new batch wrapper pub fn make_from(batch: B, logic: F) -> Self { BatchFilter { diff --git a/differential-dataflow/src/trace/wrappers/freeze.rs b/differential-dataflow/src/trace/wrappers/freeze.rs index 0bc67b626a..6be7d0663a 100644 --- a/differential-dataflow/src/trace/wrappers/freeze.rs +++ b/differential-dataflow/src/trace/wrappers/freeze.rs @@ -72,8 +72,7 @@ where impl TraceReader for TraceFreeze where - Tr: TraceReader, - Tr::Batch: Clone, + Tr: TraceReader, F: Fn(Tr::TimeGat<'_>)->Option+'static, { type Key<'a> = Tr::Key<'a>; @@ -109,8 +108,7 @@ where impl TraceFreeze where - Tr: TraceReader, - Tr::Batch: Clone, + Tr: TraceReader, F: Fn(Tr::TimeGat<'_>)->Option, { /// Makes a new trace wrapper diff --git a/differential-dataflow/src/trace/wrappers/frontier.rs b/differential-dataflow/src/trace/wrappers/frontier.rs index 2330cc043f..85f2b95ff2 100644 --- a/differential-dataflow/src/trace/wrappers/frontier.rs +++ b/differential-dataflow/src/trace/wrappers/frontier.rs @@ -117,7 +117,7 @@ pub struct CursorFrontier { until: Antichain } -impl CursorFrontier where T: Clone { +impl CursorFrontier { fn new(cursor: C, since: AntichainRef, until: AntichainRef) -> Self { CursorFrontier { cursor, @@ -190,10 +190,7 @@ impl BatchCursorFrontier { } } -impl Cursor for BatchCursorFrontier -where - C::Storage: BatchReader, -{ +impl> Cursor for BatchCursorFrontier { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; diff --git a/differential-dataflow/tests/bfs.rs b/differential-dataflow/tests/bfs.rs index 70d47509d2..509564a352 100644 --- a/differential-dataflow/tests/bfs.rs +++ b/differential-dataflow/tests/bfs.rs @@ -202,9 +202,10 @@ fn bfs_differential( } // returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs(edges: &Collection, roots: &Collection) -> Collection -where G::Timestamp: Lattice+Ord { - +fn bfs(edges: &Collection, roots: &Collection) -> Collection +where + G: Scope, +{ // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); diff --git a/differential-dataflow/tests/import.rs b/differential-dataflow/tests/import.rs index 5fd931837f..4d0582c95a 100644 --- a/differential-dataflow/tests/import.rs +++ b/differential-dataflow/tests/import.rs @@ -12,7 +12,8 @@ use itertools::Itertools; type Result = std::sync::mpsc::Receiver>>; fn run_test(test: T, expected: Vec<(usize, Vec<((u64, i64), i64)>)>) -> () - where T: FnOnce(Vec>)-> Result + ::std::panic::UnwindSafe +where + T: FnOnce(Vec>)-> Result + ::std::panic::UnwindSafe, { let input_epochs: Vec> = vec![ vec![((2, 0), 1), ((1, 0), 1), ((1, 3), 1), ((4, 2), 1)], diff --git a/differential-dataflow/tests/scc.rs b/differential-dataflow/tests/scc.rs index de5002026f..5aeba6a72d 100644 --- a/differential-dataflow/tests/scc.rs +++ b/differential-dataflow/tests/scc.rs @@ -215,8 +215,10 @@ fn scc_differential( .collect() } -fn _strongly_connected(graph: &Collection) -> Collection -where G::Timestamp: Lattice+Ord+Hash { +fn _strongly_connected(graph: &Collection) -> Collection +where + G: Scope, +{ graph.iterate(|inner| { let edges = graph.enter(&inner.scope()); let trans = edges.map_in_place(|x| mem::swap(&mut x.0, &mut x.1)); @@ -224,9 +226,10 @@ where G::Timestamp: Lattice+Ord+Hash { }) } -fn _trim_edges(cycle: &Collection, edges: &Collection) - -> Collection where G::Timestamp: Lattice+Ord+Hash { - +fn _trim_edges(cycle: &Collection, edges: &Collection) -> Collection +where + G: Scope, +{ let nodes = edges.map_in_place(|x| x.0 = x.1) .consolidate(); @@ -240,9 +243,10 @@ fn _trim_edges(cycle: &Collection, edges: &Collection(edges: &Collection, nodes: &Collection) -> Collection -where G::Timestamp: Lattice+Ord+Hash { - +fn _reachability(edges: &Collection, nodes: &Collection) -> Collection +where + G: Scope, +{ edges.filter(|_| false) .iterate(|inner| { let edges = edges.enter(&inner.scope()); diff --git a/dogsdogsdogs/examples/ngo.rs b/dogsdogsdogs/examples/ngo.rs index 76d448fed9..a87ff090f9 100644 --- a/dogsdogsdogs/examples/ngo.rs +++ b/dogsdogsdogs/examples/ngo.rs @@ -42,8 +42,9 @@ fn main() { } fn triangles(edges: &Collection) -> Collection -where G::Timestamp: Lattice+Hash+Ord { - +where + G: Scope, +{ // only use forward-pointing edges. let edges = edges.filter(|&(src, dst)| src < dst); diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index 3e73994690..c0b93b0990 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -181,11 +181,10 @@ where impl PrefixExtender for CollectionExtender where - G: Scope, + G: Scope, K: ExchangeData+Hash+Default, V: ExchangeData+Hash+Default, P: ExchangeData, - G::Timestamp: Lattice+ExchangeData, R: Monoid+Multiply+ExchangeData, F: Fn(&P)->K+Clone+'static, { diff --git a/dogsdogsdogs/src/operators/count.rs b/dogsdogsdogs/src/operators/count.rs index 6485ba9098..102c65cfad 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -20,8 +20,10 @@ pub fn count( ) -> Collection where G: Scope, - Tr: TraceReader+Clone+'static, - for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>, + Tr: for<'a> TraceReader< + Key<'a>: IntoOwned<'a, Owned = K>, + Diff=isize, + >+Clone+'static, for<'a> Tr::Diff : Semigroup>, K: Hashable + Ord + Default + 'static, R: Monoid+Multiply+ExchangeData, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 56d0fcf07d..43d81b00c3 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -81,10 +81,8 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: TraceReader+Clone+'static, - for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, - R: Mul, - >::Output: Semigroup, + Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>>+Clone+'static, + R: Mul, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static, DOut: Clone+'static, @@ -135,8 +133,7 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: TraceReader+Clone+'static, - for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, + Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>>+Clone+'static, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, DOut: Clone+'static, diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 0440e2bf88..54fc11bb26 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -28,11 +28,11 @@ pub fn lookup_map( ) -> Collection where G: Scope, - Tr: TraceReader+Clone+'static, - for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = K>, - for<'a> Tr::Diff : Semigroup>, + Tr: for<'a> TraceReader< + Key<'a>: IntoOwned<'a, Owned = K>, + Diff : Semigroup>+Monoid+ExchangeData, + >+Clone+'static, K: Hashable + Ord + 'static, - Tr::Diff: Monoid+ExchangeData, F: FnMut(&D, &mut K)+Clone+'static, D: ExchangeData, R: ExchangeData+Monoid, diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index 25df5d008e..571cf5071e 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -21,15 +21,15 @@ pub fn propose( ) -> Collection where G: Scope, - Tr: TraceReader+Clone+'static, - for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, + Tr: for<'a> TraceReader< + Key<'a> : IntoOwned<'a, Owned = K>, + Val<'a> : IntoOwned<'a, Owned = V>, + Diff: Monoid+Multiply+ExchangeData+Semigroup>, + >+Clone+'static, K: Hashable + Default + Ord + 'static, - Tr::Diff: Monoid+Multiply+ExchangeData, - for<'a> Tr::Diff : Semigroup>, F: Fn(&P)->K+Clone+'static, P: ExchangeData, V: Clone + 'static, - for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>, { crate::operators::lookup_map( prefixes, @@ -54,15 +54,15 @@ pub fn propose_distinct( ) -> Collection where G: Scope, - Tr: TraceReader+Clone+'static, - for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = K>, - for<'a> Tr::Diff : Semigroup>, + Tr: for<'a> TraceReader< + Key<'a> : IntoOwned<'a, Owned = K>, + Val<'a> : IntoOwned<'a, Owned = V>, + Diff : Semigroup>+Monoid+Multiply+ExchangeData, + >+Clone+'static, K: Hashable + Default + Ord + 'static, - Tr::Diff: Monoid+Multiply+ExchangeData, F: Fn(&P)->K+Clone+'static, P: ExchangeData, V: Clone + 'static, - for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>, { crate::operators::lookup_map( prefixes, diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index b6dfc404b0..51fe7df3a0 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -20,12 +20,12 @@ pub fn validate( ) -> Collection where G: Scope, - Tr: TraceReader+Clone+'static, - for<'a> Tr::Key<'a> : IntoOwned<'a, Owned = (K, V)>, - for<'a> Tr::Diff : Semigroup>, + Tr: for<'a> TraceReader< + Key<'a> : IntoOwned<'a, Owned = (K, V)>, + Diff : Semigroup>+Monoid+Multiply+ExchangeData, + >+Clone+'static, K: Ord+Hash+Clone+Default + 'static, V: ExchangeData+Hash+Default, - Tr::Diff: Monoid+Multiply+ExchangeData, F: Fn(&P)->K+Clone+'static, P: ExchangeData, {