@@ -794,12 +794,15 @@ mod write {
794794 /// The frontiers of the `desired` inputs.
795795 desired_frontiers : OkErr < Antichain < Timestamp > , Antichain < Timestamp > > ,
796796 /// The frontiers of the `persist` inputs.
797+ ///
798+ /// Note that this is _not_ the same as the write frontier of the output persist shard! It
799+ /// usually is, but during snapshot processing, these frontiers will start at the shard's
800+ /// read frontier, so they can be beyond its write frontier. This is important as it means
801+ /// we must not discard batch descriptions based on these persist frontiers: A batch
802+ /// description might still be valid even if its `lower` is before the persist frontiers we
803+ /// observe.
797804 persist_frontiers : OkErr < Antichain < Timestamp > , Antichain < Timestamp > > ,
798805 /// The current valid batch description and associated output capability, if any.
799- ///
800- /// Note that "valid" here implies that if a batch description is set, it must be true that
801- /// its `lower` is >= the `persist_frontier`. Otherwise the described batch couldn't be
802- /// appended anymore, rendering the batch description invalid.
803806 batch_description : Option < ( BatchDescription , Capability < Timestamp > ) > ,
804807 /// A request to force a consolidation of `corrections` once both `desired_frontiers` and
805808 /// `persist_frontiers` become greater than the given frontier.
@@ -894,15 +897,6 @@ mod write {
894897 self . corrections . ok . advance_since ( frontier. clone ( ) ) ;
895898 self . corrections . err . advance_since ( frontier. clone ( ) ) ;
896899
897- // If the `persist` frontier is greater than the `lower` of the current batch
898- // description, we won't be able to append the batch, so the batch description is not
899- // valid anymore.
900- if let Some ( ( desc, _) ) = & self . batch_description {
901- if PartialOrder :: less_than ( & desc. lower , frontier) {
902- self . batch_description = None ;
903- }
904- }
905-
906900 self . maybe_force_consolidation ( ) ;
907901 }
908902
@@ -927,16 +921,18 @@ mod write {
927921 }
928922
929923 fn absorb_batch_description ( & mut self , desc : BatchDescription , cap : Capability < Timestamp > ) {
930- // The incoming batch description is outdated if either:
931- // * we already have a batch description with a greater `lower`, or
932- // * its `lower` is less than the persist frontier
933- let validity_frontier = match & self . batch_description {
934- Some ( ( prev, _) ) => & prev. lower ,
935- None => self . persist_frontiers . frontier ( ) ,
936- } ;
937- if PartialOrder :: less_than ( & desc. lower , validity_frontier) {
938- self . trace ( format ! ( "skipping outdated batch description: {desc:?}" ) ) ;
939- return ;
924+ // The incoming batch description is outdated if we already have a batch description
925+ // with a greater `lower`.
926+ //
927+ // Note that we cannot assume a description is outdated based on the comparison of its
928+ // `lower` with the `persist_frontier`. The persist frontier observed by the `write`
929+ // operator is initialized with the shard's read frontier, so it can be greater than
930+ // the shard's write frontier.
931+ if let Some ( ( prev, _) ) = & self . batch_description {
932+ if PartialOrder :: less_than ( & desc. lower , & prev. lower ) {
933+ self . trace ( format ! ( "skipping outdated batch description: {desc:?}" ) ) ;
934+ return ;
935+ }
940936 }
941937
942938 self . batch_description = Some ( ( desc, cap) ) ;
@@ -948,7 +944,8 @@ mod write {
948944
949945 // We can write a new batch if we have seen all `persist` updates before `lower` and
950946 // all `desired` updates up to `upper`.
951- let persist_complete = desc. lower == * self . persist_frontiers . frontier ( ) ;
947+ let persist_complete =
948+ PartialOrder :: less_equal ( & desc. lower , self . persist_frontiers . frontier ( ) ) ;
952949 let desired_complete =
953950 PartialOrder :: less_equal ( & desc. upper , self . desired_frontiers . frontier ( ) ) ;
954951 if !persist_complete || !desired_complete {
@@ -957,9 +954,6 @@ mod write {
957954
958955 let ( desc, cap) = self . batch_description . take ( ) ?;
959956
960- assert_eq ! ( desc. lower, * self . corrections. ok. since( ) ) ;
961- assert_eq ! ( desc. lower, * self . corrections. err. since( ) ) ;
962-
963957 let ok_updates = self . corrections . ok . updates_before ( & desc. upper ) ;
964958 let err_updates = self . corrections . err . updates_before ( & desc. upper ) ;
965959
0 commit comments