Skip to content

Commit f76769a

Browse files
committed
Introduce traits for collection containers
1 parent f2a3df6 commit f76769a

File tree

4 files changed

+148
-146
lines changed

4 files changed

+148
-146
lines changed

differential-dataflow/src/collection.rs

Lines changed: 141 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,80 @@ impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
204204
/// .assert_eq(&evens);
205205
/// });
206206
/// ```
207-
// TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
208-
// an inherent method on `Collection`.
209-
pub fn negate(&self) -> Collection<G, D, R, C>
210-
where
211-
StreamCore<G, C>: crate::operators::Negate<G, C>
207+
pub fn negate(&self) -> Collection<G, D, R, C> where C: traits::Negate {
208+
use timely::dataflow::channels::pact::Pipeline;
209+
// Negation is delegated to the container's `traits::Negate` implementation, one batch at a time.
self.inner
210+
.unary(Pipeline, "Negate", move |_,_| move |input, output| {
211+
// `std::mem::take` moves the batch out of the input buffer (leaving a default container behind)
// so it can be passed by value to `negate`; the result is emitted at the same `time`.
input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).negate()));
212+
})
213+
.as_collection()
214+
}
215+
216+
/// Brings a Collection into a nested scope.
217+
///
/// The stream is first moved into `child`; each batch is then converted by the container's
/// `traits::Enter` implementation, so the returned collection uses that implementation's
/// `InnerContainer` type, which need not be the same as `C`.
///
218+
/// # Examples
219+
///
220+
/// ```
221+
/// use timely::dataflow::Scope;
222+
/// use differential_dataflow::input::Input;
223+
///
224+
/// ::timely::example(|scope| {
225+
///
226+
/// let data = scope.new_collection_from(1 .. 10).1;
227+
///
228+
/// let result = scope.region(|child| {
229+
/// data.enter(child)
230+
/// .leave()
231+
/// });
232+
///
233+
/// data.assert_eq(&result);
234+
/// });
235+
/// ```
236+
pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R, <C as traits::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer>
237+
where
238+
C: traits::Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
239+
T: Refines<<G as ScopeParent>::Timestamp>,
212240
{
213-
crate::operators::Negate::negate(&self.inner).as_collection()
241+
use timely::dataflow::channels::pact::Pipeline;
242+
self.inner
243+
.enter(child)
244+
// The stream now lives in `child`; convert each batch with the container's `traits::Enter::enter`.
.unary(Pipeline, "Enter", move |_,_| move |input, output| {
245+
// `std::mem::take` hands the batch over by value, as `enter` consumes `self`.
input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).enter()));
246+
})
247+
.as_collection()
248+
}
249+
250+
/// Advances a timestamp in the stream according to the timestamp actions on the path.
251+
///
252+
/// The path may advance the timestamp sufficiently that it is no longer valid, for example if
253+
/// incrementing fields would result in integer overflow. In this case, the record is dropped.
254+
///
/// The work is delegated, batch by batch, to the container's `traits::ResultsIn` implementation.
///
255+
/// # Examples
256+
/// ```
257+
/// use timely::dataflow::Scope;
258+
/// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
259+
///
260+
/// use differential_dataflow::input::Input;
261+
/// // `results_in` is an inherent method on `Collection`; no operator trait needs to be imported.
262+
///
263+
/// timely::example(|scope| {
264+
/// let summary1 = 5;
265+
///
266+
/// let data = scope.new_collection_from(1 .. 10).1;
267+
/// // Applies `results_in` to every timestamp in the collection.
268+
/// data.results_in(summary1);
269+
/// });
270+
/// ```
271+
pub fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self
272+
where
273+
C: traits::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
274+
{
275+
use timely::dataflow::channels::pact::Pipeline;
276+
self.inner
277+
.unary(Pipeline, "ResultsIn", move |_,_| move |input, output| {
278+
// `step` is owned by the operator closure and lent to the container for every batch.
input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).results_in(&step)));
279+
})
280+
.as_collection()
214281
}
215282
}
216283

@@ -379,36 +446,6 @@ impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
379446
.as_collection()
380447
}
381448

382-
/// Brings a Collection into a nested scope.
383-
///
384-
/// # Examples
385-
///
386-
/// ```
387-
/// use timely::dataflow::Scope;
388-
/// use differential_dataflow::input::Input;
389-
///
390-
/// ::timely::example(|scope| {
391-
///
392-
/// let data = scope.new_collection_from(1 .. 10).1;
393-
///
394-
/// let result = scope.region(|child| {
395-
/// data.enter(child)
396-
/// .leave()
397-
/// });
398-
///
399-
/// data.assert_eq(&result);
400-
/// });
401-
/// ```
402-
pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
403-
where
404-
T: Refines<<G as ScopeParent>::Timestamp>,
405-
{
406-
self.inner
407-
.enter(child)
408-
.map(|(data, time, diff)| (data, T::to_inner(time), diff))
409-
.as_collection()
410-
}
411-
412449
/// Brings a Collection into a nested scope, at varying times.
413450
///
414451
/// The `initial` function indicates the time at which each element of the Collection should appear.
@@ -558,8 +595,9 @@ use timely::dataflow::scopes::ScopeParent;
558595
use timely::progress::timestamp::Refines;
559596

