Skip to content

Commit c756b96

Browse files
dantengsky and BohuTANG authored
fix: fuse_vacuum2 panic while vacuuming empty table with data_retentio… (#18744)
* fix: fuse_vacuum2 panic while vacuuming empty table with data_retention_num_snapshots_to_keep policy. Return early if the table is found to have no snapshot. * revert test config file * improve(vacuum): enhance vacuum drop table logging for better progress tracking - Replace verbose TableMeta output with concise table_name(id:table_id) format - Add clear start/completion markers with === delimiters - Improve result summary with specific counts of success/failed operations - Add detailed progress information while preserving all debug data - Failed table IDs are still logged separately for troubleshooting * tweak logs --------- Co-authored-by: BohuTANG <[email protected]>
1 parent b58f4f7 commit c756b96

File tree

5 files changed

+109
-26
lines changed

5 files changed

+109
-26
lines changed

src/meta/api/src/garbage_collection_api.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@ use display_more::DisplaySliceExt;
4848
use fastrace::func_name;
4949
use futures::StreamExt;
5050
use futures::TryStreamExt;
51+
use log::debug;
5152
use log::error;
5253
use log::info;
54+
use log::warn;
5355

5456
use crate::index_api::IndexApi;
5557
use crate::kv_app_error::KVAppError;
@@ -178,6 +180,11 @@ pub async fn get_history_tables_for_gc(
178180
db_id: u64,
179181
limit: usize,
180182
) -> Result<Vec<TableNIV>, KVAppError> {
183+
info!(
184+
"get_history_tables_for_gc: db_id {}, limit {}",
185+
db_id, limit
186+
);
187+
181188
let ident = TableIdHistoryIdent {
182189
database_id: db_id,
183190
table_name: "dummy".to_string(),
@@ -196,6 +203,15 @@ pub async fn get_history_tables_for_gc(
196203
let mut filter_tb_infos = vec![];
197204
const BATCH_SIZE: usize = 1000;
198205

206+
let args_len = args.len();
207+
let mut num_out_of_time_range = 0;
208+
let mut num_processed = 0;
209+
210+
info!(
211+
"get_history_tables_for_gc: {} items to process in db {}",
212+
args_len, db_id
213+
);
214+
199215
// Process in batches to avoid performance issues
200216
for chunk in args.chunks(BATCH_SIZE) {
201217
// Get table metadata for current batch
@@ -205,15 +221,16 @@ pub async fn get_history_tables_for_gc(
205221
// Filter by drop_time_range for current batch
206222
for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(chunk.iter()) {
207223
let Some(seq_meta) = seq_meta else {
208-
error!(
224+
warn!(
209225
"batch_filter_table_info cannot find {:?} table_meta",
210226
table_id
211227
);
212228
continue;
213229
};
214230

215231
if !drop_time_range.contains(&seq_meta.data.drop_on) {
216-
info!("table {:?} is not in drop_time_range", seq_meta.data);
232+
debug!("table {:?} is not in drop_time_range", seq_meta.data);
233+
num_out_of_time_range += 1;
217234
continue;
218235
}
219236

@@ -225,9 +242,20 @@ pub async fn get_history_tables_for_gc(
225242

226243
// Check if we have reached the limit
227244
if filter_tb_infos.len() >= limit {
245+
info!(
246+
"get_history_tables_for_gc: reach limit {}, so far collected {}",
247+
limit,
248+
filter_tb_infos.len()
249+
);
228250
return Ok(filter_tb_infos);
229251
}
230252
}
253+
254+
num_processed += chunk.len();
255+
info!(
256+
"get_history_tables_for_gc: process: {}/{}, {} items filtered by time range condition",
257+
num_processed, args_len, num_out_of_time_range
258+
);
231259
}
232260

233261
Ok(filter_tb_infos)

src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,13 @@ pub async fn vacuum_drop_tables_by_table_info(
189189
}
190190
};
191191

192+
let (_, failed_tables) = &result;
193+
let (success_count, failed_count) = (num_tables - failed_tables.len(), failed_tables.len());
192194
info!(
193-
"vacuum {} dropped tables, cost:{:?}",
195+
"vacuum {} dropped tables completed - success: {}, failed: {}, total_cost: {:?}",
194196
num_tables,
197+
success_count,
198+
failed_count,
195199
start.elapsed()
196200
);
197201

src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,15 @@ pub async fn do_vacuum2(
9898
}
9999

100100
let fuse_table = FuseTable::try_from_table(table)?;
101+
102+
let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? else {
103+
info!(
104+
"[FUSE-VACUUM2] Table {} has no snapshot, stopping vacuum",
105+
fuse_table.get_table_info().desc
106+
);
107+
return Ok(vec![]);
108+
};
109+
101110
let start = std::time::Instant::now();
102111

103112
let retention_policy = fuse_table.get_data_retention_policy(ctx.as_ref())?;
@@ -122,7 +131,9 @@ pub async fn do_vacuum2(
122131
// A zero retention period indicates that we should vacuum all the historical snapshots
123132
is_vacuum_all = retention_period.is_zero();
124133

125-
let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period).await? else {
134+
let Some(lvt) =
135+
set_lvt(fuse_table, latest_snapshot, ctx.as_ref(), retention_period).await?
136+
else {
126137
return Ok(vec![]);
127138
};
128139

@@ -153,6 +164,7 @@ pub async fn do_vacuum2(
153164
fuse_table
154165
.meta_location_generator()
155166
.snapshot_location_prefix(),
167+
// Safe to unwrap here: we have checked that `fuse_table` has a snapshot
156168
fuse_table.snapshot_loc().unwrap().as_str(),
157169
need_one_more,
158170
None,
@@ -433,16 +445,10 @@ async fn collect_gc_candidates_by_retention_period(
433445
/// Return `None` means we stop vacuumming, but don't want to report error to user.
434446
async fn set_lvt(
435447
fuse_table: &FuseTable,
448+
latest_snapshot: Arc<TableSnapshot>,
436449
ctx: &dyn TableContext,
437450
retention_period: TimeDelta,
438451
) -> Result<Option<DateTime<Utc>>> {
439-
let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? else {
440-
info!(
441-
"[FUSE-VACUUM2] Table {} has no snapshot, stopping vacuum",
442-
fuse_table.get_table_info().desc
443-
);
444-
return Ok(None);
445-
};
446452
if !is_uuid_v7(&latest_snapshot.snapshot_id) {
447453
info!(
448454
"[FUSE-VACUUM2] Latest snapshot is not v7, stopping vacuum: {:?}",

src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl VacuumDropTablesInterpreter {
5858
drop_ids: Vec<DroppedId>,
5959
) -> Result<()> {
6060
info!(
61-
"vacuum drop table from db {:?}, gc_drop_tables",
61+
"vacuum metadata of dropped table from db {:?}",
6262
self.plan.database,
6363
);
6464

@@ -75,6 +75,12 @@ impl VacuumDropTablesInterpreter {
7575
}
7676
}
7777

78+
info!(
79+
"found {} database meta data and {} table metadata need to be cleaned",
80+
drop_db_ids.len(),
81+
drop_db_table_ids.len()
82+
);
83+
7884
let chunk_size = 50;
7985

8086
// first gc drop table ids
@@ -124,8 +130,10 @@ impl Interpreter for VacuumDropTablesInterpreter {
124130
let retention_time = chrono::Utc::now() - duration;
125131
let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?;
126132
info!(
127-
"vacuum drop table from db {:?}, duration: {:?}, retention_time: {:?}",
128-
self.plan.database, duration, retention_time
133+
"=== VACUUM DROP TABLE STARTED === db: {:?}, retention_days: {}, retention_time: {:?}",
134+
self.plan.database,
135+
ctx.get_settings().get_data_retention_time_in_days()?,
136+
retention_time
129137
);
130138
// if database if empty, vacuum all tables
131139
let database_name = if self.plan.database.is_empty() {
@@ -153,13 +161,18 @@ impl Interpreter for VacuumDropTablesInterpreter {
153161
}
154162

155163
info!(
156-
"vacuum drop table from db {:?}, get_drop_table_infos return tables: {:?},tables.len: {:?}, drop_ids: {:?}",
164+
"vacuum drop table from db {:?}, found {} tables: [{}], drop_ids: {:?}",
157165
self.plan.database,
166+
tables.len(),
158167
tables
159168
.iter()
160-
.map(|t| t.get_table_info())
161-
.collect::<Vec<_>>(),
162-
tables.len(),
169+
.map(|t| format!(
170+
"{}(id:{})",
171+
t.get_table_info().name,
172+
t.get_table_info().ident.table_id
173+
))
174+
.collect::<Vec<_>>()
175+
.join(", "),
163176
drop_ids
164177
);
165178

@@ -176,12 +189,17 @@ impl Interpreter for VacuumDropTablesInterpreter {
176189
}
177190

178191
info!(
179-
"after filter read-only tables: {:?}, tables.len: {:?}",
192+
"after filter read-only tables: {} tables remain: [{}]",
193+
tables.len(),
180194
tables
181195
.iter()
182-
.map(|t| t.get_table_info())
183-
.collect::<Vec<_>>(),
184-
tables.len()
196+
.map(|t| format!(
197+
"{}(id:{})",
198+
t.get_table_info().name,
199+
t.get_table_info().ident.table_id
200+
))
201+
.collect::<Vec<_>>()
202+
.join(", ")
185203
);
186204

187205
let tables_count = tables.len();
@@ -226,17 +244,30 @@ impl Interpreter for VacuumDropTablesInterpreter {
226244
}
227245
}
228246
info!(
229-
"failed dbs:{:?}, failed_tables:{:?}, success_drop_ids:{:?}",
230-
failed_db_ids, failed_tables, success_dropped_ids
247+
"vacuum drop table summary - failed dbs: {}, failed tables: {}, successfully cleaned: {} items",
248+
failed_db_ids.len(),
249+
failed_tables.len(),
250+
success_dropped_ids.len()
231251
);
252+
if !failed_tables.is_empty() {
253+
info!("failed table ids: {:?}", failed_tables);
254+
}
232255

233256
self.gc_drop_tables(catalog, success_dropped_ids).await?;
234257
}
235258

259+
let success_count = tables_count as u64 - failed_tables.len() as u64;
260+
let failed_count = failed_tables.len() as u64;
261+
262+
info!(
263+
"=== VACUUM DROP TABLE COMPLETED === success: {}, failed: {}, total: {}",
264+
success_count, failed_count, tables_count
265+
);
266+
236267
match files_opt {
237268
None => PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![
238-
UInt64Type::from_data(vec![tables_count as u64 - failed_tables.len() as u64]),
239-
UInt64Type::from_data(vec![failed_tables.len() as u64]),
269+
UInt64Type::from_data(vec![success_count]),
270+
UInt64Type::from_data(vec![failed_count]),
240271
])]),
241272
Some(purge_files) => {
242273
let mut len = min(purge_files.len(), DRY_RUN_LIMIT);
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
statement ok
2+
create or replace database issue_18743;
3+
4+
statement ok
5+
use issue_18743;
6+
7+
statement ok
8+
CREATE OR REPLACE TABLE t(c int);
9+
10+
statement ok
11+
call system$fuse_vacuum2('issue_18743', 't');
12+
13+
statement ok
14+
call system$fuse_vacuum2();

0 commit comments

Comments (0)