@@ -89,6 +89,7 @@ use mz_storage_types::connections::inline::IntoInlineConnection;
8989use mz_storage_types:: controller:: StorageError ;
9090use mz_transform:: dataflow:: DataflowMetainfo ;
9191use smallvec:: SmallVec ;
92+ use timely:: PartialOrder ;
9293use timely:: progress:: Antichain ;
9394use tokio:: sync:: { oneshot, watch} ;
9495use tracing:: { Instrument , Span , info, warn} ;
@@ -3406,6 +3407,10 @@ impl Coordinator {
34063407 & * * write_frontier
34073408 ) ;
34083409
3410+ // For `ALTER SINK`, the snapshot should only occur if the sink has not made any progress.
3411+ // This prevents unnecessary decoding in the sink.
3412+ let alter_sink_snapshot = with_snapshot && !PartialOrder :: less_than ( & as_of, write_frontier) ;
3413+
34093414 // Parse the `create_sql` so we can update it to the new sink definition.
34103415 //
34113416 // Note that we need to use the `create_sql` from the catalog here, not the one from the
@@ -3484,7 +3489,7 @@ impl Coordinator {
34843489 . into_inline_connection ( self . catalog ( ) . state ( ) ) ,
34853490 envelope : sink_plan. envelope ,
34863491 as_of,
3487- with_snapshot,
3492+ with_snapshot : alter_sink_snapshot ,
34883493 version : sink_plan. version ,
34893494 from_storage_metadata : ( ) ,
34903495 to_storage_metadata : ( ) ,
0 commit comments