Skip to content

Commit a332371

Browse files
authored
chore: improve the queue status (#18578)
1 parent 5d9d372 commit a332371

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ pub trait QueueData: Send + Sync + 'static {
8383

8484
fn enter_wait_pending(&self) {}
8585

86+
fn set_status(&self, _status: &str) {}
87+
8688
fn exit_wait_pending(&self, _wait_time: Duration) {}
8789

8890
fn get_abort_notify(&self) -> Arc<WatchNotify>;
@@ -224,19 +226,23 @@ impl<Data: QueueData> QueueManager<Data> {
224226
workload_group_timeout = std::cmp::min(queue_timeout, workload_group_timeout);
225227
}
226228

229+
data.set_status("[QUERY-QUEUE] Waiting for local workload semaphore");
230+
227231
let semaphore = workload_group.semaphore.clone();
228232
let acquire = tokio::time::timeout(timeout, semaphore.acquire_owned());
229233
let queue_future = AcquireQueueFuture::create(data.clone(), acquire, self.clone());
230234

231235
guards.push(queue_future.await?);
232236

233237
info!(
234-
"[QUERY-QUEUE] Successfully acquired from workload local group queue. elapsed: {:?}",
238+
"[QUERY-QUEUE] Successfully acquired from local workload semaphore. elapsed: {:?}",
235239
instant.elapsed()
236240
);
237241

238242
timeout -= instant.elapsed();
239243

244+
data.set_status("[QUERY-QUEUE] Waiting for global workload semaphore");
245+
240246
let workload_queue_guard = self
241247
.acquire_workload_queue(
242248
data.clone(),
@@ -247,14 +253,16 @@ impl<Data: QueueData> QueueManager<Data> {
247253
.await?;
248254

249255
info!(
250-
"[QUERY-QUEUE] Successfully acquired from workload meta group queue. elapsed: {:?}",
256+
"[QUERY-QUEUE] Successfully acquired from global workload semaphore. elapsed: {:?}",
251257
instant.elapsed()
252258
);
253259
timeout -= instant.elapsed();
254260
guards.push(workload_queue_guard);
255261
}
256262
}
257263

264+
data.set_status("[QUERY-QUEUE] Waiting for warehouse resource scheduling");
265+
258266
guards.extend(self.acquire_warehouse_queue(data, timeout).await?);
259267

260268
inc_session_running_acquired_queries();
@@ -692,6 +700,10 @@ impl QueueData for QueryEntry {
692700
.set_status_info("[QUERY-QUEUE] Waiting for resource scheduling");
693701
}
694702

703+
fn set_status(&self, status: &str) {
704+
self.ctx.set_status_info(status);
705+
}
706+
695707
fn exit_wait_pending(&self, wait_time: Duration) {
696708
self.ctx.set_status_info(
697709
format!(

0 commit comments

Comments
 (0)