Skip to content

Commit 2a231b8

Browse files
fix: avoid committing new snapshot if table not changed (#19174)
* fix: avoid commiting new snapshot if table not changed * add logic test * refine code comments * refine code comment * tweak logic test * fix typo * fix logic test --------- Co-authored-by: Yang Xiufeng <yangxiufeng.c@gmail.com>
1 parent b769507 commit 2a231b8

File tree

3 files changed

+197
-5
lines changed

3 files changed

+197
-5
lines changed

src/query/storages/fuse/src/operations/common/generators/append_generator.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ impl AppendGenerator {
5757
}
5858
}
5959

60+
pub fn is_overwrite(&self) -> bool {
61+
self.overwrite
62+
}
63+
6064
fn check_fill_default(&self, summary: &Statistics) -> Result<bool> {
6165
let mut fill_default_values = false;
6266
// check if need to fill default value in statistics

src/query/storages/fuse/src/operations/common/processors/sink_commit.rs

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ use crate::FuseTable;
6262
use crate::io::TableMetaLocationGenerator;
6363
use crate::operations::AppendGenerator;
6464
use crate::operations::CommitMeta;
65+
use crate::operations::ConflictResolveContext;
6566
use crate::operations::MutationGenerator;
6667
use crate::operations::SnapshotGenerator;
6768
use crate::operations::TransformMergeCommitMeta;
@@ -121,6 +122,10 @@ pub struct CommitSink<F: SnapshotGenerator> {
121122
deduplicated_label: Option<String>,
122123
table_meta_timestamps: TableMetaTimestamps,
123124
vacuum_handler: Option<Arc<VacuumHandlerWrapper>>,
125+
// Tracks whether the ongoing mutation produced no physical changes.
126+
// We still need to read the previous snapshot before deciding to skip the commit,
127+
// because new tables must record their first snapshot even for empty writes.
128+
pending_noop_commit: bool,
124129
}
125130

126131
#[derive(Debug)]
@@ -183,6 +188,7 @@ where F: SnapshotGenerator + Send + Sync + 'static
183188
deduplicated_label,
184189
table_meta_timestamps,
185190
vacuum_handler,
191+
pending_noop_commit: false,
186192
})))
187193
}
188194

@@ -287,21 +293,45 @@ where F: SnapshotGenerator + Send + Sync + 'static
287293
let meta = CommitMeta::downcast_from(input_meta)
288294
.ok_or_else(|| ErrorCode::Internal("No commit meta. It's a bug"))?;
289295

290-
self.new_segment_locs = meta.new_segment_locs;
296+
let CommitMeta {
297+
conflict_resolve_context,
298+
new_segment_locs,
299+
table_id: _,
300+
virtual_schema,
301+
hll,
302+
} = meta;
291303

292-
self.new_virtual_schema = meta.virtual_schema;
304+
let has_new_segments = !new_segment_locs.is_empty();
305+
let has_virtual_schema = virtual_schema.is_some();
306+
let has_hll = !hll.is_empty();
293307

294-
if !meta.hll.is_empty() {
308+
self.new_segment_locs = new_segment_locs;
309+
310+
self.new_virtual_schema = virtual_schema;
311+
312+
if has_hll {
295313
let binding = self.ctx.get_mutation_status();
296314
let status = binding.read();
297315
self.insert_rows = status.insert_rows + status.update_rows;
298-
self.insert_hll = meta.hll;
316+
self.insert_hll = hll;
299317
}
300318

301319
self.backoff = set_backoff(None, None, self.max_retry_elapsed);
302320

321+
// Decide whether this mutation ended up as a no-op. We postpone the actual
322+
// "skip commit" decision until `State::FillDefault`, after we know whether
323+
// the table already has a snapshot.
324+
self.pending_noop_commit = Self::should_skip_commit(
325+
&conflict_resolve_context,
326+
has_new_segments,
327+
has_virtual_schema,
328+
has_hll,
329+
self.allow_append_only_skip(),
330+
);
331+
303332
self.snapshot_gen
304-
.set_conflict_resolve_context(meta.conflict_resolve_context);
333+
.set_conflict_resolve_context(conflict_resolve_context);
334+
305335
self.state = State::FillDefault;
306336

307337
Ok(Event::Async)
@@ -318,6 +348,30 @@ where F: SnapshotGenerator + Send + Sync + 'static
318348
.is_some_and(|generator| matches!(generator.mode(), TruncateMode::DropAll))
319349
}
320350

351+
fn should_skip_commit(
352+
ctx: &ConflictResolveContext,
353+
has_new_segments: bool,
354+
has_virtual_schema: bool,
355+
has_new_hll: bool,
356+
allow_append_only_skip: bool,
357+
) -> bool {
358+
if has_new_segments || has_virtual_schema || has_new_hll {
359+
return false;
360+
}
361+
362+
match ctx {
363+
ConflictResolveContext::ModifiedSegmentExistsInLatest(changes) => {
364+
changes.appended_segments.is_empty()
365+
&& changes.replaced_segments.is_empty()
366+
&& changes.removed_segment_indexes.is_empty()
367+
}
368+
ConflictResolveContext::AppendOnly((merged, _)) => {
369+
allow_append_only_skip && merged.merged_segments.is_empty()
370+
}
371+
_ => false,
372+
}
373+
}
374+
321375
fn need_truncate(&self) -> bool {
322376
self.snapshot_gen
323377
.as_any()
@@ -332,6 +386,17 @@ where F: SnapshotGenerator + Send + Sync + 'static
332386
.is_some()
333387
}
334388

