Skip to content

Commit 4217a1c

Browse files
authored
Variable supports container streams (#564)
* Variable for containers Adds support for `Variable` for suitable container streams. By default, support vectors, but leave the possibility for others to implement the new `Negate` and `ResultsIn` extension traits, which are the required interface for `Variable`. Signed-off-by: Moritz Hoffmann <[email protected]> * SemigroupVariable supports containers Signed-off-by: Moritz Hoffmann <[email protected]> --------- Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent fc0b1da commit 4217a1c

File tree

4 files changed

+178
-58
lines changed

4 files changed

+178
-58
lines changed

src/collection.rs

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,35 @@ impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
172172
pub fn scope(&self) -> G {
173173
self.inner.scope()
174174
}
175+
176+
/// Creates a new collection whose counts are the negation of those in the input.
177+
///
178+
/// This method is most commonly used with `concat` to get those element in one collection but not another.
179+
/// However, differential dataflow computations are still defined for all values of the difference type `R`,
180+
/// including negative counts.
181+
///
182+
/// # Examples
183+
///
184+
/// ```
185+
/// use differential_dataflow::input::Input;
186+
///
187+
/// ::timely::example(|scope| {
188+
///
189+
/// let data = scope.new_collection_from(1 .. 10).1;
190+
///
191+
/// let odds = data.filter(|x| x % 2 == 1);
192+
/// let evens = data.filter(|x| x % 2 == 0);
193+
///
194+
/// odds.negate()
195+
/// .concat(&data)
196+
/// .assert_eq(&evens);
197+
/// });
198+
/// ```
199+
// TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
200+
// an inherent method on `Collection`.
201+
pub fn negate(&self) -> Collection<G, D, R, C> where StreamCore<G, C>: crate::operators::Negate<G, C> {
202+
crate::operators::Negate::negate(&self.inner).as_collection()
203+
}
175204
}
176205

177206
impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
@@ -552,36 +581,6 @@ impl<'a, G: Scope, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, G
552581

