5959//! use differential_dataflow::operators::arrange::upsert;
6060//!
6161//! let stream = scope.input_from(&mut input);
62- //! let arranged = upsert::arrange_from_upsert::<_, _, _, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(&stream, &"test");
62+ //! let arranged = upsert::arrange_from_upsert::<_, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(&stream, &"test");
6363//!
6464//! arranged
6565//! .as_collection(|k,v| (k.clone(), v.clone()))
@@ -127,21 +127,19 @@ use super::TraceAgent;
127127/// This method is only implemented for totally ordered times, as we do not yet
128128/// understand what a "sequence" of upserts would mean for partially ordered
129129/// timestamps.
130- pub fn arrange_from_upsert < G , K , V , Bu , Tr > (
131- stream : & Stream < G , ( K , Option < V > , G :: Timestamp ) > ,
130+ pub fn arrange_from_upsert < G , Bu , Tr > (
131+ stream : & Stream < G , ( Tr :: KeyOwn , Option < Tr :: ValOwn > , G :: Timestamp ) > ,
132132 name : & str ,
133133) -> Arranged < G , TraceAgent < Tr > >
134134where
135135 G : Scope < Timestamp =Tr :: Time > ,
136136 Tr : for < ' a > Trace <
137- KeyOwn = K ,
138- ValOwn = V ,
137+ KeyOwn : ExchangeData + Hashable +std :: hash :: Hash ,
138+ ValOwn : ExchangeData ,
139139 Time : TotalOrder +ExchangeData ,
140140 Diff =isize ,
141141 > +' static ,
142- K : ExchangeData +Hashable +std:: hash:: Hash ,
143- V : ExchangeData ,
144- Bu : Builder < Time =G :: Timestamp , Input = Vec < ( ( K , V ) , Tr :: Time , Tr :: Diff ) > , Output = Tr :: Batch > ,
142+ Bu : Builder < Time =G :: Timestamp , Input = Vec < ( ( Tr :: KeyOwn , Tr :: ValOwn ) , Tr :: Time , Tr :: Diff ) > , Output = Tr :: Batch > ,
145143{
146144 let mut reader: Option < TraceAgent < Tr > > = None ;
147145
@@ -150,7 +148,7 @@ where
150148
151149 let reader = & mut reader;
152150
153- let exchange = Exchange :: new ( move |update : & ( K , Option < V > , G :: Timestamp ) | ( update. 0 ) . hashed ( ) . into ( ) ) ;
151+ let exchange = Exchange :: new ( move |update : & ( Tr :: KeyOwn , Option < Tr :: ValOwn > , G :: Timestamp ) | ( update. 0 ) . hashed ( ) . into ( ) ) ;
154152
155153 stream. unary_frontier ( exchange, name, move |_capability, info| {
156154
@@ -175,7 +173,7 @@ where
175173 let mut prev_frontier = Antichain :: from_elem ( <G :: Timestamp as Timestamp >:: minimum ( ) ) ;
176174
177175 // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
178- let mut priority_queue = BinaryHeap :: < std:: cmp:: Reverse < ( G :: Timestamp , K , Option < V > ) > > :: new ( ) ;
176+ let mut priority_queue = BinaryHeap :: < std:: cmp:: Reverse < ( G :: Timestamp , Tr :: KeyOwn , Option < Tr :: ValOwn > ) > > :: new ( ) ;
179177 let mut updates = Vec :: new ( ) ;
180178
181179 move |input, output| {
@@ -238,7 +236,7 @@ where
238236 for ( key, mut list) in to_process {
239237
240238 // The prior value associated with the key.
241- let mut prev_value: Option < V > = None ;
239+ let mut prev_value: Option < Tr :: ValOwn > = None ;
242240
243241 // Attempt to find the key in the trace.
244242 trace_cursor. seek_key ( & trace_storage, Tr :: KeyContainer :: borrow_as ( & key) ) ;
0 commit comments