389+
/// Append-only inserts (e.g. `INSERT INTO t SELECT ...`) may skip committing if
390+
/// nothing was written. Overwrite/CTAS (`CREATE OR REPLACE TABLE t AS SELECT ...`
391+
/// or `INSERT OVERWRITE ...`) still need a snapshot even when nothing was written,
392+
/// so we disable skipping when `AppendGenerator` is in overwrite mode.
393+
fn allow_append_only_skip(&self) -> bool {
394+
self.snapshot_gen
395+
.as_any()
396+
.downcast_ref::<AppendGenerator>()
397+
.is_some_and(|g| !g.is_overwrite())
398+
}
399+
335400
async fn clean_history(&self, purge_mode: &PurgeMode) -> Result<()> {
336401
{
337402
let table_info = self.table.get_table_info();
@@ -496,6 +561,24 @@ where F: SnapshotGenerator + Send + Sync + 'static
496561
// if table_id not match, update table meta will fail
497562
let mut table_info = fuse_table.table_info.clone();
498563

564+
let require_initial_snapshot = self.table.is_temp();
565+
// Only skip when both conditions hold:
566+
// 1) the mutation touched nothing (`pending_noop_commit` is true).
567+
// 2) the table already has a snapshot, or it's safe to skip the initial snapshot.
568+
// CTAS-created temporary tables must still commit even when the SELECT returns zero rows,
569+
// because `system.temporary_tables` currently depends on the committed table meta to show
570+
// correct statistics.
571+
let skip_commit =
572+
self.pending_noop_commit && (previous.is_some() || !require_initial_snapshot);
573+
// Reset the flag so subsequent mutations (or retries) re-evaluate their own no-op status.
574+
self.pending_noop_commit = false;
575+
if skip_commit {
576+
self.ctx
577+
.set_status_info("No table changes detected, skip commit");
578+
self.state = State::Finish;
579+
return Ok(());
580+
}
581+
499582
// merge virtual schema
500583
let old_virtual_schema = std::mem::take(&mut table_info.meta.virtual_schema);
501584
let new_virtual_schema = std::mem::take(&mut self.new_virtual_schema);
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# https://github.com/datafuselabs/databend/issues/19173
2+
3+
statement ok
4+
create or replace database issue_19173;
5+
6+
statement ok
7+
use issue_19173;
8+
9+
statement ok
10+
create or replace table target (c int);
11+
12+
statement ok
13+
create or replace table empty like target;
14+
15+
# mutations in autocommit mode
16+
17+
statement ok
18+
merge into target using (select c from empty) s on s.c = target.c when matched then update * when not matched then insert *;
19+
20+
query I
21+
select count() from fuse_snapshot('issue_19173', 'target');
22+
----
23+
0
24+
25+
statement ok
26+
delete from target where c = 1;
27+
28+
query I
29+
select count() from fuse_snapshot('issue_19173', 'target');
30+
----
31+
0
32+
33+
statement ok
34+
update target set c =2 where c = 1;
35+
36+
query I
37+
select count() from fuse_snapshot('issue_19173', 'target');
38+
----
39+
0
40+
41+
statement ok
42+
insert into target select * from empty;
43+
44+
query I
45+
select count() from fuse_snapshot('issue_19173', 'target');
46+
----
47+
0
48+
49+
# mutations inside explicit transactions
50+
51+
statement ok
52+
begin transaction;
53+
54+
statement ok
55+
merge into target using (select c from empty) s on s.c = target.c when matched then update * when not matched then insert *;
56+
57+
statement ok
58+
commit;
59+
60+
query I
61+
select count() from fuse_snapshot('issue_19173', 'target');
62+
----
63+
0
64+
65+
statement ok
66+
begin transaction;
67+
68+
statement ok
69+
delete from target where c = 1;
70+
71+
statement ok
72+
commit;
73+
74+
query I
75+
select count() from fuse_snapshot('issue_19173', 'target');
76+
----
77+
0
78+
79+
statement ok
80+
begin transaction;
81+
82+
statement ok
83+
update target set c =2 where c = 1;
84+
85+
statement ok
86+
commit;
87+
88+
query I
89+
select count() from fuse_snapshot('issue_19173', 'target');
90+
----
91+
0
92+
93+
statement ok
94+
begin transaction;
95+
96+
statement ok
97+
insert into target select * from empty;
98+
99+
statement ok
100+
commit;
101+
102+
query I
103+
select count() from fuse_snapshot('issue_19173', 'target');
104+
----
105+
0

0 commit comments

Comments
 (0)