@@ -115,33 +115,46 @@ pub(super) fn construct<A: Allocate>(
115115 } ) ;
116116
117117 // Encode the contents of each logging stream into its expected `Row` format.
118- let arrangement_batches = batches. as_collection ( ) . map ( move |op| {
119- Row :: pack_slice ( & [
120- Datum :: UInt64 ( u64:: cast_from ( op) ) ,
121- Datum :: UInt64 ( u64:: cast_from ( worker_id) ) ,
122- ] )
123- } ) ;
124- let arrangement_records = records. as_collection ( ) . map ( move |op| {
125- Row :: pack_slice ( & [
126- Datum :: UInt64 ( u64:: cast_from ( op) ) ,
127- Datum :: UInt64 ( u64:: cast_from ( worker_id) ) ,
128- ] )
129- } ) ;
118+ let arrangement_batches = batches
119+ . as_collection ( )
120+ . mz_arrange_core :: < _ , RowSpine < _ , _ , _ , _ > > (
121+ Exchange :: new ( move |_| u64:: cast_from ( worker_id) ) ,
122+ "PreArrange Differential batches" ,
123+ compute_state. enable_arrangement_size_logging ,
124+ )
125+ . as_collection ( move |op, ( ) | {
126+ Row :: pack_slice ( & [
127+ Datum :: UInt64 ( u64:: cast_from ( * op) ) ,
128+ Datum :: UInt64 ( u64:: cast_from ( worker_id) ) ,
129+ ] )
130+ } ) ;
131+ let arrangement_records = records
132+ . as_collection ( )
133+ . mz_arrange_core :: < _ , RowSpine < _ , _ , _ , _ > > (
134+ Exchange :: new ( move |_| u64:: cast_from ( worker_id) ) ,
135+ "PreArrange Differential records" ,
136+ compute_state. enable_arrangement_size_logging ,
137+ )
138+ . as_collection ( move |op, ( ) | {
139+ Row :: pack_slice ( & [
140+ Datum :: UInt64 ( u64:: cast_from ( * op) ) ,
141+ Datum :: UInt64 ( u64:: cast_from ( worker_id) ) ,
142+ ] )
143+ } ) ;
130144
131145 let sharing = sharing
132146 . as_collection ( )
133147 . mz_arrange_core :: < _ , RowSpine < _ , _ , _ , _ > > (
134148 Exchange :: new ( move |_| u64:: cast_from ( worker_id) ) ,
135149 "PreArrange Differential sharing" ,
136150 compute_state. enable_arrangement_size_logging ,
137- ) ;
138-
139- let sharing = sharing. as_collection ( move |op, ( ) | {
140- Row :: pack_slice ( & [
141- Datum :: UInt64 ( u64:: cast_from ( * op) ) ,
142- Datum :: UInt64 ( u64:: cast_from ( worker_id) ) ,
143- ] )
144- } ) ;
151+ )
152+ . as_collection ( move |op, ( ) | {
153+ Row :: pack_slice ( & [
154+ Datum :: UInt64 ( u64:: cast_from ( * op) ) ,
155+ Datum :: UInt64 ( u64:: cast_from ( worker_id) ) ,
156+ ] )
157+ } ) ;
145158
146159 use DifferentialLog :: * ;
147160 let logs = [
@@ -194,8 +207,8 @@ type OutputBuffer<'a, 'b, D> = ConsolidateBuffer<'a, 'b, Timestamp, D, Diff, Pus
194207
195208/// Bundled output buffers used by the demux operator.
196209struct DemuxOutput < ' a , ' b > {
197- batches : OutputBuffer < ' a , ' b , usize > ,
198- records : OutputBuffer < ' a , ' b , usize > ,
210+ batches : OutputBuffer < ' a , ' b , ( usize , ( ) ) > ,
211+ records : OutputBuffer < ' a , ' b , ( usize , ( ) ) > ,
199212 sharing : OutputBuffer < ' a , ' b , ( usize , ( ) ) > ,
200213}
201214
@@ -248,10 +261,10 @@ impl DemuxHandler<'_, '_, '_> {
248261 fn handle_batch ( & mut self , event : BatchEvent ) {
249262 let ts = self . ts ( ) ;
250263 let op = event. operator ;
251- self . output . batches . give ( self . cap , ( op , ts, 1 ) ) ;
264+ self . output . batches . give ( self . cap , ( ( op , ( ) ) , ts, 1 ) ) ;
252265
253266 let diff = Diff :: try_from ( event. length ) . expect ( "must fit" ) ;
254- self . output . records . give ( self . cap , ( op , ts, diff) ) ;
267+ self . output . records . give ( self . cap , ( ( op , ( ) ) , ts, diff) ) ;
255268 self . notify_arrangement_size ( op) ;
256269 }
257270
@@ -260,24 +273,24 @@ impl DemuxHandler<'_, '_, '_> {
260273
261274 let ts = self . ts ( ) ;
262275 let op = event. operator ;
263- self . output . batches . give ( self . cap , ( op , ts, -1 ) ) ;
276+ self . output . batches . give ( self . cap , ( ( op , ( ) ) , ts, -1 ) ) ;
264277
265278 let diff = Diff :: try_from ( done) . expect ( "must fit" )
266279 - Diff :: try_from ( event. length1 + event. length2 ) . expect ( "must fit" ) ;
267280 if diff != 0 {
268- self . output . records . give ( self . cap , ( op , ts, diff) ) ;
281+ self . output . records . give ( self . cap , ( ( op , ( ) ) , ts, diff) ) ;
269282 }
270283 self . notify_arrangement_size ( op) ;
271284 }
272285
273286 fn handle_drop ( & mut self , event : DropEvent ) {
274287 let ts = self . ts ( ) ;
275288 let op = event. operator ;
276- self . output . batches . give ( self . cap , ( op , ts, -1 ) ) ;
289+ self . output . batches . give ( self . cap , ( ( op , ( ) ) , ts, -1 ) ) ;
277290
278291 let diff = -Diff :: try_from ( event. length ) . expect ( "must fit" ) ;
279292 if diff != 0 {
280- self . output . records . give ( self . cap , ( op , ts, diff) ) ;
293+ self . output . records . give ( self . cap , ( ( op , ( ) ) , ts, diff) ) ;
281294 }
282295 self . notify_arrangement_size ( op) ;
283296 }
0 commit comments