diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index cbe4d62c3..b5af06ef2 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -159,14 +159,14 @@ mod container { type BorrowedOf<'a, C> = <::Container as columnar::Container>::Borrowed<'a>; impl Column { - pub fn borrow(&self) -> BorrowedOf { + pub fn borrow(&self) -> BorrowedOf<'_, C> { match self { Column::Typed(t) => t.borrow(), Column::Bytes(b) => as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))), Column::Align(a) => as FromBytes>::from_bytes(&mut Indexed::decode(a)), } } - pub fn get(&self, index: usize) -> columnar::Ref { + pub fn get(&self, index: usize) -> columnar::Ref<'_, C> { self.borrow().get(index) } } @@ -493,7 +493,7 @@ pub mod batcher { for<'b> columnar::Ref<'b, T>: Ord, R: Columnar, { - fn next_or_alloc(&mut self) -> Result, Column<(D, T, R)>> { + fn next_or_alloc(&mut self) -> Result, Column<(D, T, R)>> { if self.is_empty() { Err(std::mem::take(&mut self.list)) } @@ -517,12 +517,12 @@ pub mod batcher { } impl ColumnQueue { - fn pop(&mut self) -> columnar::Ref { + fn pop(&mut self) -> columnar::Ref<'_, T> { self.head += 1; self.list.get(self.head - 1) } - fn peek(&self) -> columnar::Ref { + fn peek(&self) -> columnar::Ref<'_, T> { self.list.get(self.head) } } diff --git a/differential-dataflow/src/operators/arrange/agent.rs b/differential-dataflow/src/operators/arrange/agent.rs index 71951f856..b4d82952e 100644 --- a/differential-dataflow/src/operators/arrange/agent.rs +++ b/differential-dataflow/src/operators/arrange/agent.rs @@ -55,10 +55,10 @@ impl TraceReader for TraceAgent { ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain); self.temp_antichain.clear(); } - fn get_logical_compaction(&mut self) -> AntichainRef { + fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.logical_compaction.borrow() } - fn set_physical_compaction(&mut self, frontier: AntichainRef) { + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`. // Instead, it determines the joint consequences of both guarantees and moves forward with that. crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain); @@ -66,10 +66,10 @@ impl TraceReader for TraceAgent { ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain); self.temp_antichain.clear(); } - fn get_physical_compaction(&mut self) -> AntichainRef { + fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.physical_compaction.borrow() } - fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { + fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> { self.trace.borrow_mut().trace.cursor_through(frontier) } fn map_batches(&self, f: F) { self.trace.borrow().trace.map_batches(f) } diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 47c26e3c7..90a74f662 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -121,7 +121,7 @@ where /// The frontier of elements remaining after the most recent call to `self.seal`. #[inline] - fn frontier(&mut self) -> AntichainRef { + fn frontier(&mut self) -> AntichainRef<'_, M::Time> { self.frontier.borrow() } } diff --git a/differential-dataflow/src/trace/implementations/spine_fueled.rs b/differential-dataflow/src/trace/implementations/spine_fueled.rs index 9cdd27443..b7be06b9c 100644 --- a/differential-dataflow/src/trace/implementations/spine_fueled.rs +++ b/differential-dataflow/src/trace/implementations/spine_fueled.rs @@ -211,9 +211,9 @@ impl TraceReader for Spine { self.logical_frontier.extend(frontier.iter().cloned()); } #[inline] - fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } + fn get_logical_compaction(&mut self) -> AntichainRef<'_, B::Time> { self.logical_frontier.borrow() } #[inline] - fn set_physical_compaction(&mut self, frontier: AntichainRef) { + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, B::Time>) { // We should never request to rewind the frontier. debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier); self.physical_frontier.clear(); @@ -221,7 +221,7 @@ impl TraceReader for Spine { self.consider_merges(); } #[inline] - fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_frontier.borrow() } + fn get_physical_compaction(&mut self) -> AntichainRef<'_, B::Time> { self.physical_frontier.borrow() } #[inline] fn map_batches(&self, mut f: F) { diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 27598b59a..0197e09c0 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -133,7 +133,7 @@ pub trait TraceReader : LayoutExt { /// not beyond this frontier will present as a time that compares identically with all query times beyond /// this frontier. Practically, update times not beyond this frontier should not be taken to be accurate as /// presented, and should be used carefully, only in accumulation to times that are beyond the frontier. - fn get_logical_compaction(&mut self) -> AntichainRef; + fn get_logical_compaction(&mut self) -> AntichainRef<'_, Self::Time>; /// Advances the frontier that constrains physical compaction. /// @@ -149,7 +149,7 @@ pub trait TraceReader : LayoutExt { /// /// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to /// this method, or the initial value of `get_physical_compaction()` if this method has not yet been called. - fn set_physical_compaction(&mut self, frontier: AntichainRef); + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Self::Time>); /// Reports the physical compaction frontier. /// @@ -157,7 +157,7 @@ pub trait TraceReader : LayoutExt { /// the caller to create a cursor through any frontier beyond the physical compaction frontier, with the /// `cursor_through()` method. This functionality is primarily of interest to the `join` operator, and any /// other operators who need to take notice of the physical structure of update batches. - fn get_physical_compaction(&mut self) -> AntichainRef; + fn get_physical_compaction(&mut self) -> AntichainRef<'_, Self::Time>; /// Maps logic across the non-empty sequence of batches in the trace. /// @@ -314,7 +314,7 @@ pub trait Batcher { /// Returns all updates not greater or equal to an element of `upper`. fn seal>(&mut self, upper: Antichain) -> B::Output; /// Returns the lower envelope of contained update times. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef; + fn frontier(&mut self) -> AntichainRef<'_, Self::Time>; } /// Functionality for building batches from ordered update sequences. diff --git a/differential-dataflow/src/trace/wrappers/enter.rs b/differential-dataflow/src/trace/wrappers/enter.rs index 6886631df..cea7c8b27 100644 --- a/differential-dataflow/src/trace/wrappers/enter.rs +++ b/differential-dataflow/src/trace/wrappers/enter.rs @@ -54,14 +54,14 @@ where }) } - fn set_logical_compaction(&mut self, frontier: AntichainRef) { + fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) { self.stash1.clear(); for time in frontier.iter() { self.stash1.insert(time.clone().to_outer()); } self.trace.set_logical_compaction(self.stash1.borrow()); } - fn get_logical_compaction(&mut self) -> AntichainRef { + fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> { self.stash2.clear(); for time in self.trace.get_logical_compaction().iter() { self.stash2.insert(TInner::to_inner(time.clone())); @@ -69,14 +69,14 @@ where self.stash2.borrow() } - fn set_physical_compaction(&mut self, frontier: AntichainRef) { + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) { self.stash1.clear(); for time in frontier.iter() { self.stash1.insert(time.clone().to_outer()); } self.trace.set_physical_compaction(self.stash1.borrow()); } - fn get_physical_compaction(&mut self) -> AntichainRef { + fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> { self.stash2.clear(); for time in self.trace.get_physical_compaction().iter() { self.stash2.insert(TInner::to_inner(time.clone())); diff --git a/differential-dataflow/src/trace/wrappers/enter_at.rs b/differential-dataflow/src/trace/wrappers/enter_at.rs index 3a789f473..7a4739f08 100644 --- a/differential-dataflow/src/trace/wrappers/enter_at.rs +++ b/differential-dataflow/src/trace/wrappers/enter_at.rs @@ -75,14 +75,14 @@ where }) } - fn set_logical_compaction(&mut self, frontier: AntichainRef) { + fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) { self.stash1.clear(); for time in frontier.iter() { self.stash1.insert((self.prior)(time)); } self.trace.set_logical_compaction(self.stash1.borrow()); } - fn get_logical_compaction(&mut self) -> AntichainRef { + fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> { self.stash2.clear(); for time in self.trace.get_logical_compaction().iter() { self.stash2.insert(TInner::to_inner(time.clone())); @@ -90,14 +90,14 @@ where self.stash2.borrow() } - fn set_physical_compaction(&mut self, frontier: AntichainRef) { + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) { self.stash1.clear(); for time in frontier.iter() { self.stash1.insert((self.prior)(time)); } self.trace.set_physical_compaction(self.stash1.borrow()); } - fn get_physical_compaction(&mut self) -> AntichainRef { + fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> { self.stash2.clear(); for time in self.trace.get_physical_compaction().iter() { self.stash2.insert(TInner::to_inner(time.clone())); diff --git a/differential-dataflow/src/trace/wrappers/filter.rs b/differential-dataflow/src/trace/wrappers/filter.rs index ad49543bd..363ad0461 100644 --- a/differential-dataflow/src/trace/wrappers/filter.rs +++ b/differential-dataflow/src/trace/wrappers/filter.rs @@ -43,13 +43,13 @@ where .map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), logic.clone()))) } - fn set_logical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_logical_compaction(frontier) } - fn get_logical_compaction(&mut self) -> AntichainRef { self.trace.get_logical_compaction() } + fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) } + fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() } - fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } - fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) } + fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> { self.trace.cursor_through(upper).map(|(x,y)| (CursorFilter::new(x, self.logic.clone()), y)) } } diff --git a/differential-dataflow/src/trace/wrappers/freeze.rs b/differential-dataflow/src/trace/wrappers/freeze.rs index 5af2cdbf3..d27d8ccc3 100644 --- a/differential-dataflow/src/trace/wrappers/freeze.rs +++ b/differential-dataflow/src/trace/wrappers/freeze.rs @@ -99,13 +99,13 @@ where }) } - fn set_logical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_logical_compaction(frontier) } - fn get_logical_compaction(&mut self) -> AntichainRef { self.trace.get_logical_compaction() } + fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) } + fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() } - fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } - fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) } + fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> { let func = &self.func; self.trace.cursor_through(upper) .map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage)) diff --git a/differential-dataflow/src/trace/wrappers/frontier.rs b/differential-dataflow/src/trace/wrappers/frontier.rs index 22cca80fe..5d521831b 100644 --- a/differential-dataflow/src/trace/wrappers/frontier.rs +++ b/differential-dataflow/src/trace/wrappers/frontier.rs @@ -53,13 +53,13 @@ impl TraceReader for TraceFrontier { self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until))) } - fn set_logical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_logical_compaction(frontier) } - fn get_logical_compaction(&mut self) -> AntichainRef { self.trace.get_logical_compaction() } + fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) } + fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() } - fn set_physical_compaction(&mut self, frontier: AntichainRef) { self.trace.set_physical_compaction(frontier) } - fn get_physical_compaction(&mut self) -> AntichainRef { self.trace.get_physical_compaction() } + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) } + fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() } - fn cursor_through(&mut self, upper: AntichainRef) -> Option<(Self::Cursor, Self::Storage)> { + fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> { let since = self.since.borrow(); let until = self.until.borrow(); self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, since, until), y)) @@ -68,7 +68,7 @@ impl TraceReader for TraceFrontier { impl TraceFrontier { /// Makes a new trace wrapper - pub fn make_from(trace: Tr, since: AntichainRef, until: AntichainRef) -> Self { + pub fn make_from(trace: Tr, since: AntichainRef<'_, Tr::Time>, until: AntichainRef<'_, Tr::Time>) -> Self { TraceFrontier { trace, since: since.to_owned(), diff --git a/differential-dataflow/src/trace/wrappers/rc.rs b/differential-dataflow/src/trace/wrappers/rc.rs index 9051db631..76a44c129 100644 --- a/differential-dataflow/src/trace/wrappers/rc.rs +++ b/differential-dataflow/src/trace/wrappers/rc.rs @@ -94,19 +94,19 @@ impl TraceReader for TraceRc { /// This change may not have immediately observable effects. It informs the shared trace that this /// handle no longer requires access to times other than those in the future of `frontier`, but if /// there are other handles to the same trace, it may not yet be able to compact. - fn set_logical_compaction(&mut self, frontier: AntichainRef) { + fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier); self.logical_compaction = frontier.to_owned(); } - fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_compaction.borrow() } + fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.logical_compaction.borrow() } /// Allows the trace to compact batches of times before `frontier`. - fn set_physical_compaction(&mut self, frontier: AntichainRef) { + fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier); self.physical_compaction = frontier.to_owned(); } - fn get_physical_compaction(&mut self) -> AntichainRef { self.physical_compaction.borrow() } + fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.physical_compaction.borrow() } /// Creates a new cursor over the wrapped trace. - fn cursor_through(&mut self, frontier: AntichainRef) -> Option<(Tr::Cursor, Tr::Storage)> { + fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Tr::Cursor, Tr::Storage)> { ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier) } diff --git a/server/src/lib.rs b/server/src/lib.rs index 2dbe570d9..745de0bb9 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -80,7 +80,7 @@ impl TraceHandler { boxed.downcast_mut::().ok_or(format!("failed to downcast: {}", name)) } /// Enumerates the keys maintained in storage (for the `list` operation). - pub fn keys(&self) -> ::std::collections::hash_map::Keys> { + pub fn keys(&self) -> ::std::collections::hash_map::Keys<'_, String, Box> { self.handles.keys() } /// Assign a thing to key `name`, boxed as `Box`.