@@ -33,6 +33,7 @@ use timely::dataflow::channels::pact::Exchange;
3333use timely:: dataflow:: operators:: { Capability , CapabilitySet } ;
3434use timely:: dataflow:: { Scope , Stream } ;
3535use timely:: order:: { PartialOrder , TotalOrder } ;
36+ use timely:: progress:: timestamp:: Refines ;
3637use timely:: progress:: { Antichain , Timestamp } ;
3738
3839use crate :: healthcheck:: HealthStatusUpdate ;
@@ -121,7 +122,7 @@ pub fn upsert_inner<G: Scope, FromTime, F, Fut, US>(
121122 PressOnDropButton ,
122123)
123124where
124- G :: Timestamp : TotalOrder + Sync ,
125+ G :: Timestamp : Refines < mz_repr :: Timestamp > + TotalOrder + Sync ,
125126 F : FnOnce ( ) -> Fut + ' static ,
126127 Fut : std:: future:: Future < Output = US > ,
127128 US : UpsertStateBackend < G :: Timestamp , Option < FromTime > > ,
@@ -266,6 +267,7 @@ where
266267 worker_id = %source_config. worker_id,
267268 source_id = %source_config. id,
268269 persist_stash = %persist_stash. len( ) ,
270+ %hydrating,
269271 %last_rehydration_chunk,
270272 ?resume_upper,
271273 ?persist_upper,
@@ -404,7 +406,10 @@ where
404406 // is in an inconsistent/consolidating state and accessing it would
405407 // panic.
406408 if let Some ( largest_seen_persist_ts) = largest_seen_persist_ts. as_ref ( ) {
407- if persist_upper. less_equal ( largest_seen_persist_ts) {
409+ let largest_seen_outer_persist_ts = largest_seen_persist_ts. clone ( ) . to_outer ( ) ;
410+ let outer_persist_upper = persist_upper. iter ( ) . map ( |ts| ts. clone ( ) . to_outer ( ) ) ;
411+ let outer_persist_upper = Antichain :: from_iter ( outer_persist_upper) ;
412+ if outer_persist_upper. less_equal ( & largest_seen_outer_persist_ts) {
408413 continue ;
409414 }
410415 }
@@ -641,6 +646,29 @@ where
641646
642647 eligible
643648 } )
649+ . filter ( |( ts, _, _, _) | {
650+ let persist_upper = match & drain_style {
651+ DrainStyle :: ToUpper {
652+ input_upper : _,
653+ persist_upper,
654+ } => persist_upper,
655+ DrainStyle :: AtTime {
656+ time : _,
657+ persist_upper,
658+ } => persist_upper,
659+ } ;
660+
661+ // Any update that is "in the past" of the persist upper is not
662+ // relevant anymore. We _can_ emit changes for it, but the
663+ // downstream persist_sink would filter these updates out because
664+ // the shard upper is already further ahead.
665+ //
666+ // Plus, our upsert state is up-to-date to the persist_upper, so we
667+ // wouldn't be able to emit correct retractions for incoming
668+ // commands whose `ts` is in the past of that.
669+ let relevant = persist_upper. less_equal ( ts) ;
670+ relevant
671+ } )
644672 . collect_vec ( ) ;
645673
646674 tracing:: debug!(
@@ -733,22 +761,18 @@ where
733761
734762 match value {
735763 Some ( value) => {
736- let existing_value = existing_state_cell. take ( ) ;
737-
738- let old_value = if let Some ( old_value) = existing_value. as_ref ( ) {
739- old_value. provisional_value_ref ( & ts)
740- } else {
741- None
742- } ;
743-
744- if let Some ( old_value) = old_value {
745- output_updates. push ( ( old_value. clone ( ) , ts. clone ( ) , -1 ) ) ;
764+ if let Some ( old_value) = existing_state_cell. as_ref ( ) {
765+ if let Some ( old_value) = old_value. provisional_value_ref ( & ts) {
766+ output_updates. push ( ( old_value. clone ( ) , ts. clone ( ) , -1 ) ) ;
767+ }
746768 }
747769
748770 match & drain_style {
749771 DrainStyle :: AtTime { .. } => {
772+ let existing_value = existing_state_cell. take ( ) ;
773+
750774 let new_value = match existing_value {
751- Some ( existing_value) => existing_value. into_provisional_value (
775+ Some ( existing_value) => existing_value. clone ( ) . into_provisional_value (
752776 value. clone ( ) ,
753777 ts. clone ( ) ,
754778 Some ( from_time. 0 . clone ( ) ) ,
@@ -759,6 +783,7 @@ where
759783 Some ( from_time. 0 . clone ( ) ) ,
760784 ) ,
761785 } ;
786+
762787 existing_state_cell. replace ( new_value) ;
763788 }
764789 DrainStyle :: ToUpper { .. } => {
@@ -769,19 +794,16 @@ where
769794 output_updates. push ( ( value, ts, 1 ) ) ;
770795 }
771796 None => {
772- let existing_value = existing_state_cell. take ( ) ;
773-
774- let old_value = if let Some ( old_value) = existing_value. as_ref ( ) {
775- old_value. provisional_value_ref ( & ts)
776- } else {
777- None
778- } ;
779- if let Some ( old_value) = old_value {
780- output_updates. push ( ( old_value. clone ( ) , ts. clone ( ) , -1 ) ) ;
797+ if let Some ( old_value) = existing_state_cell. as_ref ( ) {
798+ if let Some ( old_value) = old_value. provisional_value_ref ( & ts) {
799+ output_updates. push ( ( old_value. clone ( ) , ts. clone ( ) , -1 ) ) ;
800+ }
781801 }
782802
783803 match & drain_style {
784804 DrainStyle :: AtTime { .. } => {
805+ let existing_value = existing_state_cell. take ( ) ;
806+
785807 let new_value = match existing_value {
786808 Some ( existing_value) => existing_value
787809 . into_provisional_tombstone ( ts. clone ( ) , Some ( from_time. 0 . clone ( ) ) ) ,
@@ -790,6 +812,7 @@ where
790812 Some ( from_time. 0 . clone ( ) ) ,
791813 ) ,
792814 } ;
815+
793816 existing_state_cell. replace ( new_value) ;
794817 }
795818 DrainStyle :: ToUpper { .. } => {
0 commit comments