560597
/// Methods requiring a nested scope.
561-
impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, T>, D, R>
598+
impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static, C: Container> Collection<Child<'a, G, T>, D, R, C>
562599
where
600+
C: traits::Leave<T, G::Timestamp, OuterContainer: Container>,
563601
T: Refines<<G as ScopeParent>::Timestamp>,
564602
{
565603
/// Returns the final value of a Collection from a nested scope to its containing scope.
@@ -582,10 +620,13 @@ where
582620
/// data.assert_eq(&result);
583621
/// });
584622
/// ```
585-
pub fn leave(&self) -> Collection<G, D, R> {
623+
pub fn leave(&self) -> Collection<G, D, R, <C as traits::Leave<T, G::Timestamp>>::OuterContainer> {
624+
use timely::dataflow::channels::pact::Pipeline;
586625
self.inner
587626
.leave()
588-
.map(|(data, time, diff)| (data, time.to_outer(), diff))
627+
// Once the stream is back in the parent scope, convert each batch with the container's
// `traits::Leave::leave`, which yields the `OuterContainer` type named in the return type.
.unary(Pipeline, "Leave", move |_,_| move |input, output| {
628+
input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave()));
629+
})
589630
.as_collection()
590631
}
591632
}
@@ -690,3 +731,64 @@ where
690731
.concatenate(iterator.into_iter().map(|x| x.inner))
691732
.as_collection()
692733
}
734+
735+
/// Traits that can be implemented by containers to provide functionality to collections based on them.
///
/// Each trait consumes the container by value and returns the transformed container. Implementations
/// for `Vec<(D, T, R)>` update triples are provided here.
736+
pub mod traits {
737+
738+
use timely::progress::{Timestamp, timestamp::Refines};
739+
use crate::collection::Abelian;
740+
741+
/// A container that can negate its updates.
742+
pub trait Negate {
743+
/// Negates Abelian differences of each update.
744+
fn negate(self) -> Self;
745+
}
746+
impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
747+
fn negate(mut self) -> Self {
748+
// Negate every difference in place; data and times are untouched and the vector is returned as is.
for (_data, _time, diff) in self.iter_mut() {
749+
diff.negate();
750+
}
751+
self
752+
}
753+
}
754+
755+
/// A container that can enter from the outer timestamp `T1` into the nested timestamp `T2`.
756+
pub trait Enter<T1, T2> {
757+
/// The resulting container type, holding updates timestamped with `T2`.
758+
type InnerContainer;
759+
/// Update timestamps from `T1` to `T2`.
760+
fn enter(self) -> Self::InnerContainer;
761+
}
762+
impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
763+
type InnerContainer = Vec<(D, T2, R)>;
764+
fn enter(self) -> Self::InnerContainer {
765+
// The element type changes from `(D, T1, R)` to `(D, T2, R)`, so the updates are re-collected
// with each time mapped through `Refines::to_inner`.
self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect()
766+
}
767+
}
768+
769+
/// A container that can leave from the nested timestamp `T1` back to the outer timestamp `T2`.
770+
pub trait Leave<T1, T2> {
771+
/// The resulting container type, holding updates timestamped with `T2`.
772+
type OuterContainer;
773+
/// Update timestamps from `T1` to `T2`.
774+
fn leave(self) -> Self::OuterContainer;
775+
}
776+
impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
777+
type OuterContainer = Vec<(D, T2, R)>;
778+
fn leave(self) -> Self::OuterContainer {
779+
// Mirror image of `Enter`: re-collect the updates with each time mapped through `Refines::to_outer`.
self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect()
780+
}
781+
}
782+
783+
/// A container that can advance timestamps by a summary `TS`.
784+
pub trait ResultsIn<TS> {
785+
/// Advance times in the container by `step`.
786+
fn results_in(self, step: &TS) -> Self;
787+
}
788+
impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
789+
fn results_in(self, step: &T::Summary) -> Self {
790+
use timely::progress::PathSummary;
791+
// `PathSummary::results_in` yields no time when `step` cannot be applied to `t`;
// `filter_map` drops those updates, so the result may be shorter than the input.
self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect()
792+
}
793+
}
794+
}

