Skip to content

Commit f932a69

Browse files
authored
Merge pull request #30989 from teskje/fix-mv-sink-persist-frontier
mv-sink: stop discarding valid batch descriptions
2 parents 9b0c0bb + 0886c94 commit f932a69

File tree

3 files changed

+64
-32
lines changed

3 files changed

+64
-32
lines changed

src/compute/src/sink/correction.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,11 +200,6 @@ impl<D: Data> Correction<D> {
200200
count
201201
}
202202

203-
/// Return the current since frontier.
204-
pub fn since(&self) -> &Antichain<Timestamp> {
205-
&self.since
206-
}
207-
208203
/// Advance the since frontier.
209204
///
210205
/// # Panics

src/compute/src/sink/materialized_view_v2.rs

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -794,12 +794,15 @@ mod write {
794794
/// The frontiers of the `desired` inputs.
795795
desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
796796
/// The frontiers of the `persist` inputs.
797+
///
798+
/// Note that this is _not_ the same as the write frontier of the output persist shard! It
799+
/// usually is, but during snapshot processing, these frontiers will start at the shard's
800+
/// read frontier, so they can be beyond its write frontier. This is important as it means
801+
/// we must not discard batch descriptions based on these persist frontiers: A batch
802+
/// description might still be valid even if its `lower` is before the persist frontiers we
803+
/// observe.
797804
persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
798805
/// The current valid batch description and associated output capability, if any.
799-
///
800-
/// Note that "valid" here implies that if a batch description is set, it must be true that
801-
/// its `lower` is >= the `persist_frontier`. Otherwise the described batch couldn't be
802-
/// appended anymore, rendering the batch description invalid.
803806
batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
804807
/// A request to force a consolidation of `corrections` once both `desired_frontiers` and
805808
/// `persist_frontiers` become greater than the given frontier.
@@ -894,15 +897,6 @@ mod write {
894897
self.corrections.ok.advance_since(frontier.clone());
895898
self.corrections.err.advance_since(frontier.clone());
896899

897-
// If the `persist` frontier is greater than the `lower` of the current batch
898-
// description, we won't be able to append the batch, so the batch description is not
899-
// valid anymore.
900-
if let Some((desc, _)) = &self.batch_description {
901-
if PartialOrder::less_than(&desc.lower, frontier) {
902-
self.batch_description = None;
903-
}
904-
}
905-
906900
self.maybe_force_consolidation();
907901
}
908902

@@ -927,16 +921,18 @@ mod write {
927921
}
928922

929923
fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
930-
// The incoming batch description is outdated if either:
931-
// * we already have a batch description with a greater `lower`, or
932-
// * its `lower` is less than the persist frontier
933-
let validity_frontier = match &self.batch_description {
934-
Some((prev, _)) => &prev.lower,
935-
None => self.persist_frontiers.frontier(),
936-
};
937-
if PartialOrder::less_than(&desc.lower, validity_frontier) {
938-
self.trace(format!("skipping outdated batch description: {desc:?}"));
939-
return;
924+
// The incoming batch description is outdated if we already have a batch description
925+
// with a greater `lower`.
926+
//
927+
// Note that we cannot assume a description is outdated based on the comparison of its
928+
// `lower` with the `persist_frontier`. The persist frontier observed by the `write`
929+
// operator is initialized with the shard's read frontier, so it can be greater than
930+
// the shard's write frontier.
931+
if let Some((prev, _)) = &self.batch_description {
932+
if PartialOrder::less_than(&desc.lower, &prev.lower) {
933+
self.trace(format!("skipping outdated batch description: {desc:?}"));
934+
return;
935+
}
940936
}
941937

942938
self.batch_description = Some((desc, cap));
@@ -948,7 +944,8 @@ mod write {
948944

949945
// We can write a new batch if we have seen all `persist` updates before `lower` and
950946
// all `desired` updates up to `upper`.
951-
let persist_complete = desc.lower == *self.persist_frontiers.frontier();
947+
let persist_complete =
948+
PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
952949
let desired_complete =
953950
PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
954951
if !persist_complete || !desired_complete {
@@ -957,9 +954,6 @@ mod write {
957954

958955
let (desc, cap) = self.batch_description.take()?;
959956

960-
assert_eq!(desc.lower, *self.corrections.ok.since());
961-
assert_eq!(desc.lower, *self.corrections.err.since());
962-
963957
let ok_updates = self.corrections.ok.updates_before(&desc.upper);
964958
let err_updates = self.corrections.err.updates_before(&desc.upper);
965959

test/sqllogictest/github-8867.slt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Copyright Materialize, Inc. and contributors. All rights reserved.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the LICENSE file at the root of this repository.
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0.
9+
10+
# Regression test for database-issues#8867.
11+
12+
mode cockroach
13+
14+
simple conn=mz_system,user=mz_system
15+
ALTER SYSTEM SET enable_refresh_every_mvs = true
16+
----
17+
COMPLETE 0
18+
19+
statement ok
20+
CREATE TABLE t (x int)
21+
22+
statement ok
23+
INSERT INTO t VALUES (1)
24+
25+
statement ok
26+
CREATE MATERIALIZED VIEW mv1
27+
WITH (REFRESH EVERY '10s' ALIGNED TO mz_now()::text::int8 + 10000)
28+
AS SELECT * FROM t
29+
30+
statement ok
31+
CREATE MATERIALIZED VIEW mv2
32+
WITH (REFRESH EVERY '10 s' ALIGNED TO mz_now()::text::int8 + 10000)
33+
AS SELECT * FROM mv1
34+
35+
query I
36+
SELECT * FROM mv1;
37+
----
38+
1
39+
40+
query I
41+
SELECT * FROM mv2;
42+
----
43+
1

0 commit comments

Comments
 (0)