Skip to content

Commit 3128ce9

Browse files
committed
storage: in feedback upsert, only process relevant updates/commands
1 parent 5215d48 commit 3128ce9

File tree

1 file changed

+23
-0
lines changed

1 file changed

+23
-0
lines changed

src/storage/src/upsert_continual_feedback.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,29 @@ where
647647

648648
eligible
649649
})
650+
.filter(|(ts, _, _, _)| {
651+
let persist_upper = match &drain_style {
652+
DrainStyle::ToUpper {
653+
input_upper: _,
654+
persist_upper,
655+
} => persist_upper,
656+
DrainStyle::AtTime {
657+
time: _,
658+
persist_upper,
659+
} => persist_upper,
660+
};
661+
662+
// Any update that is "in the past" of the persist upper is not
663+
// relevant anymore. We _can_ emit changes for it, but the
664+
// downstream persist_sink would filter these updates out because
665+
// the shard upper is already further ahead.
666+
//
667+
// Plus, our upsert state is up-to-date to the persist_upper, so we
668+
// wouldn't be able to emit correct retractions for incoming
669+
// commands whose `ts` is in the past of that.
670+
let relevant = persist_upper.less_equal(ts);
671+
relevant
672+
})
650673
.collect_vec();
651674

652675
tracing::debug!(

0 commit comments

Comments
 (0)