Skip to content

Commit 5f3d165

Browse files
authored
chore(query): mark the 'create as select' query as heavy (#18365)
1 parent 810a557 commit 5f3d165

File tree

2 files changed

+54
-44
lines changed

2 files changed

+54
-44
lines changed

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 50 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -174,56 +174,59 @@ impl<Data: QueueData> QueueManager<Data> {
174174
}
175175

176176
pub async fn acquire(self: &Arc<Self>, data: Data) -> Result<AcquireQueueGuard> {
177-
if data.need_acquire_to_queue() {
178-
info!(
179-
"[QUERY-QUEUE] Preparing to acquire from query queue, current length: {}",
180-
self.length()
181-
);
182-
183-
let start_time = SystemTime::now();
184-
let instant = Instant::now();
185-
let mut timeout = data.timeout();
186-
let mut guards = vec![];
187-
188-
let data = Arc::new(data);
189-
if let Some(workload_group) = ThreadTracker::workload_group() {
190-
if let Some(QuotaValue::Number(permits)) =
191-
workload_group.meta.get_quota(MAX_CONCURRENCY_QUOTA_KEY)
192-
{
193-
let mut workload_group_timeout = timeout;
194-
195-
if let Some(QuotaValue::Duration(queue_timeout)) = workload_group
196-
.meta
197-
.get_quota(QUERY_QUEUED_TIMEOUT_QUOTA_KEY)
198-
{
199-
workload_group_timeout =
200-
std::cmp::min(queue_timeout, workload_group_timeout);
201-
}
177+
if !data.need_acquire_to_queue() {
178+
info!("[QUERY-QUEUE] Non-heavy queries skip the query queue and execute directly.");
179+
return Ok(AcquireQueueGuard::create(vec![]));
180+
}
202181

203-
let workload_queue_guard = self
204-
.acquire_workload_queue(
205-
data.clone(),
206-
workload_group.queue_key.clone(),
207-
permits as u64,
208-
workload_group_timeout,
209-
)
210-
.await?;
211-
212-
info!("[QUERY-QUEUE] Successfully acquired from workload group queue. elapsed: {:?}", instant.elapsed());
213-
timeout -= instant.elapsed();
214-
guards.push(workload_queue_guard);
215-
}
216-
}
182+
info!(
183+
"[QUERY-QUEUE] Preparing to acquire from query queue, current length: {}",
184+
self.length()
185+
);
217186

218-
guards.push(self.acquire_warehouse_queue(data, timeout).await?);
187+
let start_time = SystemTime::now();
188+
let instant = Instant::now();
189+
let mut timeout = data.timeout();
190+
let mut guards = vec![];
191+
192+
let data = Arc::new(data);
193+
if let Some(workload_group) = ThreadTracker::workload_group() {
194+
if let Some(QuotaValue::Number(permits)) =
195+
workload_group.meta.get_quota(MAX_CONCURRENCY_QUOTA_KEY)
196+
{
197+
let mut workload_group_timeout = timeout;
198+
199+
if let Some(QuotaValue::Duration(queue_timeout)) = workload_group
200+
.meta
201+
.get_quota(QUERY_QUEUED_TIMEOUT_QUOTA_KEY)
202+
{
203+
workload_group_timeout = std::cmp::min(queue_timeout, workload_group_timeout);
204+
}
219205

220-
inc_session_running_acquired_queries();
221-
record_session_queue_acquire_duration_ms(start_time.elapsed().unwrap_or_default());
206+
let workload_queue_guard = self
207+
.acquire_workload_queue(
208+
data.clone(),
209+
workload_group.queue_key.clone(),
210+
permits as u64,
211+
workload_group_timeout,
212+
)
213+
.await?;
222214

223-
return Ok(AcquireQueueGuard::create(guards));
215+
info!(
216+
"[QUERY-QUEUE] Successfully acquired from workload group queue. elapsed: {:?}",
217+
instant.elapsed()
218+
);
219+
timeout -= instant.elapsed();
220+
guards.push(workload_queue_guard);
221+
}
224222
}
225223

226-
Ok(AcquireQueueGuard::create(vec![]))
224+
guards.push(self.acquire_warehouse_queue(data, timeout).await?);
225+
226+
inc_session_running_acquired_queries();
227+
record_session_queue_acquire_duration_ms(start_time.elapsed().unwrap_or_default());
228+
229+
Ok(AcquireQueueGuard::create(guards))
227230
}
228231

229232
async fn acquire_workload_queue(
@@ -581,6 +584,9 @@ impl QueryEntry {
581584
| Plan::TruncateTable(_) => {
582585
return true;
583586
}
587+
Plan::CreateTable(v) if v.as_select.is_some() => {
588+
return true;
589+
}
584590
Plan::DropTable(v) if v.all => {
585591
return true;
586592
}

src/query/service/tests/it/sessions/queue_mgr.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,10 @@ async fn test_heavy_actions() -> Result<()> {
382382
// MERGE INTO
383383
add_to_queue: true,
384384
},
385+
Query {
386+
sql: "CREATE TABLE test_heavy_create AS SELECT 1",
387+
add_to_queue: true,
388+
}
385389
];
386390

387391
let fixture = TestFixture::setup().await?;

0 commit comments

Comments
 (0)