Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_transform::dataflow::DataflowMetainfo;
use smallvec::SmallVec;
use timely::PartialOrder;
use timely::progress::Antichain;
use tokio::sync::{oneshot, watch};
use tracing::{Instrument, Span, info, warn};
Expand Down Expand Up @@ -3406,6 +3407,10 @@ impl Coordinator {
&**write_frontier
);

// For `ALTER SINK`, the snapshot should only occur if the sink has not made any progress.
// This prevents unnecessary decoding in the sink.
let alter_sink_snapshot = with_snapshot && !PartialOrder::less_than(&as_of, write_frontier);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newbie question: would LEQ instead of strict less_than not be right here? Is it possible for the as_of and write_frontier to be exactly equal? In that case is it possible that there is actually progress being made?

Copy link
Contributor Author

@martykulma martykulma Dec 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The as_of is where we are reading from, and the upper is where the next write will happen. If the write frontier hasn't progressed strictly beyond the as_of, it would mean that nothing has been written (or we are not done writing).

For a simple example, consider the case where as_of is the minimum, e.g. [0], and the upper is also the minimum [0]. In this case, I need to write the snapshot, which happens at [0], and the upper can progress (lets say it is now [1]). At this point, we know that all data associated with times less than, but not equal to, [1] are written.


// Parse the `create_sql` so we can update it to the new sink definition.
//
// Note that we need to use the `create_sql` from the catalog here, not the one from the
Expand Down Expand Up @@ -3484,7 +3489,7 @@ impl Coordinator {
.into_inline_connection(self.catalog().state()),
envelope: sink_plan.envelope,
as_of,
with_snapshot,
with_snapshot: alter_sink_snapshot,
version: sink_plan.version,
from_storage_metadata: (),
to_storage_metadata: (),
Expand Down