diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index 725321edf..fa556327d 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -98,7 +98,7 @@ //! }).unwrap(); //! ``` -use std::collections::{BinaryHeap, HashMap}; +use std::collections::{BinaryHeap, BTreeMap}; use timely::order::{PartialOrder, TotalOrder}; use timely::dataflow::{Scope, Stream}; @@ -216,7 +216,7 @@ where } // Extract upserts available to process as of this `upper`. - let mut to_process = HashMap::new(); + let mut to_process = BTreeMap::new(); while priority_queue.peek().map(|std::cmp::Reverse((t,_k,_v))| !upper.less_equal(t)).unwrap_or(false) { let std::cmp::Reverse((time, key, val)) = priority_queue.pop().expect("Priority queue just ensured non-empty"); to_process.entry(key).or_insert(Vec::new()).push((time, std::cmp::Reverse(val))); @@ -229,15 +229,11 @@ where priority_queue.shrink_to_fit(); } - // Put (key, list) into key order, to match cursor enumeration. - let mut to_process = to_process.into_iter().collect::>(); - to_process.sort(); - // Prepare a cursor to the existing arrangement, and a batch builder for // new stuff that we add. let (mut trace_cursor, trace_storage) = reader_local.cursor(); let mut builder = Bu::new(); - for (key, mut list) in to_process.drain(..) { + for (key, mut list) in to_process { // The prior value associated with the key. let mut prev_value: Option = None;