diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs index 18e04741496d0..d4a61abc32311 100644 --- a/src/compute/src/logging/differential.rs +++ b/src/compute/src/logging/differential.rs @@ -115,19 +115,31 @@ pub(super) fn construct( // Encode the contents of each logging stream into its expected `Row` format. let mut packer = PermutedRowPacker::new(DifferentialLog::ArrangementBatches); - let arrangement_batches = batches.as_collection().map(move |op| { - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(op)), - Datum::UInt64(u64::cast_from(worker_id)), - ]) - }); + let arrangement_batches = batches + .as_collection() + .mz_arrange_core::<_, RowSpine<_, _, _, _>>( + Exchange::new(move |_| u64::cast_from(worker_id)), + "PreArrange Differential batches", + ) + .as_collection(move |op, ()| { + packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(*op)), + Datum::UInt64(u64::cast_from(worker_id)), + ]) + }); let mut packer = PermutedRowPacker::new(DifferentialLog::ArrangementRecords); - let arrangement_records = records.as_collection().map(move |op| { - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(op)), - Datum::UInt64(u64::cast_from(worker_id)), - ]) - }); + let arrangement_records = records + .as_collection() + .mz_arrange_core::<_, RowSpine<_, _, _, _>>( + Exchange::new(move |_| u64::cast_from(worker_id)), + "PreArrange Differential records", + ) + .as_collection(move |op, ()| { + packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(*op)), + Datum::UInt64(u64::cast_from(worker_id)), + ]) + }); let mut packer = PermutedRowPacker::new(DifferentialLog::Sharing); let sharing = sharing @@ -135,14 +147,13 @@ pub(super) fn construct( .mz_arrange_core::<_, RowSpine<_, _, _, _>>( Exchange::new(move |_| u64::cast_from(worker_id)), "PreArrange Differential sharing", - ); - - let sharing = sharing.as_collection(move |op, ()| { - packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(*op)), - Datum::UInt64(u64::cast_from(worker_id)), - ]) - }); + ) + .as_collection(move |op, ()| { + packer.pack_slice(&[ + Datum::UInt64(u64::cast_from(*op)), + Datum::UInt64(u64::cast_from(worker_id)), + ]) + }); use DifferentialLog::*; let logs = [ @@ -172,8 +183,8 @@ type OutputBuffer<'a, 'b, D> = ConsolidateBuffer<'a, 'b, Timestamp, D, Diff, Pus /// Bundled output buffers used by the demux operator. struct DemuxOutput<'a, 'b> { - batches: OutputBuffer<'a, 'b, usize>, - records: OutputBuffer<'a, 'b, usize>, + batches: OutputBuffer<'a, 'b, (usize, ())>, + records: OutputBuffer<'a, 'b, (usize, ())>, sharing: OutputBuffer<'a, 'b, (usize, ())>, } @@ -226,10 +237,10 @@ impl DemuxHandler<'_, '_, '_> { fn handle_batch(&mut self, event: BatchEvent) { let ts = self.ts(); let op = event.operator; - self.output.batches.give(self.cap, (op, ts, 1)); + self.output.batches.give(self.cap, ((op, ()), ts, 1)); let diff = Diff::try_from(event.length).expect("must fit"); - self.output.records.give(self.cap, (op, ts, diff)); + self.output.records.give(self.cap, ((op, ()), ts, diff)); self.notify_arrangement_size(op); } @@ -238,12 +249,12 @@ impl DemuxHandler<'_, '_, '_> { let ts = self.ts(); let op = event.operator; - self.output.batches.give(self.cap, (op, ts, -1)); + self.output.batches.give(self.cap, ((op, ()), ts, -1)); let diff = Diff::try_from(done).expect("must fit") - Diff::try_from(event.length1 + event.length2).expect("must fit"); if diff != 0 { - self.output.records.give(self.cap, (op, ts, diff)); + self.output.records.give(self.cap, ((op, ()), ts, diff)); } self.notify_arrangement_size(op); } @@ -251,11 +262,11 @@ impl DemuxHandler<'_, '_, '_> { fn handle_drop(&mut self, event: DropEvent) { let ts = self.ts(); let op = event.operator; - self.output.batches.give(self.cap, (op, ts, -1)); + self.output.batches.give(self.cap, ((op, ()), ts, -1)); let diff = -Diff::try_from(event.length).expect("must fit"); if diff != 0 { - self.output.records.give(self.cap, (op, ts, diff)); + self.output.records.give(self.cap, ((op, ()), ts, diff)); } self.notify_arrangement_size(op); }