Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions differential-dataflow/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,14 @@ mod container {
type BorrowedOf<'a, C> = <<C as Columnar>::Container as columnar::Container>::Borrowed<'a>;

impl<C: Columnar> Column<C> {
pub fn borrow(&self) -> BorrowedOf<C> {
pub fn borrow(&self) -> BorrowedOf<'_, C> {
match self {
Column::Typed(t) => t.borrow(),
Column::Bytes(b) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)),
}
}
pub fn get(&self, index: usize) -> columnar::Ref<C> {
pub fn get(&self, index: usize) -> columnar::Ref<'_, C> {
self.borrow().get(index)
}
}
Expand Down Expand Up @@ -493,7 +493,7 @@ pub mod batcher {
for<'b> columnar::Ref<'b, T>: Ord,
R: Columnar,
{
fn next_or_alloc(&mut self) -> Result<columnar::Ref<(D, T, R)>, Column<(D, T, R)>> {
fn next_or_alloc(&mut self) -> Result<columnar::Ref<'_, (D, T, R)>, Column<(D, T, R)>> {
if self.is_empty() {
Err(std::mem::take(&mut self.list))
}
Expand All @@ -517,12 +517,12 @@ pub mod batcher {
}

impl<T: Columnar> ColumnQueue<T> {
fn pop(&mut self) -> columnar::Ref<T> {
fn pop(&mut self) -> columnar::Ref<'_, T> {
self.head += 1;
self.list.get(self.head - 1)
}

fn peek(&self) -> columnar::Ref<T> {
fn peek(&self) -> columnar::Ref<'_, T> {
self.list.get(self.head)
}
}
Expand Down
8 changes: 4 additions & 4 deletions differential-dataflow/src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {
::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
self.temp_antichain.clear();
}
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
self.logical_compaction.borrow()
}
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
// This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
// Instead, it determines the joint consequences of both guarantees and moves forward with that.
crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
self.temp_antichain.clear();
}
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
self.physical_compaction.borrow()
}
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
self.trace.borrow_mut().trace.cursor_through(frontier)
}
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ where

/// The frontier of elements remaining after the most recent call to `self.seal`.
#[inline]
fn frontier(&mut self) -> AntichainRef<M::Time> {
fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
self.frontier.borrow()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,17 @@ impl<B: Batch+Clone+'static> TraceReader for Spine<B> {
self.logical_frontier.extend(frontier.iter().cloned());
}
#[inline]
fn get_logical_compaction(&mut self) -> AntichainRef<B::Time> { self.logical_frontier.borrow() }
fn get_logical_compaction(&mut self) -> AntichainRef<'_, B::Time> { self.logical_frontier.borrow() }
#[inline]
fn set_physical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, B::Time>) {
// We should never request to rewind the frontier.
debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier);
self.physical_frontier.clear();
self.physical_frontier.extend(frontier.iter().cloned());
self.consider_merges();
}
#[inline]
fn get_physical_compaction(&mut self) -> AntichainRef<B::Time> { self.physical_frontier.borrow() }
fn get_physical_compaction(&mut self) -> AntichainRef<'_, B::Time> { self.physical_frontier.borrow() }

#[inline]
fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
Expand Down
8 changes: 4 additions & 4 deletions differential-dataflow/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub trait TraceReader : LayoutExt {
/// not beyond this frontier will present as a time that compares identically with all query times beyond
/// this frontier. Practically, update times not beyond this frontier should not be taken to be accurate as
/// presented, and should be used carefully, only in accumulation to times that are beyond the frontier.
fn get_logical_compaction(&mut self) -> AntichainRef<Self::Time>;
fn get_logical_compaction(&mut self) -> AntichainRef<'_, Self::Time>;

/// Advances the frontier that constrains physical compaction.
///
Expand All @@ -149,15 +149,15 @@ pub trait TraceReader : LayoutExt {
///
/// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
/// this method, or the initial value of `get_physical_compaction()` if this method has not yet been called.
fn set_physical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Self::Time>);

/// Reports the physical compaction frontier.
///
/// All batches containing updates beyond this frontier will not be merged with other batches. This allows
/// the caller to create a cursor through any frontier beyond the physical compaction frontier, with the
/// `cursor_through()` method. This functionality is primarily of interest to the `join` operator, and any
/// other operators who need to take notice of the physical structure of update batches.
fn get_physical_compaction(&mut self) -> AntichainRef<Self::Time>;
fn get_physical_compaction(&mut self) -> AntichainRef<'_, Self::Time>;

/// Maps logic across the non-empty sequence of batches in the trace.
///
Expand Down Expand Up @@ -314,7 +314,7 @@ pub trait Batcher {
/// Returns all updates not greater or equal to an element of `upper`.
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
/// Returns the lower envelope of contained update times.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<Self::Time>;
fn frontier(&mut self) -> AntichainRef<'_, Self::Time>;
}

/// Functionality for building batches from ordered update sequences.
Expand Down
8 changes: 4 additions & 4 deletions differential-dataflow/src/trace/wrappers/enter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,29 @@ where
})
}

