Skip to content

Commit ff77df5

Browse files
authored
Merge pull request #22330 from antiguru/logging_prearrange
2 parents e61484d + ec8abd0 commit ff77df5

File tree

1 file changed

+39
-28
lines changed

1 file changed

+39
-28
lines changed

src/compute/src/logging/differential.rs

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -115,34 +115,45 @@ pub(super) fn construct<A: Allocate>(
115115

116116
// Encode the contents of each logging stream into its expected `Row` format.
117117
let mut packer = PermutedRowPacker::new(DifferentialLog::ArrangementBatches);
118-
let arrangement_batches = batches.as_collection().map(move |op| {
119-
packer.pack_slice(&[
120-
Datum::UInt64(u64::cast_from(op)),
121-
Datum::UInt64(u64::cast_from(worker_id)),
122-
])
123-
});
118+
let arrangement_batches = batches
119+
.as_collection()
120+
.mz_arrange_core::<_, RowSpine<_, _, _, _>>(
121+
Exchange::new(move |_| u64::cast_from(worker_id)),
122+
"PreArrange Differential batches",
123+
)
124+
.as_collection(move |op, ()| {
125+
packer.pack_slice(&[
126+
Datum::UInt64(u64::cast_from(*op)),
127+
Datum::UInt64(u64::cast_from(worker_id)),
128+
])
129+
});
124130
let mut packer = PermutedRowPacker::new(DifferentialLog::ArrangementRecords);
125-
let arrangement_records = records.as_collection().map(move |op| {
126-
packer.pack_slice(&[
127-
Datum::UInt64(u64::cast_from(op)),
128-
Datum::UInt64(u64::cast_from(worker_id)),
129-
])
130-
});
131+
let arrangement_records = records
132+
.as_collection()
133+
.mz_arrange_core::<_, RowSpine<_, _, _, _>>(
134+
Exchange::new(move |_| u64::cast_from(worker_id)),
135+
"PreArrange Differential records",
136+
)
137+
.as_collection(move |op, ()| {
138+
packer.pack_slice(&[
139+
Datum::UInt64(u64::cast_from(*op)),
140+
Datum::UInt64(u64::cast_from(worker_id)),
141+
])
142+
});
131143

132144
let mut packer = PermutedRowPacker::new(DifferentialLog::Sharing);
133145
let sharing = sharing
134146
.as_collection()
135147
.mz_arrange_core::<_, RowSpine<_, _, _, _>>(
136148
Exchange::new(move |_| u64::cast_from(worker_id)),
137149
"PreArrange Differential sharing",
138-
);
139-
140-
let sharing = sharing.as_collection(move |op, ()| {
141-
packer.pack_slice(&[
142-
Datum::UInt64(u64::cast_from(*op)),
143-
Datum::UInt64(u64::cast_from(worker_id)),
144-
])
145-
});
150+
)
151+
.as_collection(move |op, ()| {
152+
packer.pack_slice(&[
153+
Datum::UInt64(u64::cast_from(*op)),
154+
Datum::UInt64(u64::cast_from(worker_id)),
155+
])
156+
});
146157

147158
use DifferentialLog::*;
148159
let logs = [
@@ -172,8 +183,8 @@ type OutputBuffer<'a, 'b, D> = ConsolidateBuffer<'a, 'b, Timestamp, D, Diff, Pus
172183

173184
/// Bundled output buffers used by the demux operator.
174185
struct DemuxOutput<'a, 'b> {
175-
batches: OutputBuffer<'a, 'b, usize>,
176-
records: OutputBuffer<'a, 'b, usize>,
186+
batches: OutputBuffer<'a, 'b, (usize, ())>,
187+
records: OutputBuffer<'a, 'b, (usize, ())>,
177188
sharing: OutputBuffer<'a, 'b, (usize, ())>,
178189
}
179190

@@ -226,10 +237,10 @@ impl DemuxHandler<'_, '_, '_> {
226237
fn handle_batch(&mut self, event: BatchEvent) {
227238
let ts = self.ts();
228239
let op = event.operator;
229-
self.output.batches.give(self.cap, (op, ts, 1));
240+
self.output.batches.give(self.cap, ((op, ()), ts, 1));
230241

231242
let diff = Diff::try_from(event.length).expect("must fit");
232-
self.output.records.give(self.cap, (op, ts, diff));
243+
self.output.records.give(self.cap, ((op, ()), ts, diff));
233244
self.notify_arrangement_size(op);
234245
}
235246

@@ -238,24 +249,24 @@ impl DemuxHandler<'_, '_, '_> {
238249

239250
let ts = self.ts();
240251
let op = event.operator;
241-
self.output.batches.give(self.cap, (op, ts, -1));
252+
self.output.batches.give(self.cap, ((op, ()), ts, -1));
242253

243254
let diff = Diff::try_from(done).expect("must fit")
244255
- Diff::try_from(event.length1 + event.length2).expect("must fit");
245256
if diff != 0 {
246-
self.output.records.give(self.cap, (op, ts, diff));
257+
self.output.records.give(self.cap, ((op, ()), ts, diff));
247258
}
248259
self.notify_arrangement_size(op);
249260
}
250261

251262
fn handle_drop(&mut self, event: DropEvent) {
252263
let ts = self.ts();
253264
let op = event.operator;
254-
self.output.batches.give(self.cap, (op, ts, -1));
265+
self.output.batches.give(self.cap, ((op, ()), ts, -1));
255266

256267
let diff = -Diff::try_from(event.length).expect("must fit");
257268
if diff != 0 {
258-
self.output.records.give(self.cap, (op, ts, diff));
269+
self.output.records.give(self.cap, ((op, ()), ts, diff));
259270
}
260271
self.notify_arrangement_size(op);
261272
}

0 commit comments

Comments
 (0)