differential-dataflow/src/operators/iterate.rs

Lines changed: 6 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ use std::fmt::Debug;
3434
use std::ops::Deref;
3535

3636
use timely::Container;
37-
use timely::progress::{Timestamp, PathSummary};
37+
use timely::progress::Timestamp;
3838
use timely::order::Product;
3939

4040
use timely::dataflow::*;
4141
use timely::dataflow::scopes::child::Iterative;
4242
use timely::dataflow::operators::{Feedback, ConnectLoop};
4343
use timely::dataflow::operators::feedback::Handle;
4444

45-
use crate::{Data, Collection, AsCollection};
45+
use crate::{Data, Collection};
4646
use crate::difference::{Semigroup, Abelian};
4747
use crate::lattice::Lattice;
4848

@@ -167,7 +167,7 @@ where
167167
impl<G, D: Data, R: Abelian, C: Container> Variable<G, D, R, C>
168168
where
169169
G: Scope<Timestamp: Lattice>,
170-
StreamCore<G, C>: crate::operators::Negate<G, C> + ResultsIn<G, C>,
170+
C: crate::collection::traits::Negate + crate::collection::traits::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
171171
{
172172
/// Creates a new initially empty `Variable`.
173173
///
@@ -210,8 +210,8 @@ where
210210
pub fn set_concat(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
211211
let step = self.step;
212212
result
213-
.inner
214213
.results_in(step)
214+
.inner
215215
.connect_loop(self.feedback);
216216

217217
self.collection
@@ -246,7 +246,7 @@ where
246246
impl<G, D: Data, R: Semigroup, C: Container> SemigroupVariable<G, D, R, C>
247247
where
248248
G: Scope<Timestamp: Lattice>,
249-
StreamCore<G, C>: ResultsIn<G, C>,
249+
C: crate::collection::traits::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
250250
{
251251
/// Creates a new initially empty `SemigroupVariable`.
252252
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
@@ -259,8 +259,8 @@ where
259259
pub fn set(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
260260
let step = self.step;
261261
result
262-
.inner
263262
.results_in(step)
263+
.inner
264264
.connect_loop(self.feedback);
265265

266266
self.collection
@@ -273,47 +273,3 @@ impl<G: Scope, D: Data, R: Semigroup, C: Container> Deref for SemigroupVariable<
273273
&self.collection
274274
}
275275
}
276-
277-
/// Extension trait for streams.
278-
pub trait ResultsIn<G: Scope, C> {
279-
/// Advances a timestamp in the stream according to the timestamp actions on the path.
280-
///
281-
/// The path may advance the timestamp sufficiently that it is no longer valid, for example if
282-
/// incrementing fields would result in integer overflow. In this case, the record is dropped.
283-
///
284-
/// # Examples
285-
/// ```
286-
/// use timely::dataflow::Scope;
287-
/// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
288-
///
289-
/// use differential_dataflow::input::Input;
290-
/// use differential_dataflow::operators::ResultsIn;
291-
///
292-
/// timely::example(|scope| {
293-
/// let summary1 = 5;
294-
///
295-
/// let data = scope.new_collection_from(1 .. 10).1;
296-
/// /// Applies `results_in` on every timestamp in the collection.
297-
/// data.results_in(summary1);
298-
/// });
299-
/// ```
300-
fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self;
301-
}
302-
303-
impl<G, D, R, C> ResultsIn<G, C> for Collection<G, D, R, C>
304-
where
305-
G: Scope,
306-
C: Clone,
307-
StreamCore<G, C>: ResultsIn<G, C>,
308-
{
309-
fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self {
310-
self.inner.results_in(step).as_collection()
311-
}
312-
}
313-
314-
impl<G: Scope, D: timely::Data, R: timely::Data> ResultsIn<G, Vec<(D, G::Timestamp, R)>> for Stream<G, (D, G::Timestamp, R)> {
315-
fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self {
316-
use timely::dataflow::operators::Map;
317-
self.flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d)))
318-
}
319-
}

differential-dataflow/src/operators/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,13 @@
44
//! operators have specialized implementations to make them work efficiently, and are in addition
55
//! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`).
66
7-
pub use self::negate::Negate;
87
pub use self::reduce::{Reduce, Threshold, Count};
9-
pub use self::iterate::{Iterate, ResultsIn};
8+
pub use self::iterate::Iterate;
109
pub use self::join::{Join, JoinCore};
1110
pub use self::count::CountTotal;
1211
pub use self::threshold::ThresholdTotal;
1312

1413
pub mod arrange;
15-
pub mod negate;
1614
pub mod reduce;
1715
pub mod consolidate;
1816
pub mod iterate;

differential-dataflow/src/operators/negate.rs

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)