fn set_logical_compaction(&mut self, frontier: AntichainRef<TInner>) {
fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
self.stash1.clear();
for time in frontier.iter() {
self.stash1.insert(time.clone().to_outer());
}
self.trace.set_logical_compaction(self.stash1.borrow());
}
fn get_logical_compaction(&mut self) -> AntichainRef<TInner> {
fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> {
self.stash2.clear();
for time in self.trace.get_logical_compaction().iter() {
self.stash2.insert(TInner::to_inner(time.clone()));
}
self.stash2.borrow()
}

fn set_physical_compaction(&mut self, frontier: AntichainRef<TInner>) {
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
self.stash1.clear();
for time in frontier.iter() {
self.stash1.insert(time.clone().to_outer());
}
self.trace.set_physical_compaction(self.stash1.borrow());
}
fn get_physical_compaction(&mut self) -> AntichainRef<TInner> {
fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> {
self.stash2.clear();
for time in self.trace.get_physical_compaction().iter() {
self.stash2.insert(TInner::to_inner(time.clone()));
Expand Down
8 changes: 4 additions & 4 deletions differential-dataflow/src/trace/wrappers/enter_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,29 @@ where
})
}

fn set_logical_compaction(&mut self, frontier: AntichainRef<TInner>) {
fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
self.stash1.clear();
for time in frontier.iter() {
self.stash1.insert((self.prior)(time));
}
self.trace.set_logical_compaction(self.stash1.borrow());
}
fn get_logical_compaction(&mut self) -> AntichainRef<TInner> {
fn get_logical_compaction(&mut self) -> AntichainRef<'_, TInner> {
self.stash2.clear();
for time in self.trace.get_logical_compaction().iter() {
self.stash2.insert(TInner::to_inner(time.clone()));
}
self.stash2.borrow()
}

fn set_physical_compaction(&mut self, frontier: AntichainRef<TInner>) {
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, TInner>) {
self.stash1.clear();
for time in frontier.iter() {
self.stash1.insert((self.prior)(time));
}
self.trace.set_physical_compaction(self.stash1.borrow());
}
fn get_physical_compaction(&mut self) -> AntichainRef<TInner> {
fn get_physical_compaction(&mut self) -> AntichainRef<'_, TInner> {
self.stash2.clear();
for time in self.trace.get_physical_compaction().iter() {
self.stash2.insert(TInner::to_inner(time.clone()));
Expand Down
10 changes: 5 additions & 5 deletions differential-dataflow/src/trace/wrappers/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ where
.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), logic.clone())))
}

fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) }
fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() }

fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) }
fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() }

fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
self.trace.cursor_through(upper).map(|(x,y)| (CursorFilter::new(x, self.logic.clone()), y))
}
}
Expand Down
10 changes: 5 additions & 5 deletions differential-dataflow/src/trace/wrappers/freeze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ where
})
}

fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) }
fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() }

fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) }
fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() }

fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
let func = &self.func;
self.trace.cursor_through(upper)
.map(|(cursor, storage)| (CursorFreeze::new(cursor, func.clone()), storage))
Expand Down
12 changes: 6 additions & 6 deletions differential-dataflow/src/trace/wrappers/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ impl<Tr: TraceReader> TraceReader for TraceFrontier<Tr> {
self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until)))
}

fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_logical_compaction() }
fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_logical_compaction(frontier) }
fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_logical_compaction() }

fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_physical_compaction(frontier) }
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) { self.trace.set_physical_compaction(frontier) }
fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.trace.get_physical_compaction() }

fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
fn cursor_through(&mut self, upper: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
let since = self.since.borrow();
let until = self.until.borrow();
self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, since, until), y))
Expand All @@ -68,7 +68,7 @@ impl<Tr: TraceReader> TraceReader for TraceFrontier<Tr> {

impl<Tr: TraceReader> TraceFrontier<Tr> {
/// Makes a new trace wrapper
pub fn make_from(trace: Tr, since: AntichainRef<Tr::Time>, until: AntichainRef<Tr::Time>) -> Self {
pub fn make_from(trace: Tr, since: AntichainRef<'_, Tr::Time>, until: AntichainRef<'_, Tr::Time>) -> Self {
TraceFrontier {
trace,
since: since.to_owned(),
Expand Down
10 changes: 5 additions & 5 deletions differential-dataflow/src/trace/wrappers/rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,19 @@ impl<Tr: TraceReader> TraceReader for TraceRc<Tr> {
/// This change may not have immediately observable effects. It informs the shared trace that this
/// handle no longer requires access to times other than those in the future of `frontier`, but if
/// there are other handles to the same trace, it may not yet be able to compact.
fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
self.logical_compaction = frontier.to_owned();
}
fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.logical_compaction.borrow() }
fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.logical_compaction.borrow() }
/// Allows the trace to compact batches of times before `frontier`.
fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
self.physical_compaction = frontier.to_owned();
}
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.physical_compaction.borrow() }
fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.physical_compaction.borrow() }
/// Creates a new cursor over the wrapped trace.
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, Tr::Storage)> {
fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Tr::Cursor, Tr::Storage)> {
::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
}

Expand Down
2 changes: 1 addition & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl TraceHandler {
boxed.downcast_mut::<T>().ok_or(format!("failed to downcast: {}", name))
}
/// Enumerates the keys maintained in storage (for the `list` operation).
pub fn keys(&self) -> ::std::collections::hash_map::Keys<String, Box<dyn Any>> {
pub fn keys(&self) -> ::std::collections::hash_map::Keys<'_, String, Box<dyn Any>> {
self.handles.keys()
}
/// Assign a thing to key `name`, boxed as `Box<Any>`.
Expand Down
Loading