Skip to content

Commit b28a073

Browse files
committed
Extract CollectionCore from Collection
1 parent 1e242f6 commit b28a073

File tree

6 files changed

+58
-58
lines changed

6 files changed

+58
-58
lines changed

differential-dataflow/examples/graspan.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use differential_dataflow::Collection;
1212
use differential_dataflow::lattice::Lattice;
1313
use differential_dataflow::input::{Input, InputSession};
1414
use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
15-
use differential_dataflow::operators::iterate::Variable;
15+
use differential_dataflow::operators::iterate::VariableRow;
1616
use differential_dataflow::operators::Threshold;
1717

1818
type Node = usize;
@@ -83,7 +83,7 @@ type Arrange<G,K,V,R> = Arranged<G, TraceValHandle<K, V, <G as ScopeParent>::Tim
8383
/// An edge variable provides arranged representations of its contents, even before they are
8484
/// completely defined, in support of recursively defined productions.
8585
pub struct EdgeVariable<G: Scope<Timestamp: Lattice>> {
86-
variable: Variable<G, Edge, Diff>,
86+
variable: VariableRow<G, Edge, Diff>,
8787
current: Collection<G, Edge, Diff>,
8888
forward: Option<Arrange<G, Node, Node, Diff>>,
8989
reverse: Option<Arrange<G, Node, Node, Diff>>,
@@ -92,7 +92,7 @@ pub struct EdgeVariable<G: Scope<Timestamp: Lattice>> {
9292
impl<G: Scope<Timestamp: Lattice>> EdgeVariable<G> {
9393
/// Creates a new variable initialized with `source`.
9494
pub fn from(source: &Collection<G, Edge>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
95-
let variable = Variable::new(&mut source.scope(), step);
95+
let variable = VariableRow::new(&mut source.scope(), step);
9696
EdgeVariable {
9797
variable: variable,
9898
current: source.clone(),
@@ -153,7 +153,7 @@ impl Query {
153153

154154
/// Creates a dataflow implementing the query, and returns input and trace handles.
155155
pub fn render_in<G>(&self, scope: &mut G) -> IndexMap<String, RelationHandles<G::Timestamp>>
156-
where
156+
where
157157
G: Scope<Timestamp: Lattice+::timely::order::TotalOrder>,
158158
{
159159
// Create new input (handle, stream) pairs

differential-dataflow/examples/iterate_container.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use timely::dataflow::operators::Operator;
55
use timely::order::Product;
66
use timely::dataflow::{Scope, StreamCore};
77
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
8-
use differential_dataflow::{AsCollection, Collection};
8+
use differential_dataflow::{AsCollection, collection::CollectionCore};
99
use differential_dataflow::input::Input;
1010
use differential_dataflow::operators::iterate::Variable;
1111
use differential_dataflow::collection::containers::{Enter, Leave, Negate, ResultsIn};
@@ -52,12 +52,12 @@ fn main() {
5252
timely::example(|scope| {
5353

5454
let numbers = scope.new_collection_from(1 .. 10u32).1;
55-
let numbers: Collection<_, u32, isize, _> = wrap(&numbers.inner).as_collection();
55+
let numbers: CollectionCore<_, _> = wrap(&numbers.inner).as_collection();
5656

5757
scope.iterative::<u64,_,_>(|nested| {
5858
let summary = Product::new(Default::default(), 1);
5959
let variable = Variable::new_from(numbers.enter(nested), summary);
60-
let mapped: Collection<_, u32, isize, _> = variable.inner.unary(Pipeline, "Map", |_,_| {
60+
let mapped: CollectionCore<_, _> = variable.inner.unary(Pipeline, "Map", |_,_| {
6161
|input, output| {
6262
input.for_each(|time, data| {
6363
let mut session = output.session(&time);

differential-dataflow/src/collection.rs

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,15 @@ use crate::hashable::Hashable;
3737
/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
3838
/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
3939
/// defaults to) `isize`, representing changes to the occurrence count of each record.
40+
pub type Collection<G, D, R = isize> = CollectionCore<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
41+
42+
/// A collection represented by a stream of abstract containers.
43+
///
44+
/// The containers purport to represent changes to a collection, and they must implement various traits
45+
/// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions
46+
/// on the containers, and streams of containers, are left to the container implementor to describe.
4047
#[derive(Clone)]
41-
pub struct Collection<G: Scope, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> {
48+
pub struct CollectionCore<G: Scope, C> {
4249
/// The underlying timely dataflow stream.
4350
///
4451
/// This field is exposed to support direct timely dataflow manipulation when required, but it is
@@ -48,11 +55,9 @@ pub struct Collection<G: Scope, D, R = isize, C = Vec<(D, <G as ScopeParent>::Ti
4855
/// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
4956
/// unexpectedly.
5057
pub inner: timely::dataflow::StreamCore<G, C>,
51-
/// Phantom data for unreferenced `D` and `R` types.
52-
phantom: std::marker::PhantomData<(D, R)>,
5358
}
5459

55-
impl<G: Scope, D, R, C> Collection<G, D, R, C> {
60+
impl<G: Scope, C> CollectionCore<G, C> {
5661
/// Creates a new Collection from a timely dataflow stream.
5762
///
5863
/// This method seems to be rarely used, with the `as_collection` method on streams being a more
@@ -62,11 +67,9 @@ impl<G: Scope, D, R, C> Collection<G, D, R, C> {
6267
///
6368
/// This stream should satisfy the timestamp invariant as documented on [Collection]; this
6469
/// method does not check it.
65-
pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
66-
Collection { inner: stream, phantom: std::marker::PhantomData }
67-
}
70+
pub fn new(stream: StreamCore<G, C>) -> Self { Self { inner: stream } }
6871
}
69-
impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
72+
impl<G: Scope, C: Container> CollectionCore<G, C> {
7073
/// Creates a new collection accumulating the contents of the two collections.
7174
///
7275
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
@@ -128,7 +131,7 @@ impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
128131
///
129132
/// This method is a specialization of `enter` to the case where the nested scope is a region.
130133
/// It removes the need for an operator that adjusts the timestamp.
131-
pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C> {
134+
pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> CollectionCore<Child<'a, G, <G as ScopeParent>::Timestamp>, C> {
132135
self.inner
133136
.enter(child)
134137
.as_collection()
@@ -204,7 +207,7 @@ impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
204207
/// .assert_eq(&evens);
205208
/// });
206209
/// ```
207-
pub fn negate(&self) -> Collection<G, D, R, C> where C: containers::Negate {
210+
pub fn negate(&self) -> Self where C: containers::Negate {
208211
use timely::dataflow::channels::pact::Pipeline;
209212
self.inner
210213
.unary(Pipeline, "Negate", move |_,_| move |input, output| {
@@ -233,7 +236,7 @@ impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
233236
/// data.assert_eq(&result);
234237
/// });
235238
/// ```
236-
pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R, <C as containers::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer>
239+
pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> CollectionCore<Child<'a, G, T>, <C as containers::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer>
237240
where
238241
C: containers::Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
239242
T: Refines<<G as ScopeParent>::Timestamp>,
@@ -594,7 +597,7 @@ use timely::dataflow::scopes::ScopeParent;
594597
use timely::progress::timestamp::Refines;
595598

596599
/// Methods requiring a nested scope.
597-
impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static, C: Container> Collection<Child<'a, G, T>, D, R, C>
600+
impl<'a, G: Scope, T: Timestamp, C: Container> CollectionCore<Child<'a, G, T>, C>
598601
where
599602
C: containers::Leave<T, G::Timestamp, OuterContainer: Container>,
600603
T: Refines<<G as ScopeParent>::Timestamp>,
@@ -619,7 +622,7 @@ where
619622
/// data.assert_eq(&result);
620623
/// });
621624
/// ```
622-
pub fn leave(&self) -> Collection<G, D, R, <C as containers::Leave<T, G::Timestamp>>::OuterContainer> {
625+
pub fn leave(&self) -> CollectionCore<G, <C as containers::Leave<T, G::Timestamp>>::OuterContainer> {
623626
use timely::dataflow::channels::pact::Pipeline;
624627
self.inner
625628
.leave()
@@ -631,13 +634,13 @@ where
631634
}
632635

633636
/// Methods requiring a region as the scope.
634-
impl<G: Scope, D, R, C: Container+Data> Collection<Child<'_, G, G::Timestamp>, D, R, C>
637+
impl<G: Scope, C: Container+Data> CollectionCore<Child<'_, G, G::Timestamp>, C>
635638
{
636639
/// Returns the value of a Collection from a nested region to its containing scope.
637640
///
638641
/// This method is a specialization of `leave` to the case of a nested region.
639642
/// It removes the need for an operator that adjusts the timestamp.
640-
pub fn leave_region(&self) -> Collection<G, D, R, C> {
643+
pub fn leave_region(&self) -> CollectionCore<G, C> {
641644
self.inner
642645
.leave()
643646
.as_collection()
@@ -682,18 +685,18 @@ impl<G: Scope<Timestamp: Data>, D: Clone+'static, R: Abelian+'static> Collection
682685
}
683686

684687
/// Conversion to a differential dataflow Collection.
685-
pub trait AsCollection<G: Scope, D, R, C> {
688+
pub trait AsCollection<G: Scope, C> {
686689
/// Converts the type to a differential dataflow collection.
687-
fn as_collection(&self) -> Collection<G, D, R, C>;
690+
fn as_collection(&self) -> CollectionCore<G, C>;
688691
}
689692

690-
impl<G: Scope, D, R, C: Clone> AsCollection<G, D, R, C> for StreamCore<G, C> {
693+
impl<G: Scope, C: Clone> AsCollection<G, C> for StreamCore<G, C> {
691694
/// Converts the type to a differential dataflow collection.
692695
///
693696
/// By calling this method, you guarantee that the timestamp invariant (as documented on
694697
/// [Collection]) is upheld. This method will not check it.
695-
fn as_collection(&self) -> Collection<G, D, R, C> {
696-
Collection::<G,D,R,C>::new(self.clone())
698+
fn as_collection(&self) -> CollectionCore<G, C> {
699+
CollectionCore::<G,C>::new(self.clone())
697700
}
698701
}
699702

@@ -718,13 +721,11 @@ impl<G: Scope, D, R, C: Clone> AsCollection<G, D, R, C> for StreamCore<G, C> {
718721
/// .assert_eq(&data);
719722
/// });
720723
/// ```
721-
pub fn concatenate<G, D, R, C, I>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
724+
pub fn concatenate<G, C, I>(scope: &mut G, iterator: I) -> CollectionCore<G, C>
722725
where
723726
G: Scope,
724-
D: Data,
725-
R: Semigroup + 'static,
726727
C: Container,
727-
I: IntoIterator<Item=Collection<G, D, R, C>>,
728+
I: IntoIterator<Item=CollectionCore<G, C>>,
728729
{
729730
scope
730731
.concatenate(iterator.into_iter().map(|x| x.inner))

differential-dataflow/src/operators/iterate.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use timely::dataflow::scopes::child::Iterative;
4242
use timely::dataflow::operators::{Feedback, ConnectLoop};
4343
use timely::dataflow::operators::feedback::Handle;
4444

45-
use crate::{Data, Collection};
45+
use crate::{Data, Collection, collection::CollectionCore};
4646
use crate::difference::{Semigroup, Abelian};
4747
use crate::lattice::Lattice;
4848

@@ -151,20 +151,21 @@ impl<G: Scope<Timestamp: Lattice>, D: Ord+Data+Debug, R: Semigroup+'static> Iter
151151
/// });
152152
/// })
153153
/// ```
154-
pub struct Variable<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
154+
pub struct Variable<G, C>
155155
where
156156
G: Scope<Timestamp: Lattice>,
157-
D: Data,
158-
R: Abelian + 'static,
159157
C: Container,
160158
{
161-
collection: Collection<G, D, R, C>,
159+
collection: CollectionCore<G, C>,
162160
feedback: Handle<G, C>,
163-
source: Option<Collection<G, D, R, C>>,
161+
source: Option<CollectionCore<G, C>>,
164162
step: <G::Timestamp as Timestamp>::Summary,
165163
}
166164

167-
impl<G, D: Data, R: Abelian, C: Container> Variable<G, D, R, C>
165+
/// A `Variable` specialized to a vector container of update triples (data, time, diff).
166+
pub type VariableRow<G, D, R> = Variable<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
167+
168+
impl<G, C: Container> Variable<G, C>
168169
where
169170
G: Scope<Timestamp: Lattice>,
170171
C: crate::collection::containers::Negate + crate::collection::containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
@@ -175,22 +176,22 @@ where
175176
/// be used whenever the variable has an empty input.
176177
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
177178
let (feedback, updates) = scope.feedback(step.clone());
178-
let collection = Collection::<G, D, R, C>::new(updates);
179+
let collection = CollectionCore::<G, C>::new(updates);
179180
Self { collection, feedback, source: None, step }
180181
}
181182

182183
/// Creates a new `Variable` from a supplied `source` stream.
183-
pub fn new_from(source: Collection<G, D, R, C>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
184+
pub fn new_from(source: CollectionCore<G, C>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
184185
let (feedback, updates) = source.inner.scope().feedback(step.clone());
185-
let collection = Collection::<G, D, R, C>::new(updates).concat(&source);
186+
let collection = CollectionCore::<G, C>::new(updates).concat(&source);
186187
Variable { collection, feedback, source: Some(source), step }
187188
}
188189

189190
/// Set the definition of the `Variable` to a collection.
190191
///
191192
/// This method binds the `Variable` to be equal to the supplied collection,
192193
/// which may be recursively defined in terms of the variable itself.
193-
pub fn set(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
194+
pub fn set(self, result: &CollectionCore<G, C>) -> CollectionCore<G, C> {
194195
let mut in_result = result.clone();
195196
if let Some(source) = &self.source {
196197
in_result = in_result.concat(&source.negate());
@@ -207,7 +208,7 @@ where
207208
///
208209
/// This behavior can also be achieved by using `new` to create an empty initial
209210
/// collection, and then using `self.set(self.concat(result))`.
210-
pub fn set_concat(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
211+
pub fn set_concat(self, result: &CollectionCore<G, C>) -> CollectionCore<G, C> {
211212
let step = self.step;
212213
result
213214
.results_in(step)
@@ -218,8 +219,8 @@ where
218219
}
219220
}
220221

221-
impl<G: Scope<Timestamp: Lattice>, D: Data, R: Abelian, C: Container> Deref for Variable<G, D, R, C> {
222-
type Target = Collection<G, D, R, C>;
222+
impl<G: Scope<Timestamp: Lattice>, C: Container> Deref for Variable<G, C> {
223+
type Target = CollectionCore<G, C>;
223224
fn deref(&self) -> &Self::Target {
224225
&self.collection
225226
}
@@ -231,32 +232,30 @@ impl<G: Scope<Timestamp: Lattice>, D: Data, R: Abelian, C: Container> Deref for
231232
/// that do not implement `Abelian` and only implement `Semigroup`. This means
232233
/// that it can be used in settings where the difference type does not support
233234
/// negation.
234-
pub struct SemigroupVariable<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
235+
pub struct SemigroupVariable<G, C>
235236
where
236237
G: Scope<Timestamp: Lattice>,
237-
D: Data,
238-
R: Semigroup + 'static,
239238
C: Container,
240239
{
241-
collection: Collection<G, D, R, C>,
240+
collection: CollectionCore<G, C>,
242241
feedback: Handle<G, C>,
243242
step: <G::Timestamp as Timestamp>::Summary,
244243
}
245244

246-
impl<G, D: Data, R: Semigroup, C: Container> SemigroupVariable<G, D, R, C>
245+
impl<G, C: Container> SemigroupVariable<G, C>
247246
where
248247
G: Scope<Timestamp: Lattice>,
249248
C: crate::collection::containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
250249
{
251250
/// Creates a new initially empty `SemigroupVariable`.
252251
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
253252
let (feedback, updates) = scope.feedback(step.clone());
254-
let collection = Collection::<G,D,R,C>::new(updates);
253+
let collection = CollectionCore::<G,C>::new(updates);
255254
SemigroupVariable { collection, feedback, step }
256255
}
257256

258257
/// Adds a new source of data to `self`.
259-
pub fn set(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
258+
pub fn set(self, result: &CollectionCore<G, C>) -> CollectionCore<G, C> {
260259
let step = self.step;
261260
result
262261
.results_in(step)
@@ -267,8 +266,8 @@ where
267266
}
268267
}
269268

270-
impl<G: Scope, D: Data, R: Semigroup, C: Container> Deref for SemigroupVariable<G, D, R, C> where G::Timestamp: Lattice {
271-
type Target = Collection<G, D, R, C>;
269+
impl<G: Scope, C: Container> Deref for SemigroupVariable<G, C> where G::Timestamp: Lattice {
270+
type Target = CollectionCore<G, C>;
272271
fn deref(&self) -> &Self::Target {
273272
&self.collection
274273
}

doop/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use differential_dataflow::{AsCollection, Collection, Hashable};
1111
use differential_dataflow::ExchangeData as Data;
1212
use differential_dataflow::lattice::Lattice;
1313
use differential_dataflow::input::Input;
14-
use differential_dataflow::operators::iterate::Variable;
14+
use differential_dataflow::operators::iterate::VariableRow;
1515
use differential_dataflow::operators::{Threshold, Join, JoinCore};
1616
use differential_dataflow::operators::arrange::ArrangeByKey;
1717

@@ -56,7 +56,7 @@ type MethodInvocation = Instruction;
5656

5757
/// Set-valued collection.
5858
pub struct Relation<'a, G: Scope, D: Data+Hashable> where G::Timestamp : Lattice {
59-
variable: Variable<Child<'a, G, Iter>, D, Diff>,
59+
variable: VariableRow<Child<'a, G, Iter>, D, Diff>,
6060
current: Collection<Child<'a, G, Iter>, D, Diff>,
6161
}
6262

@@ -68,7 +68,7 @@ impl<'a, G: Scope, D: Data+Hashable> Relation<'a, G, D> where G::Timestamp : Lat
6868
/// Creates a new variable initialized with `source`.
6969
pub fn new_from(source: &Collection<Child<'a, G, Iter>, D, Diff>) -> Self {
7070
use ::timely::order::Product;
71-
let variable = Variable::new_from(source.clone(), Product::new(Default::default(), 1));
71+
let variable = VariableRow::new_from(source.clone(), Product::new(Default::default(), 1));
7272
Relation {
7373
variable: variable,
7474
current: source.clone(),

mdbook/src/chapter_2/chapter_2_7.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ As an example, the implementation of the `iterate` operator looks something like
9494
# {
9595
# (*variable).clone()
9696
# }
97-
# fn example<'a, G: Scope<Timestamp=u64>>(collection: &Collection<G, (u64, u64)>) //, logic: impl Fn(&Variable<Child<'a, G, G::Timestamp>, (u64, u64), isize>) -> Collection<Child<'a, G, G::Timestamp>, (u64, u64)>)
97+
# fn example<'a, G: Scope<Timestamp=u64>>(collection: &Collection<G, (u64, u64)>) //, logic: impl Fn(&VariableRow<Child<'a, G, G::Timestamp>, (u64, u64), isize>) -> Collection<Child<'a, G, G::Timestamp>, (u64, u64)>)
9898
# where G::Timestamp: Lattice
9999
# {
100100
collection.scope().scoped("inner", |subgraph| {

0 commit comments

Comments
 (0)