Skip to content

Commit d4be2d0

Browse files
committed
Remove use of borrow_as()
1 parent 8df3fd9 commit d4be2d0

File tree

7 files changed

+83
-53
lines changed

7 files changed

+83
-53
lines changed

differential-dataflow/examples/columnar.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,9 @@ pub mod dd_builder {
638638
// Owned key and val would need to be members of `self`, as this method can be called multiple times,
639639
// and we need to correctly cache last for reasons of correctness, not just performance.
640640

641+
let mut key_con = L::KeyContainer::with_capacity(1);
642+
let mut val_con = L::ValContainer::with_capacity(1);
643+
641644
for ((key,val),time,diff) in chunk.drain() {
642645
// It would be great to avoid.
643646
let key = <layout::Key<L> as Columnar>::into_owned(key);
@@ -646,16 +649,19 @@ pub mod dd_builder {
646649
let time = <layout::Time<L> as Columnar>::into_owned(time);
647650
let diff = <layout::Diff<L> as Columnar>::into_owned(diff);
648651

652+
key_con.clear(); key_con.push_own(&key);
653+
val_con.clear(); val_con.push_own(&val);
654+
649655
// Pre-load the first update.
650656
if self.result.keys.is_empty() {
651657
self.result.vals.vals.push_own(&val);
652658
self.result.keys.push_own(&key);
653659
self.staging.push(time, diff);
654660
}
655661
// Perhaps this is a continuation of an already received key.
656-
else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) {
662+
else if self.result.keys.last() == key_con.get(0) {
657663
// Perhaps this is a continuation of an already received value.
658-
if self.result.vals.vals.last().map(|v| L::ValContainer::borrow_as(&val).eq(&v)).unwrap_or(false) {
664+
if self.result.vals.vals.last() == val_con.get(0) {
659665
self.staging.push(time, diff);
660666
} else {
661667
// New value; complete representation of prior value.
@@ -739,20 +745,24 @@ pub mod dd_builder {
739745
// Owned key and val would need to be members of `self`, as this method can be called multiple times,
740746
// and we need to correctly cache last for reasons of correctness, not just performance.
741747

748+
let mut key_con = L::KeyContainer::with_capacity(1);
749+
742750
for ((key,_val),time,diff) in chunk.drain() {
743751
// It would be great to avoid.
744752
let key = <layout::Key<L> as Columnar>::into_owned(key);
745753
// These feel fine (wrt the other versions)
746754
let time = <layout::Time<L> as Columnar>::into_owned(time);
747755
let diff = <layout::Diff<L> as Columnar>::into_owned(diff);
748756

757+
key_con.clear(); key_con.push_own(&key);
758+
749759
// Pre-load the first update.
750760
if self.result.keys.is_empty() {
751761
self.result.keys.push_own(&key);
752762
self.staging.push(time, diff);
753763
}
754764
// Perhaps this is a continuation of an already received key.
755-
else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) {
765+
else if self.result.keys.last() == key_con.get(0) {
756766
self.staging.push(time, diff);
757767
} else {
758768
// New key; complete representation of prior key.

differential-dataflow/src/operators/arrange/upsert.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,17 @@ where
233233
// new stuff that we add.
234234
let (mut trace_cursor, trace_storage) = reader_local.cursor();
235235
let mut builder = Bu::new();
236+
let mut key_con = Tr::KeyContainer::with_capacity(1);
236237
for (key, mut list) in to_process {
237238

239+
key_con.clear(); key_con.push_own(&key);
240+
238241
// The prior value associated with the key.
239242
let mut prev_value: Option<Tr::ValOwn> = None;
240243

241244
// Attempt to find the key in the trace.
242-
trace_cursor.seek_key(&trace_storage, Tr::KeyContainer::borrow_as(&key));
243-
if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&Tr::KeyContainer::borrow_as(&key))).unwrap_or(false) {
245+
trace_cursor.seek_key(&trace_storage, key_con.index(0));
246+
if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&key_con.index(0))).unwrap_or(false) {
244247
// Determine the prior value associated with the key.
245248
while let Some(val) = trace_cursor.get_val(&trace_storage) {
246249
let mut count = 0;

differential-dataflow/src/operators/reduce.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -431,11 +431,16 @@ where
431431
// We first extract those times from this list that lie in the interval we will process.
432432
sort_dedup(&mut interesting);
433433
// `exposed` contains interesting (key, time)s now below `upper_limit`
434-
let exposed = {
435-
let (exposed, new_interesting) = interesting.drain(..).partition(|(_, time)| !upper_limit.less_equal(time));
436-
interesting = new_interesting;
437-
exposed
438-
};
434+
let mut exposed_keys = T1::KeyContainer::with_capacity(0);
435+
let mut exposed_time = T1::TimeContainer::with_capacity(0);
436+
// Keep pairs greater or equal to `upper_limit`, and "expose" other pairs.
437+
interesting.retain(|(key, time)| {
438+
if upper_limit.less_equal(time) { true } else {
439+
exposed_keys.push_own(key);
440+
exposed_time.push_own(time);
441+
false
442+
}
443+
});
439444

440445
// Prepare an output buffer and builder for each capability.
441446
//
@@ -471,12 +476,10 @@ where
471476
// indicates whether more data remain. We move through `exposed` using (index) `exposed_position`.
472477
// There could perhaps be a less provocative variable name.
473478
let mut exposed_position = 0;
474-
while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() {
475-
476-
use std::borrow::Borrow;
479+
while batch_cursor.key_valid(batch_storage) || exposed_position < exposed_keys.len() {
477480

478481
// Determine the next key we will work on; could be synthetic, could be from a batch.
479-
let key1 = exposed.get(exposed_position).map(|x| T1::KeyContainer::borrow_as(&x.0));
482+
let key1 = exposed_keys.get(exposed_position);
480483
let key2 = batch_cursor.get_key(batch_storage);
481484
let key = match (key1, key2) {
482485
(Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
@@ -492,8 +495,8 @@ where
492495
interesting_times.clear();
493496

494497
// Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
495-
while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(&T1::KeyContainer::borrow_as(&k))).unwrap_or(false) {
496-
interesting_times.push(exposed[exposed_position].1.clone());
498+
while exposed_keys.get(exposed_position) == Some(key) {
499+
interesting_times.push(T1::owned_time(exposed_time.index(exposed_position)));
497500
exposed_position += 1;
498501
}
499502

differential-dataflow/src/trace/implementations/ord_neu.rs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,15 @@ pub mod layers {
207207
///
208208
/// Tracked independently to account for duplicate compression.
209209
total: usize,
210+
211+
/// Time container to stage singleton times for evaluation.
212+
time_con: T,
213+
/// Diff container to stage singleton times for evaluation.
214+
diff_con: D,
210215
}
211216

212217
impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
213-
fn default() -> Self { Self { stash: Vec::default(), total: 0, } }
218+
fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
214219
}
215220

216221

@@ -230,34 +235,28 @@ pub mod layers {
230235
pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
231236
use crate::consolidation;
232237
consolidation::consolidate(&mut self.stash);
233-
if !self.stash.is_empty() {
234-
// If there is a single element, equal to a just-prior recorded update,
235-
// we push nothing and report an unincremented offset to encode this case.
236-
let time_diff = upds.times.last().zip(upds.diffs.last());
237-
let last_eq = self.stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
238-
let t1 = T::borrow_as(t1);
239-
let d1 = D::borrow_as(d1);
240-
t1.eq(&t2) && d1.eq(&d2)
241-
});
242-
if self.stash.len() == 1 && last_eq.unwrap_or(false) {
243-
// Just clear out the stash, as we won't drain it here.
238+
// If everything consolidates away, return false.
239+
if self.stash.is_empty() { return false; }
240+
// If there is a singleton, we may be able to optimize.
241+
if self.stash.len() == 1 {
242+
let (time, diff) = self.stash.last().unwrap();
243+
self.time_con.clear(); self.time_con.push_own(time);
244+
self.diff_con.clear(); self.diff_con.push_own(diff);
245+
if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
244246
self.total += 1;
245247
self.stash.clear();
246248
upds.offs.push_ref(upds.times.len());
249+
return true;
247250
}
248-
else {
249-
// Conventional; move `stash` into `updates`.
250-
self.total += self.stash.len();
251-
for (time, diff) in self.stash.drain(..) {
252-
upds.times.push_own(&time);
253-
upds.diffs.push_own(&diff);
254-
}
255-
upds.offs.push_ref(upds.times.len());
256-
}
257-
true
258-
} else {
259-
false
260251
}
252+
// Conventional; move `stash` into `updates`.
253+
self.total += self.stash.len();
254+
for (time, diff) in self.stash.drain(..) {
255+
upds.times.push_own(&time);
256+
upds.diffs.push_own(&diff);
257+
}
258+
upds.offs.push_ref(upds.times.len());
259+
true
261260
}
262261

263262
/// Completes the building and returns the total updates sealed.

differential-dataflow/src/trace/implementations/rhh.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,13 @@ mod val_batch {
204204
self.key_count += 1;
205205
}
206206

207+
/// Inserts a reference to an owned key, inefficiently. Should be removed.
208+
fn insert_key_own(&mut self, key: &layout::Key<L>, offset: Option<usize>) {
209+
let mut key_con = L::KeyContainer::with_capacity(1);
210+
key_con.push_own(&key);
211+
self.insert_key(key_con.index(0), offset)
212+
}
213+
207214
/// Indicates both the desired location and the hash signature of the key.
208215
fn desired_location<K: Hashable>(&self, key: &K) -> usize {
209216
if self.divisor == 0 { 0 }
@@ -608,9 +615,8 @@ mod val_batch {
608615
// we push nothing and report an unincremented offset to encode this case.
609616
let time_diff = self.result.times.last().zip(self.result.diffs.last());
610617
let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
611-
let t1 = L::TimeContainer::borrow_as(t1);
612-
let d1 = L::DiffContainer::borrow_as(d1);
613-
t1.eq(&t2) && d1.eq(&d2)
618+
// TODO: The use of `into_owned` is a work-around for not having reference types.
619+
*t1 == L::TimeContainer::into_owned(t2) && *d1 == L::DiffContainer::into_owned(d2)
614620
});
615621
if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
616622
// Just clear out update_stash, as we won't drain it here.
@@ -767,9 +773,8 @@ mod val_batch {
767773
/// to recover the singleton to push it into `updates` to join the second update.
768774
fn push_update(&mut self, time: layout::Time<L>, diff: layout::Diff<L>) {
769775
// If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
770-
let t1 = L::TimeContainer::borrow_as(&time);
771-
let d1 = L::DiffContainer::borrow_as(&diff);
772-
if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
776+
// TODO: The use of `into_owned` is a bandage for not having references we can compare.
777+
if self.result.times.last().map(|t| L::TimeContainer::into_owned(t) == time).unwrap_or(false) && self.result.diffs.last().map(|d| L::DiffContainer::into_owned(d) == diff).unwrap_or(false) {
773778
assert!(self.singleton.is_none());
774779
self.singleton = Some((time, diff));
775780
}
@@ -849,7 +854,7 @@ mod val_batch {
849854
self.push_update(time, diff);
850855
self.result.vals.push_into(val);
851856
// Insert the key, but with no specified offset.
852-
self.result.insert_key(L::KeyContainer::borrow_as(&key), None);
857+
self.result.insert_key_own(&key, None);
853858
}
854859
}
855860
}

dogsdogsdogs/src/operators/half_join.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,13 +331,21 @@ where
331331
let (mut cursor, storage) = trace.cursor();
332332
let mut yielded = false;
333333

334+
let mut key_con = Tr::KeyContainer::with_capacity(1);
335+
let mut time_con = Tr::TimeContainer::with_capacity(1);
336+
for time in frontier.iter() {
337+
time_con.push_own(time);
338+
}
339+
334340
// Process proposals one at a time, stopping if we should yield.
335341
for ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
336342
// Use TOTAL ORDER to allow the release of `time`.
337343
yielded = yielded || yield_function(timer, *work);
338-
if !yielded && !frontier.iter().any(|t| comparison(Tr::TimeContainer::borrow_as(t), initial)) {
339-
cursor.seek_key(&storage, Tr::KeyContainer::borrow_as(key));
340-
if cursor.get_key(&storage) == Some(Tr::KeyContainer::borrow_as(key)) {
344+
345+
if !yielded && !(0 .. time_con.len()).any(|i| comparison(time_con.index(i), initial)) {
346+
key_con.clear(); key_con.push_own(&key);
347+
cursor.seek_key(&storage, key_con.index(0));
348+
if cursor.get_key(&storage) == key_con.get(0) {
341349
while let Some(val2) = cursor.get_val(&storage) {
342350
cursor.map_times(&storage, |t, d| {
343351
if comparison(t, initial) {

dogsdogsdogs/src/operators/lookup_map.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,14 @@ where
8989
});
9090

9191
let (mut cursor, storage) = trace.cursor();
92-
92+
// Key container to stage keys for comparison.
93+
let mut key_con = Tr::KeyContainer::with_capacity(1);
9394
for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() {
9495
if !input2.frontier.less_equal(time) {
9596
logic2(prefix, &mut key1);
96-
cursor.seek_key(&storage, Tr::KeyContainer::borrow_as(&key1));
97-
if cursor.get_key(&storage) == Some(Tr::KeyContainer::borrow_as(&key1)) {
97+
key_con.clear(); key_con.push_own(&key1);
98+
cursor.seek_key(&storage, key_con.index(1));
99+
if cursor.get_key(&storage) == Some(key_con.index(1)) {
98100
while let Some(value) = cursor.get_val(&storage) {
99101
let mut count = Tr::Diff::zero();
100102
cursor.map_times(&storage, |t, d| {

0 commit comments

Comments
 (0)