553582
/// Methods requiring an Abelian difference, to support negation.
554583
impl<G: Scope, D: Clone+'static, R: Abelian+'static> Collection<G, D, R> where G::Timestamp: Data {
555-
/// Creates a new collection whose counts are the negation of those in the input.
556-
///
557-
/// This method is most commonly used with `concat` to get those element in one collection but not another.
558-
/// However, differential dataflow computations are still defined for all values of the difference type `R`,
559-
/// including negative counts.
560-
///
561-
/// # Examples
562-
///
563-
/// ```
564-
/// use differential_dataflow::input::Input;
565-
///
566-
/// ::timely::example(|scope| {
567-
///
568-
/// let data = scope.new_collection_from(1 .. 10).1;
569-
///
570-
/// let odds = data.filter(|x| x % 2 == 1);
571-
/// let evens = data.filter(|x| x % 2 == 0);
572-
///
573-
/// odds.negate()
574-
/// .concat(&data)
575-
/// .assert_eq(&evens);
576-
/// });
577-
/// ```
578-
pub fn negate(&self) -> Collection<G, D, R> {
579-
self.inner
580-
.map_in_place(|x| x.2.negate())
581-
.as_collection()
582-
}
583-
584-
585584
/// Assert if the collections are ever different.
586585
///
587586
/// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation

src/operators/iterate.rs

Lines changed: 92 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,16 @@
3333
use std::fmt::Debug;
3434
use std::ops::Deref;
3535

36+
use timely::Container;
3637
use timely::progress::{Timestamp, PathSummary};
3738
use timely::order::Product;
3839

3940
use timely::dataflow::*;
4041
use timely::dataflow::scopes::child::Iterative;
41-
use timely::dataflow::operators::{Feedback, ConnectLoop, Map};
42+
use timely::dataflow::operators::{Feedback, ConnectLoop};
4243
use timely::dataflow::operators::feedback::Handle;
4344

44-
use crate::{Data, Collection};
45+
use crate::{Data, Collection, AsCollection};
4546
use crate::difference::{Semigroup, Abelian};
4647
use crate::lattice::Lattice;
4748

@@ -151,37 +152,47 @@ impl<G: Scope, D: Ord+Data+Debug, R: Semigroup+'static> Iterate<G, D, R> for G {
151152
/// });
152153
/// })
153154
/// ```
154-
pub struct Variable<G: Scope, D: Data, R: Abelian+'static>
155-
where G::Timestamp: Lattice {
156-
collection: Collection<G, D, R>,
157-
feedback: Handle<G, Vec<(D, G::Timestamp, R)>>,
158-
source: Option<Collection<G, D, R>>,
155+
pub struct Variable<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
156+
where
157+
G: Scope,
158+
G::Timestamp: Lattice,
159+
D: Data,
160+
R: Abelian + 'static,
161+
C: Container + Clone + 'static,
162+
{
163+
collection: Collection<G, D, R, C>,
164+
feedback: Handle<G, C>,
165+
source: Option<Collection<G, D, R, C>>,
159166
step: <G::Timestamp as Timestamp>::Summary,
160167
}
161168

162-
impl<G: Scope, D: Data, R: Abelian> Variable<G, D, R> where G::Timestamp: Lattice {
169+
impl<G: Scope, D: Data, R: Abelian, C: Container + Clone + 'static> Variable<G, D, R, C>
170+
where
171+
G::Timestamp: Lattice,
172+
StreamCore<G, C>: crate::operators::Negate<G, C> + ResultsIn<G, C>,
173+
{
163174
/// Creates a new initially empty `Variable`.
164175
///
165176
/// This method produces a simpler dataflow graph than `new_from`, and should
166177
/// be used whenever the variable has an empty input.
167178
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
168179
let (feedback, updates) = scope.feedback(step.clone());
169-
let collection = Collection::<G,D,R>::new(updates);
170-
Variable { collection, feedback, source: None, step }
180+
let collection = Collection::<G, D, R, C>::new(updates);
181+
Self { collection, feedback, source: None, step }
171182
}
172183

173184
/// Creates a new `Variable` from a supplied `source` stream.
174-
pub fn new_from(source: Collection<G, D, R>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
185+
pub fn new_from(source: Collection<G, D, R, C>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
175186
let (feedback, updates) = source.inner.scope().feedback(step.clone());
176-
let collection = Collection::<G,D,R>::new(updates).concat(&source);
187+
let collection = Collection::<G, D, R, C>::new(updates).concat(&source);
177188
Variable { collection, feedback, source: Some(source), step }
178189
}
179190

180191
/// Set the definition of the `Variable` to a collection.
181192
///
182193
/// This method binds the `Variable` to be equal to the supplied collection,
183194
/// which may be recursively defined in terms of the variable itself.
184-
pub fn set(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
195+
pub fn set(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
185196
let mut in_result = result.clone();
186197
if let Some(source) = &self.source {
187198
in_result = in_result.concat(&source.negate());
@@ -198,19 +209,19 @@ impl<G: Scope, D: Data, R: Abelian> Variable<G, D, R> where G::Timestamp: Lattic
198209
///
199210
/// This behavior can also be achieved by using `new` to create an empty initial
200211
/// collection, and then using `self.set(self.concat(result))`.
201-
pub fn set_concat(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
212+
pub fn set_concat(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
202213
let step = self.step;
203214
result
204215
.inner
205-
.flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d)))
216+
.results_in(step)
206217
.connect_loop(self.feedback);
207218

208219
self.collection
209220
}
210221
}
211222

212-
impl<G: Scope, D: Data, R: Abelian> Deref for Variable<G, D, R> where G::Timestamp: Lattice {
213-
type Target = Collection<G, D, R>;
223+
impl<G: Scope, D: Data, R: Abelian, C: Container + Clone + 'static> Deref for Variable<G, D, R, C> where G::Timestamp: Lattice {
224+
type Target = Collection<G, D, R, C>;
214225
fn deref(&self) -> &Self::Target {
215226
&self.collection
216227
}
@@ -222,36 +233,90 @@ impl<G: Scope, D: Data, R: Abelian> Deref for Variable<G, D, R> where G::Timesta
222233
/// that do not implement `Abelian` and only implement `Semigroup`. This means
223234
/// that it can be used in settings where the difference type does not support
224235
/// negation.
225-
pub struct SemigroupVariable<G: Scope, D: Data, R: Semigroup+'static>
226-
where G::Timestamp: Lattice {
227-
collection: Collection<G, D, R>,
228-
feedback: Handle<G, Vec<(D, G::Timestamp, R)>>,
236+
pub struct SemigroupVariable<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
237+
where
238+
G::Timestamp: Lattice,
239+
G: Scope,
240+
D: Data,
241+
R: Semigroup + 'static,
242+
C: Container + Clone + 'static,
243+
{
244+
collection: Collection<G, D, R, C>,
245+
feedback: Handle<G, C>,
229246
step: <G::Timestamp as Timestamp>::Summary,
230247
}
231248

232-
impl<G: Scope, D: Data, R: Semigroup> SemigroupVariable<G, D, R> where G::Timestamp: Lattice {
249+
impl<G: Scope, D: Data, R: Semigroup, C: Container+Clone> SemigroupVariable<G, D, R, C>
250+
where
251+
G::Timestamp: Lattice,
252+
StreamCore<G, C>: ResultsIn<G, C>,
253+
{
233254
/// Creates a new initially empty `SemigroupVariable`.
234255
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
235256
let (feedback, updates) = scope.feedback(step.clone());
236-
let collection = Collection::<G,D,R>::new(updates);
257+
let collection = Collection::<G,D,R,C>::new(updates);
237258
SemigroupVariable { collection, feedback, step }
238259
}
239260

240261
/// Adds a new source of data to `self`.
241-
pub fn set(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
262+
pub fn set(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
242263
let step = self.step;
243264
result
244265
.inner
245-
.flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d)))
266+
.results_in(step)
246267
.connect_loop(self.feedback);
247268

248269
self.collection
249270
}
250271
}
251272

252-
impl<G: Scope, D: Data, R: Semigroup> Deref for SemigroupVariable<G, D, R> where G::Timestamp: Lattice {
253-
type Target = Collection<G, D, R>;
273+
impl<G: Scope, D: Data, R: Semigroup, C: Container+Clone+'static> Deref for SemigroupVariable<G, D, R, C> where G::Timestamp: Lattice {
274+
type Target = Collection<G, D, R, C>;
254275
fn deref(&self) -> &Self::Target {
255276
&self.collection
256277
}
257278
}
279+
280+
/// Extension trait for streams.
281+
pub trait ResultsIn<G: Scope, C> {
282+
/// Advances a timestamp in the stream according to the timestamp actions on the path.
283+
///
284+
/// The path may advance the timestamp sufficiently that it is no longer valid, for example if
285+
/// incrementing fields would result in integer overflow. In this case, the record is dropped.
286+
///
287+
/// # Examples
288+
/// ```
289+
/// use timely::dataflow::Scope;
290+
/// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
291+
///
292+
/// use differential_dataflow::input::Input;
293+
/// use differential_dataflow::operators::ResultsIn;
294+
///
295+
/// timely::example(|scope| {
296+
/// let summary1 = 5;
297+
///
298+
/// let data = scope.new_collection_from(1 .. 10).1;
299+
/// /// Applies `results_in` on every timestamp in the collection.
300+
/// data.results_in(summary1);
301+
/// });
302+
/// ```
303+
fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self;
304+
}
305+
306+
impl<G, D, R, C> ResultsIn<G, C> for Collection<G, D, R, C>
307+
where
308+
G: Scope,
309+
C: Clone,
310+
StreamCore<G, C>: ResultsIn<G, C>,
311+
{
312+
fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self {
313+
self.inner.results_in(step).as_collection()
314+
}
315+
}
316+
317+
impl<G: Scope, D: timely::Data, R: timely::Data> ResultsIn<G, Vec<(D, G::Timestamp, R)>> for Stream<G, (D, G::Timestamp, R)> {
318+
fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self {
319+
use timely::dataflow::operators::Map;
320+
self.flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d)))
321+
}
322+
}

src/operators/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
//! operators have specialized implementations to make them work efficiently, and are in addition
55
//! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`).
66
7+
pub use self::negate::Negate;
78
pub use self::reduce::{Reduce, Threshold, Count};
8-
pub use self::iterate::Iterate;
9+
pub use self::iterate::{Iterate, ResultsIn};
910
pub use self::join::{Join, JoinCore};
1011
pub use self::count::CountTotal;
1112
pub use self::threshold::ThresholdTotal;
1213

1314
pub mod arrange;
15+
pub mod negate;
1416
pub mod reduce;
1517
pub mod consolidate;
1618
pub mod iterate;

src/operators/negate.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
//! Negate the diffs of collections and streams.
2+
3+
use timely::Data;
4+
use timely::dataflow::{Scope, Stream, StreamCore};
5+
use timely::dataflow::operators::Map;
6+
7+
use crate::{AsCollection, Collection};
8+
use crate::difference::Abelian;
9+
10+
/// Negate the contents of a stream.
11+
pub trait Negate<G, C> {
12+
/// Creates a new collection whose counts are the negation of those in the input.
13+
///
14+
/// This method is most commonly used with `concat` to get those element in one collection but not another.
15+
/// However, differential dataflow computations are still defined for all values of the difference type `R`,
16+
/// including negative counts.
17+
///
18+
/// # Examples
19+
///
20+
/// ```
21+
/// use differential_dataflow::input::Input;
22+
///
23+
/// ::timely::example(|scope| {
24+
///
25+
/// let data = scope.new_collection_from(1 .. 10).1;
26+
///
27+
/// let odds = data.filter(|x| x % 2 == 1);
28+
/// let evens = data.filter(|x| x % 2 == 0);
29+
///
30+
/// odds.negate()
31+
/// .concat(&data)
32+
/// .assert_eq(&evens);
33+
/// });
34+
/// ```
35+
fn negate(&self) -> Self;
36+
}
37+
38+
impl<G, D, R, C> Negate<G, C> for Collection<G, D, R, C>
39+
where
40+
G: Scope,
41+
C: Clone,
42+
StreamCore<G, C>: Negate<G, C>,
43+
{
44+
fn negate(&self) -> Self {
45+
self.inner.negate().as_collection()
46+
}
47+
}
48+
49+
impl<G: Scope, D: Data, T: Data, R: Data + Abelian> Negate<G, Vec<(D, T, R)>> for Stream<G, (D, T, R)> {
50+
fn negate(&self) -> Self {
51+
self.map_in_place(|x| x.2.negate())
52+
}
53+
}
54+

0 commit comments

Comments
 (0)