Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 39 additions & 28 deletions src/compute/src/logging/differential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,34 +115,45 @@ pub(super) fn construct<A: Allocate>(

// Encode the contents of each logging stream into its expected `Row` format.
let mut packer = PermutedRowPacker::new(DifferentialLog::ArrangementBatches);
let arrangement_batches = batches.as_collection().map(move |op| {
packer.pack_slice(&[
Datum::UInt64(u64::cast_from(op)),
Datum::UInt64(u64::cast_from(worker_id)),
])
});
let arrangement_batches = batches
.as_collection()
.mz_arrange_core::<_, RowSpine<_, _, _, _>>(
Exchange::new(move |_| u64::cast_from(worker_id)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that this matters much, but the Exchanges at this position are merely defensive, right? In the previous version of the code, the data were just pipelined, so using a Pipeline pact here would also be an option.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, and thinking about it, I'm not sure it makes a lot of sense! But I don't want to change it as part of this PR because there are other places that use the same construct, and it's better to change them all at the same time.

The exchange would go away if/once #22384 lands because then we make it an invariant that log data gets processed locally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, this is where we decided to keep these exchanges: #18062 (comment).

But I also find Pipeline more readable here, as it conveys what invariants we believe to be true. When seeing the exchange without an explanation, you might start to doubt your understanding of how events flow through these dataflows.

"PreArrange Differential batches",
)
.as_collection(move |op, ()| {
packer.pack_slice(&[
Datum::UInt64(u64::cast_from(*op)),
Datum::UInt64(u64::cast_from(worker_id)),
])
});
let mut packer = PermutedRowPacker::new(DifferentialLog::ArrangementRecords);
let arrangement_records = records.as_collection().map(move |op| {
packer.pack_slice(&[
Datum::UInt64(u64::cast_from(op)),
Datum::UInt64(u64::cast_from(worker_id)),
])
});
let arrangement_records = records
.as_collection()
.mz_arrange_core::<_, RowSpine<_, _, _, _>>(
Exchange::new(move |_| u64::cast_from(worker_id)),
"PreArrange Differential records",
)
.as_collection(move |op, ()| {
packer.pack_slice(&[
Datum::UInt64(u64::cast_from(*op)),
Datum::UInt64(u64::cast_from(worker_id)),
])
});

let mut packer = PermutedRowPacker::new(DifferentialLog::Sharing);
let sharing = sharing
.as_collection()
.mz_arrange_core::<_, RowSpine<_, _, _, _>>(
Exchange::new(move |_| u64::cast_from(worker_id)),
"PreArrange Differential sharing",
);

let sharing = sharing.as_collection(move |op, ()| {
packer.pack_slice(&[
Datum::UInt64(u64::cast_from(*op)),
Datum::UInt64(u64::cast_from(worker_id)),
])
});
)
.as_collection(move |op, ()| {
packer.pack_slice(&[
Datum::UInt64(u64::cast_from(*op)),
Datum::UInt64(u64::cast_from(worker_id)),
])
});

use DifferentialLog::*;
let logs = [
Expand Down Expand Up @@ -172,8 +183,8 @@ type OutputBuffer<'a, 'b, D> = ConsolidateBuffer<'a, 'b, Timestamp, D, Diff, Pus

/// Bundled output buffers used by the demux operator.
///
/// Holds one `OutputBuffer` per demuxed stream: `batches`, `records`, and `sharing`.
// NOTE(review): this text is a rendered diff without +/- markers, so `batches` and
// `records` each appear twice. Presumably the `usize` lines are the removed versions
// and the `(usize, ())` lines are the added ones (they match the unchanged `sharing`
// field) — confirm against the PR before treating this as compilable code.
struct DemuxOutput<'a, 'b> {
batches: OutputBuffer<'a, 'b, usize>,
records: OutputBuffer<'a, 'b, usize>,
batches: OutputBuffer<'a, 'b, (usize, ())>,
records: OutputBuffer<'a, 'b, (usize, ())>,
sharing: OutputBuffer<'a, 'b, (usize, ())>,
}

Expand Down Expand Up @@ -226,10 +237,10 @@ impl DemuxHandler<'_, '_, '_> {
/// Handles a `BatchEvent` for `event.operator`.
///
/// Emits a `+1` update to the `batches` output and an update of `event.length` to the
/// `records` output, both at the timestamp returned by `self.ts()`, then calls
/// `notify_arrangement_size` for the operator.
///
/// # Panics
///
/// Panics with "must fit" if `event.length` cannot be converted into `Diff`.
// NOTE(review): this text is a rendered diff without +/- markers, so each `give` call
// appears twice. Presumably the `(op, ts, ..)` form is the removed line and the
// `((op, ()), ts, ..)` form is the added one — confirm against the PR.
fn handle_batch(&mut self, event: BatchEvent) {
let ts = self.ts();
let op = event.operator;
self.output.batches.give(self.cap, (op, ts, 1));
self.output.batches.give(self.cap, ((op, ()), ts, 1));

// The record-count change for a new batch is the batch length itself.
let diff = Diff::try_from(event.length).expect("must fit");
self.output.records.give(self.cap, (op, ts, diff));
self.output.records.give(self.cap, ((op, ()), ts, diff));
self.notify_arrangement_size(op);
}

Expand All @@ -238,24 +249,24 @@ impl DemuxHandler<'_, '_, '_> {

let ts = self.ts();
let op = event.operator;
self.output.batches.give(self.cap, (op, ts, -1));
self.output.batches.give(self.cap, ((op, ()), ts, -1));

let diff = Diff::try_from(done).expect("must fit")
- Diff::try_from(event.length1 + event.length2).expect("must fit");
if diff != 0 {
self.output.records.give(self.cap, (op, ts, diff));
self.output.records.give(self.cap, ((op, ()), ts, diff));
}
self.notify_arrangement_size(op);
}

/// Handles a `DropEvent` for `event.operator`.
///
/// Emits a `-1` update to the `batches` output and, when `event.length` is non-zero,
/// an update of `-event.length` to the `records` output, both at the timestamp returned
/// by `self.ts()`. Finally calls `notify_arrangement_size` for the operator.
///
/// # Panics
///
/// Panics with "must fit" if `event.length` cannot be converted into `Diff`.
// NOTE(review): this text is a rendered diff without +/- markers, so each `give` call
// appears twice. Presumably the `(op, ts, ..)` form is the removed line and the
// `((op, ()), ts, ..)` form is the added one — confirm against the PR.
fn handle_drop(&mut self, event: DropEvent) {
let ts = self.ts();
let op = event.operator;
self.output.batches.give(self.cap, (op, ts, -1));
self.output.batches.give(self.cap, ((op, ()), ts, -1));

// Dropping a batch removes its records, hence the negated length.
let diff = -Diff::try_from(event.length).expect("must fit");
// Skip the update entirely when it would carry a zero diff.
if diff != 0 {
self.output.records.give(self.cap, (op, ts, diff));
self.output.records.give(self.cap, ((op, ()), ts, diff));
}
self.notify_arrangement_size(op);
}
Expand Down