Skip to content

Commit 2203b11

Browse files
authored
chore(query): refine the log and error message (#17993)
* chore(query): refine the log and error message * fix: 00_0004_copy_with_max_files.result * fix:tests/it/pipelines/executor/pipeline_executor.rs * refine the query_ctx.rs error message
1 parent dc25c75 commit 2203b11

File tree

24 files changed

+268
-175
lines changed

24 files changed

+268
-175
lines changed

src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,10 @@ pub async fn do_vacuum2(
9898

9999
let snapshots_before_lvt = match retention_policy {
100100
RetentionPolicy::ByTimePeriod(delta_duration) => {
101-
info!("using by ByTimePeriod policy {:?}", delta_duration);
101+
info!(
102+
"[FUSE-VACUUM2] Using ByTimePeriod policy {:?}",
103+
delta_duration
104+
);
102105
let retention_period = if fuse_table.is_transient() {
103106
// For transient table, keep no history data
104107
TimeDelta::zero()
@@ -118,7 +121,7 @@ pub async fn do_vacuum2(
118121
}
119122

120123
ctx.set_status_info(&format!(
121-
"set lvt for table {} takes {:?}, lvt: {:?}",
124+
"[FUSE-VACUUM2] Set LVT for table {}, elapsed: {:?}, LVT: {:?}",
122125
fuse_table.get_table_info().desc,
123126
start.elapsed(),
124127
lvt
@@ -130,7 +133,7 @@ pub async fn do_vacuum2(
130133
}
131134
RetentionPolicy::ByNumOfSnapshotsToKeep(num_snapshots_to_keep) => {
132135
info!(
133-
"using by ByNumOfSnapshotsToKeep policy {:?}",
136+
"[FUSE-VACUUM2] Using ByNumOfSnapshotsToKeep policy {:?}",
134137
num_snapshots_to_keep
135138
);
136139
// List the snapshot order by timestamp asc, till the current snapshot(inclusively).
@@ -171,7 +174,7 @@ pub async fn do_vacuum2(
171174

172175
let elapsed = start.elapsed();
173176
ctx.set_status_info(&format!(
174-
"list snapshots for table {} takes {:?}, snapshots_dir: {:?}, snapshots: {:?}",
177+
"[FUSE-VACUUM2] Listed snapshots for table {}, elapsed: {:?}, snapshots_dir: {:?}, snapshots: {:?}",
175178
fuse_table.get_table_info().desc,
176179
elapsed,
177180
fuse_table
@@ -193,7 +196,7 @@ pub async fn do_vacuum2(
193196
return Ok(vec![]);
194197
};
195198
ctx.set_status_info(&format!(
196-
"select gc_root for table {} takes {:?}, gc_root: {:?}, snapshots_to_gc: {:?}",
199+
"[FUSE-VACUUM2] Selected gc_root for table {}, elapsed: {:?}, gc_root: {:?}, snapshots_to_gc: {:?}",
197200
fuse_table.get_table_info().desc,
198201
start.elapsed(),
199202
gc_root,
@@ -222,7 +225,7 @@ pub async fn do_vacuum2(
222225
.collect::<Vec<_>>();
223226

224227
ctx.set_status_info(&format!(
225-
"list segments before gc_root for table {} takes {:?}, segment_dir: {:?}, gc_root_timestamp: {:?}, segments: {:?}",
228+
"[FUSE-VACUUM2] Listed segments before gc_root for table {}, elapsed: {:?}, segment_dir: {:?}, gc_root_timestamp: {:?}, segments: {:?}",
226229
fuse_table.get_table_info().desc,
227230
start.elapsed(),
228231
fuse_table.meta_location_generator().segment_location_prefix(),
@@ -236,7 +239,7 @@ pub async fn do_vacuum2(
236239
.filter(|s| !gc_root_segments.contains(s))
237240
.collect();
238241
ctx.set_status_info(&format!(
239-
"Filter segments to gc for table {} takes {:?}, segments_to_gc: {:?}",
242+
"[FUSE-VACUUM2] Filtered segments_to_gc for table {}, elapsed: {:?}, segments_to_gc: {:?}",
240243
fuse_table.get_table_info().desc,
241244
start.elapsed(),
242245
slice_summary(&segments_to_gc)
@@ -253,7 +256,7 @@ pub async fn do_vacuum2(
253256
gc_root_blocks.extend(segment?.block_metas()?.iter().map(|b| b.location.0.clone()));
254257
}
255258
ctx.set_status_info(&format!(
256-
"read segments for table {} takes {:?}",
259+
"[FUSE-VACUUM2] Read segments for table {}, elapsed: {:?}",
257260
fuse_table.get_table_info().desc,
258261
start.elapsed(),
259262
));
@@ -272,7 +275,7 @@ pub async fn do_vacuum2(
272275
.collect::<Vec<_>>();
273276

274277
ctx.set_status_info(&format!(
275-
"list blocks before gc_root for table {} takes {:?}, block_dir: {:?}, least_visible_timestamp: {:?}, blocks: {:?}",
278+
"[FUSE-VACUUM2] Listed blocks before gc_root for table {}, elapsed: {:?}, block_dir: {:?}, gc_root_timestamp: {:?}, blocks: {:?}",
276279
fuse_table.get_table_info().desc,
277280
start.elapsed(),
278281
fuse_table.meta_location_generator().block_location_prefix(),
@@ -286,7 +289,7 @@ pub async fn do_vacuum2(
286289
.filter(|b| !gc_root_blocks.contains(b))
287290
.collect();
288291
ctx.set_status_info(&format!(
289-
"Filter blocks to gc for table {} takes {:?}, blocks_to_gc: {:?}",
292+
"[FUSE-VACUUM2] Filtered blocks_to_gc for table {}, elapsed: {:?}, blocks_to_gc: {:?}",
290293
fuse_table.get_table_info().desc,
291294
start.elapsed(),
292295
slice_summary(&blocks_to_gc)
@@ -326,7 +329,7 @@ pub async fn do_vacuum2(
326329
}
327330

328331
ctx.set_status_info(&format!(
329-
"collect indexes to gc for table {} takes {:?}, indexes_to_gc: {:?}",
332+
"[FUSE-VACUUM2] Collected indexes_to_gc for table {}, elapsed: {:?}, indexes_to_gc: {:?}",
330333
fuse_table.get_table_info().desc,
331334
start.elapsed(),
332335
slice_summary(&indexes_to_gc)
@@ -369,7 +372,7 @@ pub async fn do_vacuum2(
369372
.chain(indexes_to_gc.into_iter())
370373
.collect();
371374
ctx.set_status_info(&format!(
372-
"remove files for table {} takes {:?}, files_to_gc: {:?}",
375+
"[FUSE-VACUUM2] Removed files for table {}, elapsed: {:?}, files_to_gc: {:?}",
373376
fuse_table.get_table_info().desc,
374377
start.elapsed(),
375378
slice_summary(&files_to_gc)
@@ -419,15 +422,15 @@ async fn set_lvt(
419422
) -> Result<Option<DateTime<Utc>>> {
420423
let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? else {
421424
info!(
422-
"Table {} has no snapshot, stop vacuuming",
425+
"[FUSE-VACUUM2] Table {} has no snapshot, stopping vacuum",
423426
fuse_table.get_table_info().desc
424427
);
425428
return Ok(None);
426429
};
427430
if !is_uuid_v7(&latest_snapshot.snapshot_id) {
428431
info!(
429-
"latest snapshot {:?} is not v7, stop vacuuming",
430-
latest_snapshot
432+
"[FUSE-VACUUM2] Latest snapshot is not v7, stopping vacuum: {:?}",
433+
latest_snapshot.snapshot_id
431434
);
432435
return Ok(None);
433436
}
@@ -458,7 +461,7 @@ async fn list_until_prefix(
458461
need_one_more: bool,
459462
gc_root_meta_ts: Option<DateTime<Utc>>,
460463
) -> Result<Vec<Entry>> {
461-
info!("list until prefix: {}", until);
464+
info!("[FUSE-VACUUM2] Listing files until prefix: {}", until);
462465
let dal = fuse_table.get_operator_ref();
463466

464467
match dal.info().scheme() {

src/query/service/src/interpreters/interpreter.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ pub trait Interpreter: Sync + Send {
101101
async fn execute_inner(&self, ctx: Arc<QueryContext>) -> Result<SendableDataBlockStream> {
102102
let make_error = || "failed to execute interpreter";
103103

104-
ctx.set_status_info("building pipeline");
104+
ctx.set_status_info("[INTERPRETER] Building execution pipeline");
105105
ctx.check_aborting().with_context(make_error)?;
106106

107107
let enable_disk_cache = match LicenseManagerSwitch::instance()
@@ -110,7 +110,7 @@ pub trait Interpreter: Sync + Send {
110110
Ok(_) => true,
111111
Err(e) => {
112112
log::error!(
113-
"[Interpreter] CRITICAL ALERT: License validation FAILED - enterprise features DISABLED, System may operate in DEGRADED MODE with LIMITED CAPABILITIES and REDUCED PERFORMANCE. Please contact us at https://www.databend.com/contact-us/ or email [email protected] to restore full functionality: {}",
113+
"[INTERPRETER] CRITICAL ALERT: License validation FAILED - enterprise features DISABLED, System may operate in DEGRADED MODE with LIMITED CAPABILITIES and REDUCED PERFORMANCE. Please contact us at https://www.databend.com/contact-us/ or email [email protected] to restore full functionality: {}",
114114
e
115115
);
116116
false
@@ -138,7 +138,7 @@ pub trait Interpreter: Sync + Send {
138138
on_execution_finished(info, query_ctx)
139139
}));
140140

141-
ctx.set_status_info("executing pipeline");
141+
ctx.set_status_info("[INTERPRETER] Executing pipeline");
142142

143143
let settings = ctx.get_settings();
144144
build_res.set_max_threads(settings.get_max_threads()? as usize);
@@ -191,7 +191,7 @@ fn log_query_start(ctx: &QueryContext) {
191191
}
192192

193193
if let Err(error) = InterpreterQueryLog::log_start(ctx, now, None) {
194-
error!("[Interpreter] Query start logging failed: {:?}", error)
194+
error!("[INTERPRETER] Failed to log query start: {:?}", error)
195195
}
196196
}
197197

@@ -218,7 +218,7 @@ fn log_query_finished(ctx: &QueryContext, error: Option<ErrorCode>, has_profiles
218218
}
219219

220220
if let Err(error) = InterpreterQueryLog::log_finish(ctx, now, error, has_profiles) {
221-
error!("[Interpreter] Query finish logging failed: {:?}", error)
221+
error!("[INTERPRETER] Failed to log query finish: {:?}", error)
222222
}
223223
}
224224

@@ -261,7 +261,7 @@ async fn auto_commit_if_not_allowed_in_transaction(
261261
}
262262
if !stmt.is_transaction_command() && ctx.txn_mgr().lock().is_fail() {
263263
let err = ErrorCode::CurrentTransactionIsAborted(
264-
"Current transaction is aborted, commands ignored until end of transaction block",
264+
"[INTERPRETER] Current transaction is aborted, commands ignored until end of transaction block",
265265
);
266266
return Err(err);
267267
}

src/query/service/src/interpreters/interpreter_select.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ impl SelectInterpreter {
112112
#[async_backtrace::framed]
113113
pub async fn build_physical_plan(&self) -> Result<PhysicalPlan> {
114114
let mut builder = PhysicalPlanBuilder::new(self.metadata.clone(), self.ctx.clone(), false);
115-
self.ctx.set_status_info("building physical plan");
115+
self.ctx
116+
.set_status_info("[SELECT-INTERP] Building physical plan");
116117
builder
117118
.build(&self.s_expr, self.bind_context.column_set())
118119
.await
@@ -151,7 +152,7 @@ impl SelectInterpreter {
151152
update_table_metas: streams.update_table_metas,
152153
..Default::default()
153154
};
154-
info!("Updating the stream meta to consume data");
155+
info!("[SELECT-INTERP] Updating stream metadata to consume data");
155156
catalog.update_multi_table_meta(r).await.map(|_| ())
156157
}
157158
None => Ok(()),
@@ -239,7 +240,7 @@ impl SelectInterpreter {
239240
if t.name().eq_ignore_ascii_case("result_scan") {
240241
return if tables.len() > 1 {
241242
Err(ErrorCode::Unimplemented(
242-
"The current `RESULT_SCAN` only supports single table queries",
243+
"[SELECT-INTERP] RESULT_SCAN currently supports only single table queries",
243244
))
244245
} else {
245246
Ok(Some(t.table()))
@@ -279,15 +280,16 @@ impl Interpreter for SelectInterpreter {
279280
async fn execute2(&self) -> Result<PipelineBuildResult> {
280281
self.attach_tables_to_ctx();
281282

282-
self.ctx.set_status_info("preparing plan");
283+
self.ctx
284+
.set_status_info("[SELECT-INTERP] Preparing execution plan");
283285

284286
// 0. Need to build physical plan first to get the partitions.
285287
let physical_plan = self.build_physical_plan().await?;
286288
let query_plan = physical_plan
287289
.format(self.metadata.clone(), Default::default())?
288290
.format_pretty()?;
289291

290-
info!("Query physical plan: \n{}", query_plan);
292+
info!("[SELECT-INTERP] Query physical plan:\n{}", query_plan);
291293

292294
if self.ctx.get_settings().get_enable_query_result_cache()?
293295
&& self.ctx.get_cacheable()
@@ -344,7 +346,7 @@ impl Interpreter for SelectInterpreter {
344346
}
345347
Err(e) => {
346348
// 2.3 If an error occurs, turn back to the normal pipeline.
347-
error!("Failed to read query result cache. {}", e);
349+
error!("[SELECT-INTERP] Failed to read query result cache: {}", e);
348350
}
349351
}
350352
}

src/query/service/src/pipelines/executor/pipeline_executor.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl PipelineExecutor {
159159
}
160160

161161
info!(
162-
"Init pipeline successfully, query_id: {:?}, elapsed: {:?}",
162+
"[PIPELINE-EXECUTOR] Pipeline initialized successfully for query {}, elapsed: {:?}",
163163
query_id,
164164
instant.elapsed()
165165
);
@@ -171,7 +171,7 @@ impl PipelineExecutor {
171171
let instants = Instant::now();
172172
let _guard = defer(move || {
173173
info!(
174-
"Pipeline executor finished, elapsed: {:?}",
174+
"[PIPELINE-EXECUTOR] Execution completed, elapsed: {:?}",
175175
instants.elapsed()
176176
);
177177
});
@@ -226,8 +226,8 @@ impl PipelineExecutor {
226226
if let Either::Left(_) = select(max_execute_future, finished_future).await {
227227
if let Some(graph) = this_graph.upgrade() {
228228
graph.should_finish(Err(ErrorCode::AbortedQuery(
229-
"Aborted query, because the execution time exceeds the maximum execution time limit",
230-
))).expect("exceed max execute time, but cannot send error message");
229+
"[PIPELINE-EXECUTOR] Query aborted due to execution time exceeding maximum limit",
230+
))).expect("[PIPELINE-EXECUTOR] Failed to send timeout error message");
231231
}
232232
}
233233
});
@@ -244,13 +244,13 @@ impl PipelineExecutor {
244244
query_wrapper
245245
.graph
246246
.should_finish(Err(may_error))
247-
.expect("executor cannot send error message");
247+
.expect("[PIPELINE-EXECUTOR] Failed to send error message");
248248
}
249249
None => {
250250
query_wrapper
251251
.graph
252252
.should_finish::<()>(Ok(()))
253-
.expect("executor cannot send error message");
253+
.expect("[PIPELINE-EXECUTOR] Failed to send completion message");
254254
}
255255
},
256256
}
@@ -289,7 +289,7 @@ impl PipelineExecutor {
289289
pub fn change_priority(&self, priority: u8) {
290290
match self {
291291
PipelineExecutor::QueryPipelineExecutor(_) => {
292-
unreachable!("Logic error, cannot change priority for QueryPipelineExecutor")
292+
unreachable!("[PIPELINE-EXECUTOR] Logic error: cannot change priority for QueryPipelineExecutor")
293293
}
294294
PipelineExecutor::QueriesPipelineExecutor(query_wrapper) => {
295295
query_wrapper.graph.change_priority(priority as u64);

src/query/service/src/pipelines/executor/processor_async_task.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ impl ProcessorAsyncTask {
116116
match futures::future::select(left, right).await {
117117
Either::Left((res, _)) => res,
118118
Either::Right((_, _)) => Err(ErrorCode::AbortedQuery(
119-
"Aborted query, because the server is shutting down or the query was killed.",
119+
"[PROCESSOR-ASYNC-TASK] Query aborted due to server shutdown or query termination",
120120
)),
121121
}
122122
};
@@ -136,7 +136,7 @@ impl ProcessorAsyncTask {
136136
let elapsed = start.elapsed();
137137
let active_workers = queue_clone.active_workers();
138138
warn!(
139-
"Very slow processor async task, query_id:{:?}, processor id: {:?}, name: {:?}, elapsed: {:?}, active sync workers: {:?}",
139+
"[PROCESSOR-ASYNC-TASK] Slow async task detected - query: {:?}, processor: {:?} ({}), elapsed: {:?}, active workers: {:?}",
140140
query_id, processor_id, processor_name, elapsed, active_workers
141141
);
142142
}

src/query/service/src/pipelines/executor/query_pipeline_executor.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl QueryPipelineExecutor {
7676

7777
if threads_num.is_zero() {
7878
return Err(ErrorCode::Internal(
79-
"Pipeline max threads cannot equals zero.",
79+
"[PIPELINE-EXECUTOR] Pipeline max threads cannot be zero",
8080
));
8181
}
8282

@@ -108,7 +108,9 @@ impl QueryPipelineExecutor {
108108
settings: ExecutorSettings,
109109
) -> Result<Arc<QueryPipelineExecutor>> {
110110
if pipelines.is_empty() {
111-
return Err(ErrorCode::Internal("Executor Pipelines is empty."));
111+
return Err(ErrorCode::Internal(
112+
"[PIPELINE-EXECUTOR] Executor pipelines cannot be empty",
113+
));
112114
}
113115

114116
let threads_num = pipelines
@@ -119,7 +121,7 @@ impl QueryPipelineExecutor {
119121

120122
if threads_num.is_zero() {
121123
return Err(ErrorCode::Internal(
122-
"Pipeline max threads cannot equals zero.",
124+
"[PIPELINE-EXECUTOR] Pipeline max threads cannot be zero",
123125
));
124126
}
125127

@@ -200,7 +202,8 @@ impl QueryPipelineExecutor {
200202
}
201203

202204
pub fn finish<C>(&self, cause: Option<ErrorCode<C>>) {
203-
let cause = cause.map(|err| err.with_context("pipeline executor finished"));
205+
let cause =
206+
cause.map(|err| err.with_context("[PIPELINE-EXECUTOR] Pipeline executor finished"));
204207

205208
let mut finished_error = self.finished_error.lock();
206209
if let Some(cause) = cause {
@@ -284,7 +287,7 @@ impl QueryPipelineExecutor {
284287
}
285288

286289
info!(
287-
"Init pipeline successfully, query_id: {:?}, elapsed: {:?}",
290+
"[PIPELINE-EXECUTOR] Pipeline initialized successfully for query {}, elapsed: {:?}",
288291
self.settings.query_id,
289292
instant.elapsed()
290293
);
@@ -327,7 +330,7 @@ impl QueryPipelineExecutor {
327330
if let Either::Left(_) = select(max_execute_future, finished_future).await {
328331
if let Some(executor) = this.upgrade() {
329332
executor.finish(Some(ErrorCode::AbortedQuery(
330-
"Aborted query, because the execution time exceeds the maximum execution time limit",
333+
"[PIPELINE-EXECUTOR] Query aborted due to execution time exceeding maximum limit",
331334
)));
332335
}
333336
}
@@ -452,7 +455,9 @@ impl Drop for QueryPipelineExecutor {
452455

453456
let cause = match self.finished_error.lock().as_ref() {
454457
Some(cause) => cause.clone(),
455-
None => ErrorCode::Internal("Pipeline illegal state: not successfully shutdown."),
458+
None => ErrorCode::Internal(
459+
"[PIPELINE-EXECUTOR] Pipeline illegal state: not successfully shutdown",
460+
),
456461
};
457462

458463
let mut on_finished_chain = self.on_finished_chain.lock();
@@ -463,7 +468,7 @@ impl Drop for QueryPipelineExecutor {
463468
let profiling = self.fetch_plans_profile(true);
464469
let info = ExecutionInfo::create(Err(cause), profiling);
465470
if let Err(cause) = on_finished_chain.apply(info) {
466-
warn!("Pipeline executor shutdown failure, {:?}", cause);
471+
warn!("[PIPELINE-EXECUTOR] Shutdown failure: {:?}", cause);
467472
}
468473
}
469474
}

0 commit comments

Comments
 (0)