diff --git a/container/Cargo.toml b/container/Cargo.toml index 0e862dad9..41c46c2ef 100644 --- a/container/Cargo.toml +++ b/container/Cargo.toml @@ -4,8 +4,3 @@ version = "0.14.0" description = "Container abstractions for Timely" license = "MIT" edition.workspace = true - -[dependencies] -columnation = "0.1" -flatcontainer = "0.5" -serde = { version = "1.0", features = ["derive"] } diff --git a/container/src/columnation.rs b/container/src/columnation.rs deleted file mode 100644 index 459eeeb81..000000000 --- a/container/src/columnation.rs +++ /dev/null @@ -1,394 +0,0 @@ -//! A columnar container based on the columnation library. - -use std::iter::FromIterator; - -pub use columnation::*; - -use crate::PushInto; - -/// An append-only vector that store records as columns. -/// -/// This container maintains elements that might conventionally own -/// memory allocations, but instead the pointers to those allocations -/// reference larger regions of memory shared with multiple instances -/// of the type. Elements can be retrieved as references, and care is -/// taken when this type is dropped to ensure that the correct memory -/// is returned (rather than the incorrect memory, from running the -/// elements `Drop` implementations). -pub struct TimelyStack { - local: Vec, - inner: T::InnerRegion, -} - -impl TimelyStack { - /// Construct a [TimelyStack], reserving space for `capacity` elements - /// - /// Note that the associated region is not initialized to a specific capacity - /// because we can't generally know how much space would be required. - pub fn with_capacity(capacity: usize) -> Self { - Self { - local: Vec::with_capacity(capacity), - inner: T::InnerRegion::default(), - } - } - - /// Ensures `Self` can absorb `items` without further allocations. - /// - /// The argument `items` may be cloned and iterated multiple times. - /// Please be careful if it contains side effects. - #[inline(always)] - pub fn reserve_items<'a, I>(&mut self, items: I) - where - I: Iterator+Clone, - T: 'a, - { - self.local.reserve(items.clone().count()); - self.inner.reserve_items(items); - } - - /// Ensures `Self` can absorb `items` without further allocations. - /// - /// The argument `items` may be cloned and iterated multiple times. - /// Please be careful if it contains side effects. - #[inline(always)] - pub fn reserve_regions<'a, I>(&mut self, regions: I) - where - Self: 'a, - I: Iterator+Clone, - { - self.local.reserve(regions.clone().map(|cs| cs.local.len()).sum()); - self.inner.reserve_regions(regions.map(|cs| &cs.inner)); - } - - - - /// Copies an element in to the region. - /// - /// The element can be read by indexing - pub fn copy(&mut self, item: &T) { - // TODO: Some types `T` should just be cloned. - // E.g. types that are `Copy` or vecs of ZSTs. - unsafe { - self.local.push(self.inner.copy(item)); - } - } - /// Empties the collection. - pub fn clear(&mut self) { - unsafe { - // Unsafety justified in that setting the length to zero exposes - // no invalid data. - self.local.set_len(0); - self.inner.clear(); - } - } - /// Retain elements that pass a predicate, from a specified offset. - /// - /// This method may or may not reclaim memory in the inner region. - pub fn retain_from bool>(&mut self, index: usize, mut predicate: P) { - let mut write_position = index; - for position in index..self.local.len() { - if predicate(&self[position]) { - // TODO: compact the inner region and update pointers. - self.local.swap(position, write_position); - write_position += 1; - } - } - unsafe { - // Unsafety justified in that `write_position` is no greater than - // `self.local.len()` and so this exposes no invalid data. - self.local.set_len(write_position); - } - } - - /// Unsafe access to `local` data. The slices stores data that is backed by a region - /// allocation. Therefore, it is undefined behavior to mutate elements of the `local` slice. - /// - /// # Safety - /// Elements within `local` can be reordered, but not mutated, removed and/or dropped. - pub unsafe fn local(&mut self) -> &mut [T] { - &mut self.local[..] - } - - /// Estimate the memory capacity in bytes. - #[inline] - pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) { - let size_of = std::mem::size_of::(); - callback(self.local.len() * size_of, self.local.capacity() * size_of); - self.inner.heap_size(callback); - } - - /// Estimate the consumed memory capacity in bytes, summing both used and total capacity. - #[inline] - pub fn summed_heap_size(&self) -> (usize, usize) { - let (mut length, mut capacity) = (0, 0); - self.heap_size(|len, cap| { - length += len; - capacity += cap - }); - (length, capacity) - } - - /// The length in items. - #[inline] - pub fn len(&self) -> usize { - self.local.len() - } - - /// The capacity of the local vector. - #[inline] - pub fn capacity(&self) -> usize { - self.local.capacity() - } - - /// Reserve space for `additional` elements. - #[inline] - pub fn reserve(&mut self, additional: usize) { - self.local.reserve(additional) - } -} - -impl TimelyStack<(A, B)> { - /// Copies a destructured tuple `(A, B)` into this column stack. - /// - /// This serves situations where a tuple should be constructed from its constituents but not - /// not all elements are available as owned data. - /// - /// The element can be read by indexing - pub fn copy_destructured(&mut self, t1: &A, t2: &B) { - unsafe { - self.local.push(self.inner.copy_destructured(t1, t2)); - } - } -} - -impl TimelyStack<(A, B, C)> { - /// Copies a destructured tuple `(A, B, C)` into this column stack. - /// - /// This serves situations where a tuple should be constructed from its constituents but not - /// not all elements are available as owned data. - /// - /// The element can be read by indexing - pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) { - unsafe { - self.local.push(self.inner.copy_destructured(r0, r1, r2)); - } - } -} - -impl std::ops::Deref for TimelyStack { - type Target = [T]; - #[inline(always)] - fn deref(&self) -> &Self::Target { - &self.local[..] - } -} - -impl Drop for TimelyStack { - fn drop(&mut self) { - self.clear(); - } -} - -impl Default for TimelyStack { - fn default() -> Self { - Self { - local: Vec::new(), - inner: T::InnerRegion::default(), - } - } -} - -impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack { - fn from_iter>(iter: T) -> Self { - let iter = iter.into_iter(); - let mut c = TimelyStack::::with_capacity(iter.size_hint().0); - for element in iter { - c.copy(element); - } - - c - } -} - -impl PartialEq for TimelyStack { - fn eq(&self, other: &Self) -> bool { - PartialEq::eq(&self[..], &other[..]) - } -} - -impl Eq for TimelyStack {} - -impl std::fmt::Debug for TimelyStack { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self[..].fmt(f) - } -} - -impl Clone for TimelyStack { - fn clone(&self) -> Self { - let mut new: Self = Default::default(); - for item in &self[..] { - new.copy(item); - } - new - } - - fn clone_from(&mut self, source: &Self) { - self.clear(); - for item in &source[..] { - self.copy(item); - } - } -} - -impl PushInto for TimelyStack { - #[inline] - fn push_into(&mut self, item: T) { - self.copy(&item); - } -} - -impl PushInto<&T> for TimelyStack { - #[inline] - fn push_into(&mut self, item: &T) { - self.copy(item); - } -} - - -impl PushInto<&&T> for TimelyStack { - #[inline] - fn push_into(&mut self, item: &&T) { - self.copy(*item); - } -} - -mod container { - use std::ops::Deref; - use crate::{Container, SizableContainer}; - - use crate::columnation::{Columnation, TimelyStack}; - - impl Container for TimelyStack { - type ItemRef<'a> = &'a T where Self: 'a; - type Item<'a> = &'a T where Self: 'a; - - fn len(&self) -> usize { - self.local.len() - } - - fn is_empty(&self) -> bool { - self.local.is_empty() - } - - fn clear(&mut self) { - TimelyStack::clear(self) - } - - type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a; - - fn iter(&self) -> Self::Iter<'_> { - self.deref().iter() - } - - type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a; - - fn drain(&mut self) -> Self::DrainIter<'_> { - (*self).iter() - } - } - - impl SizableContainer for TimelyStack { - fn at_capacity(&self) -> bool { - self.len() == self.capacity() - } - fn ensure_capacity(&mut self, stash: &mut Option) { - if self.capacity() == 0 { - *self = stash.take().unwrap_or_default(); - self.clear(); - } - let preferred = crate::buffer::default_capacity::(); - if self.capacity() < preferred { - self.reserve(preferred - self.capacity()); - } - } - } -} - -mod flatcontainer { - //! A bare-bones flatcontainer region implementation for [`TimelyStack`]. - - use columnation::Columnation; - use flatcontainer::{Push, Region, ReserveItems}; - use crate::columnation::TimelyStack; - - #[derive(Debug, Clone)] - struct ColumnationRegion { - inner: TimelyStack, - } - - impl Default for ColumnationRegion { - fn default() -> Self { - Self { inner: Default::default() } - } - } - - impl Region for ColumnationRegion { - type Owned = T; - type ReadItem<'a> = &'a T where Self: 'a; - type Index = usize; - - fn merge_regions<'a>(regions: impl Iterator + Clone) -> Self where Self: 'a { - let mut inner = TimelyStack::default(); - inner.reserve_regions(regions.map(|r| &r.inner)); - Self { inner} - } - - fn index(&self, index: Self::Index) -> Self::ReadItem<'_> { - &self.inner[index] - } - - fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator + Clone { - self.inner.reserve_regions(regions.map(|r| &r.inner)); - } - - fn clear(&mut self) { - self.inner.clear(); - } - - fn heap_size(&self, callback: F) { - self.inner.heap_size(callback); - } - - fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> where Self: 'a { - item - } - } - - impl Push for ColumnationRegion { - fn push(&mut self, item: T) -> Self::Index { - self.inner.copy(&item); - self.inner.len() - 1 - } - } - - impl Push<&T> for ColumnationRegion { - fn push(&mut self, item: &T) -> Self::Index { - self.inner.copy(item); - self.inner.len() - 1 - } - } - - impl Push<&&T> for ColumnationRegion { - fn push(&mut self, item: &&T) -> Self::Index { - self.inner.copy(*item); - self.inner.len() - 1 - } - } - - impl<'a, T: Columnation + Clone + 'a> ReserveItems<&'a T> for ColumnationRegion { - fn reserve_items(&mut self, items: I) where I: Iterator + Clone { - self.inner.reserve_items(items); - } - } -} diff --git a/container/src/flatcontainer.rs b/container/src/flatcontainer.rs deleted file mode 100644 index b5a41ab51..000000000 --- a/container/src/flatcontainer.rs +++ /dev/null @@ -1,52 +0,0 @@ -//! Present a [`FlatStack`] as a timely container. - -pub use flatcontainer::*; -use crate::{buffer, Container, SizableContainer, PushInto}; - -impl Container for FlatStack { - type ItemRef<'a> = R::ReadItem<'a> where Self: 'a; - type Item<'a> = R::ReadItem<'a> where Self: 'a; - - fn len(&self) -> usize { - self.len() - } - - fn clear(&mut self) { - self.clear() - } - - type Iter<'a> = <&'a Self as IntoIterator>::IntoIter where Self: 'a; - - fn iter(&self) -> Self::Iter<'_> { - IntoIterator::into_iter(self) - } - - type DrainIter<'a> = Self::Iter<'a> where Self: 'a; - - fn drain(&mut self) -> Self::DrainIter<'_> { - IntoIterator::into_iter(&*self) - } -} - -impl SizableContainer for FlatStack { - fn at_capacity(&self) -> bool { - self.len() == self.capacity() - } - fn ensure_capacity(&mut self, stash: &mut Option) { - if self.capacity() == 0 { - *self = stash.take().unwrap_or_default(); - self.clear(); - } - let preferred = buffer::default_capacity::(); - if self.capacity() < preferred { - self.reserve(preferred - self.capacity()); - } - } -} - -impl, T> PushInto for FlatStack { - #[inline] - fn push_into(&mut self, item: T) { - self.copy(item); - } -} diff --git a/container/src/lib.rs b/container/src/lib.rs index 9394f1ec0..e9aec213a 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -4,9 +4,6 @@ use std::collections::VecDeque; -pub mod columnation; -pub mod flatcontainer; - /// A container transferring data through dataflow edges /// /// A container stores a number of elements and thus is able to describe it length (`len()`) and diff --git a/timely/Cargo.toml b/timely/Cargo.toml index bc672931c..64f066ecc 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -20,6 +20,7 @@ getopts = ["getopts-dep", "timely_communication/getopts"] [dependencies] columnar = { workspace = true } +columnation = "0.1" getopts-dep = { package = "getopts", version = "0.2.21", optional = true } bincode = { version = "1.0" } byteorder = "1.5" diff --git a/timely/examples/flatcontainer.rs b/timely/examples/flatcontainer.rs deleted file mode 100644 index e361e2d6f..000000000 --- a/timely/examples/flatcontainer.rs +++ /dev/null @@ -1,97 +0,0 @@ -//! Wordcount based on flatcontainer. - -use { - std::collections::HashMap, - timely::container::CapacityContainerBuilder, - timely::container::flatcontainer::{RegionPreference, FlatStack}, - timely::dataflow::channels::pact::{ExchangeCore, Pipeline}, - timely::dataflow::InputHandleCore, - timely::dataflow::operators::{Inspect, Operator, Probe}, - timely::dataflow::ProbeHandle, -}; - -fn main() { - - type Container = FlatStack<<(String, i64) as RegionPreference>::Region>; - - // initializes and runs a timely dataflow. - timely::execute_from_args(std::env::args(), |worker| { - let mut input = >>::new(); - let mut probe = ProbeHandle::new(); - - // create a new input, exchange data, and inspect its output - worker.dataflow::(|scope| { - input - .to_stream(scope) - .unary( - Pipeline, - "Split", - |_cap, _info| { - move |input, output| { - while let Some((time, data)) = input.next() { - let mut session = output.session(&time); - for (text, diff) in data.iter().flat_map(|(text, diff)| { - text.split_whitespace().map(move |s| (s, diff)) - }) { - session.give((text, diff)); - } - } - } - }, - ) - .container::() - .unary_frontier( - ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64), - "WordCount", - |_capability, _info| { - let mut queues = HashMap::new(); - let mut counts = HashMap::new(); - - move |input, output| { - while let Some((time, data)) = input.next() { - queues - .entry(time.retain()) - .or_insert(Vec::new()) - .push(std::mem::take(data)); - } - - for (key, val) in queues.iter_mut() { - if !input.frontier().less_equal(key.time()) { - let mut session = output.session(key); - for batch in val.drain(..) { - for (word, diff) in batch.iter() { - let total = - if let Some(count) = counts.get_mut(word) { - *count += diff; - *count - } - else { - counts.insert(word.to_string(), diff); - diff - }; - session.give((word, total)); - } - } - } - } - - queues.retain(|_key, val| !val.is_empty()); - } - }, - ) - .container::() - .inspect(|x| println!("seen: {:?}", x)) - .probe_with(&mut probe); - }); - - // introduce data and watch! - for round in 0..10 { - input.send(("flat container", 1)); - input.advance_to(round + 1); - while probe.less_than(input.time()) { - worker.step(); - } - } - }) - .unwrap(); -} diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 5e1608b28..6a351753b 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -136,26 +136,6 @@ mod implementations { } } - use crate::container::flatcontainer::FlatStack; - impl Deserialize<'a> + crate::container::flatcontainer::Region> ContainerBytes for FlatStack { - fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self { - ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed") - } - - fn length_in_bytes(&self) -> usize { - let length = ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize; - (length + 7) & !7 - } - - fn into_bytes(&self, writer: &mut W) { - let mut counter = WriteCounter::new(writer); - ::bincode::serialize_into(&mut counter, &self).expect("bincode::serialize_into() failed"); - let written = counter.count; - let written_slop = ((written + 7) & !7) - written; - counter.write_all(&[0u8; 8][..written_slop]).unwrap(); - } - } - use write_counter::WriteCounter; /// A `Write` wrapper that counts the bytes written. mod write_counter { diff --git a/timely/src/order.rs b/timely/src/order.rs index 064cb4b07..f31677117 100644 --- a/timely/src/order.rs +++ b/timely/src/order.rs @@ -57,14 +57,13 @@ implement_partial!(u8, u16, u32, u64, u128, usize, i8, i16, i32, i64, i128, isiz implement_total!(u8, u16, u32, u64, u128, usize, i8, i16, i32, i64, i128, isize, (), ::std::time::Duration,); pub use product::Product; -pub use product::flatcontainer::ProductRegion as FlatProductRegion; /// A pair of timestamps, partially ordered by the product order. mod product { use std::fmt::{Formatter, Error, Debug}; use columnar::Columnar; use serde::{Deserialize, Serialize}; - use crate::container::columnation::{Columnation, Region}; + use columnation::{Columnation, Region}; use crate::order::{Empty, TotalOrder}; use crate::progress::Timestamp; use crate::progress::timestamp::PathSummary; @@ -192,139 +191,6 @@ mod product { self.inner_region.heap_size(callback); } } - - pub mod flatcontainer { - use timely_container::flatcontainer::{IntoOwned, Push, Region, RegionPreference, ReserveItems}; - use super::Product; - - impl RegionPreference for Product { - type Owned = Product; - type Region = ProductRegion; - } - - /// Region to store [`Product`] timestamps. - #[derive(Default, Clone, Debug)] - pub struct ProductRegion { - outer_region: RO, - inner_region: RI, - } - - impl Region for ProductRegion { - type Owned = Product; - type ReadItem<'a> = Product, RI::ReadItem<'a>> where Self: 'a; - type Index = (RO::Index, RI::Index); - - #[inline] - fn merge_regions<'a>(regions: impl Iterator + Clone) -> Self where Self: 'a { - let outer_region = RO::merge_regions(regions.clone().map(|r| &r.outer_region)); - let inner_region = RI::merge_regions(regions.map(|r| &r.inner_region)); - Self { outer_region, inner_region } - } - - #[inline] - fn index(&self, (outer, inner): Self::Index) -> Self::ReadItem<'_> { - Product::new(self.outer_region.index(outer), self.inner_region.index(inner)) - } - - #[inline] - fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator + Clone { - self.outer_region.reserve_regions(regions.clone().map(|r| &r.outer_region)); - self.inner_region.reserve_regions(regions.map(|r| &r.inner_region)); - } - - #[inline] - fn clear(&mut self) { - self.outer_region.clear(); - self.inner_region.clear(); - } - - #[inline] - fn heap_size(&self, mut callback: F) { - self.outer_region.heap_size(&mut callback); - self.inner_region.heap_size(callback); - } - - #[inline] - fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> where Self: 'a { - Product::new(RO::reborrow(item.outer), RI::reborrow(item.inner)) - } - } - - impl<'a, TOuter, TInner> IntoOwned<'a> for Product - where - TOuter: IntoOwned<'a>, - TInner: IntoOwned<'a>, - { - type Owned = Product; - - fn into_owned(self) -> Self::Owned { - Product::new(self.outer.into_owned(), self.inner.into_owned()) - } - - fn clone_onto(self, other: &mut Self::Owned) { - self.outer.clone_onto(&mut other.outer); - self.inner.clone_onto(&mut other.inner); - } - - fn borrow_as(owned: &'a Self::Owned) -> Self { - Product::new(IntoOwned::borrow_as(&owned.outer), IntoOwned::borrow_as(&owned.inner)) - } - } - - impl<'a, RO, RI> ReserveItems, RI::ReadItem<'a>>> for ProductRegion - where - RO: Region + ReserveItems<::ReadItem<'a>> + 'a, - RI: Region + ReserveItems<::ReadItem<'a>> + 'a, - { - #[inline] - fn reserve_items(&mut self, items: I) where I: Iterator, RI::ReadItem<'a>>> + Clone { - self.outer_region.reserve_items(items.clone().map(|i| i.outer)); - self.inner_region.reserve_items(items.clone().map(|i| i.inner)); - } - } - - impl Push> for ProductRegion - where - RO: Region + Push, - RI: Region + Push, - { - #[inline] - fn push(&mut self, item: Product) -> Self::Index { - ( - self.outer_region.push(item.outer), - self.inner_region.push(item.inner) - ) - } - } - - impl<'a, TO, TI, RO, RI> Push<&'a Product> for ProductRegion - where - RO: Region + Push<&'a TO>, - RI: Region + Push<&'a TI>, - { - #[inline] - fn push(&mut self, item: &'a Product) -> Self::Index { - ( - self.outer_region.push(&item.outer), - self.inner_region.push(&item.inner) - ) - } - } - - impl<'a, TO, TI, RO, RI> Push<&&'a Product> for ProductRegion - where - RO: Region + Push<&'a TO>, - RI: Region + Push<&'a TI>, - { - #[inline] - fn push(&mut self, item: && 'a Product) -> Self::Index { - ( - self.outer_region.push(&item.outer), - self.inner_region.push(&item.inner) - ) - } - } - } } /// Rust tuple ordered by the lexicographic order.