Skip to content

Commit e5fdd21

Browse files
authored
fix(query): Support the release of the query queue after the client cancels the query (#18558)
* fix(query): Support the release of the query queue after the client cancels the query * fix(query): Support the release of the query queue after the client cancels the query * Update interpreter.rs
1 parent cf6ddba commit e5fdd21

File tree

8 files changed

+274
-19
lines changed

8 files changed

+274
-19
lines changed

src/query/catalog/src/table_context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::time::SystemTime;
2424
use dashmap::DashMap;
2525
use databend_common_base::base::Progress;
2626
use databend_common_base::base::ProgressValues;
27+
use databend_common_base::base::WatchNotify;
2728
use databend_common_base::runtime::ExecutorStatsSnapshot;
2829
use databend_common_exception::ErrorCode;
2930
use databend_common_exception::Result;
@@ -199,6 +200,7 @@ pub trait TableContext: Send + Sync {
199200
fn get_id(&self) -> String;
200201
fn get_current_catalog(&self) -> String;
201202
fn check_aborting(&self) -> Result<(), ContextError>;
203+
fn get_abort_notify(&self) -> Arc<WatchNotify>;
202204
fn get_abort_checker(self: Arc<Self>) -> AbortChecker
203205
where Self: 'static {
204206
struct Checker<S> {

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,8 @@ impl Executor {
282282
query_id = guard.query_id, reason:? = reason;
283283
"[HTTP-QUERY] Query state transitioning from Starting to Stopped"
284284
);
285+
286+
s.ctx.get_abort_notify().notify_waiters();
285287
if let Err(e) = &reason {
286288
InterpreterQueryLog::log_finish(
287289
&s.ctx,

src/query/service/src/sessions/query_ctx.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use dashmap::DashMap;
3636
use databend_common_base::base::Progress;
3737
use databend_common_base::base::ProgressValues;
3838
use databend_common_base::base::SpillProgress;
39+
use databend_common_base::base::WatchNotify;
3940
use databend_common_base::runtime::profile::Profile;
4041
use databend_common_base::runtime::profile::ProfileStatisticsName;
4142
use databend_common_base::runtime::ExecutorStatsSnapshot;
@@ -888,6 +889,10 @@ impl TableContext for QueryContext {
888889
self.shared.check_aborting()
889890
}
890891

892+
fn get_abort_notify(&self) -> Arc<WatchNotify> {
893+
self.shared.abort_notify.clone()
894+
}
895+
891896
fn get_error(&self) -> Option<ErrorCode<ContextError>> {
892897
self.shared.get_error()
893898
}

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use dashmap::DashMap;
3030
use databend_common_base::base::short_sql;
3131
use databend_common_base::base::Progress;
3232
use databend_common_base::base::SpillProgress;
33+
use databend_common_base::base::WatchNotify;
3334
use databend_common_base::runtime::drop_guard;
3435
use databend_common_base::runtime::ExecutorStatsSnapshot;
3536
use databend_common_base::runtime::MemStat;
@@ -117,6 +118,7 @@ pub struct QueryContextShared {
117118
pub(in crate::sessions) running_query_text_hash: Arc<RwLock<Option<String>>>,
118119
pub(in crate::sessions) running_query_parameterized_hash: Arc<RwLock<Option<String>>>,
119120
pub(in crate::sessions) aborting: Arc<AtomicBool>,
121+
pub(in crate::sessions) abort_notify: Arc<WatchNotify>,
120122
pub(in crate::sessions) tables_refs: Arc<Mutex<HashMap<DatabaseAndTable, Arc<dyn Table>>>>,
121123
pub(in crate::sessions) streams_refs: Arc<RwLock<HashMap<DatabaseAndTable, bool>>>,
122124
pub(in crate::sessions) affect: Arc<Mutex<Option<QueryAffect>>>,
@@ -221,6 +223,7 @@ impl QueryContextShared {
221223
running_query_text_hash: Arc::new(RwLock::new(None)),
222224
running_query_parameterized_hash: Arc::new(RwLock::new(None)),
223225
aborting: Arc::new(AtomicBool::new(false)),
226+
abort_notify: Arc::new(WatchNotify::new()),
224227
tables_refs: Arc::new(Mutex::new(HashMap::new())),
225228
streams_refs: Default::default(),
226229
affect: Arc::new(Mutex::new(None)),
@@ -368,7 +371,7 @@ impl QueryContextShared {
368371
}
369372

370373
self.aborting.store(true, Ordering::Release);
371-
374+
self.abort_notify.notify_waiters();
372375
// TODO: Wait for the query to be processed (write out the last error)
373376
}
374377

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use std::time::SystemTime;
3030
use databend_common_ast::ast::ExplainKind;
3131
use databend_common_base::base::escape_for_key;
3232
use databend_common_base::base::GlobalInstance;
33+
use databend_common_base::base::WatchNotify;
3334
use databend_common_base::runtime::workload_group::QuotaValue;
3435
use databend_common_base::runtime::workload_group::MAX_CONCURRENCY_QUOTA_KEY;
3536
use databend_common_base::runtime::workload_group::QUERY_QUEUED_TIMEOUT_QUOTA_KEY;
@@ -54,6 +55,7 @@ use databend_common_sql::plans::ModifyColumnAction;
5455
use databend_common_sql::plans::ModifyTableColumnPlan;
5556
use databend_common_sql::plans::Plan;
5657
use databend_common_sql::PlanExtras;
58+
use futures_util::future::Either;
5759
use log::info;
5860
use parking_lot::Mutex;
5961
use pin_project_lite::pin_project;
@@ -82,6 +84,8 @@ pub trait QueueData: Send + Sync + 'static {
8284
fn enter_wait_pending(&self) {}
8385

8486
fn exit_wait_pending(&self, _wait_time: Duration) {}
87+
88+
fn get_abort_notify(&self) -> Arc<WatchNotify>;
8589
}
8690

8791
pub(crate) struct Inner<Data: QueueData> {
@@ -174,6 +178,20 @@ impl<Data: QueueData> QueueManager<Data> {
174178
}
175179

176180
pub async fn acquire(self: &Arc<Self>, data: Data) -> Result<AcquireQueueGuard> {
181+
let abort_notify = data.get_abort_notify();
182+
183+
let watch_abort_notify = Box::pin(async move { abort_notify.notified().await });
184+
185+
let acquire = Box::pin(self.acquire_inner(data));
186+
match futures::future::select(acquire, watch_abort_notify).await {
187+
Either::Left((left, _)) => left,
188+
Either::Right((_, _)) => Err(ErrorCode::AbortedQuery(
189+
"[QUERY-QUEUE] recv query abort notify.",
190+
)),
191+
}
192+
}
193+
194+
async fn acquire_inner(self: &Arc<Self>, data: Data) -> Result<AcquireQueueGuard> {
177195
if !data.need_acquire_to_queue() {
178196
info!("[QUERY-QUEUE] Non-heavy queries skip the query queue and execute directly.");
179197
return Ok(AcquireQueueGuard::create(vec![]));
@@ -490,6 +508,7 @@ pub struct QueryEntry {
490508
pub timeout: Duration,
491509
pub lock_ttl: Duration,
492510
pub need_acquire_to_queue: bool,
511+
pub abort_watch_notify: Arc<WatchNotify>,
493512
}
494513

495514
impl QueryEntry {
@@ -511,6 +530,7 @@ impl QueryEntry {
511530
timeout => Duration::from_secs(timeout),
512531
},
513532
lock_ttl: Duration::from_secs(settings.get_statement_queue_ttl_in_seconds()?),
533+
abort_watch_notify: ctx.get_abort_notify(),
514534
})
515535
}
516536

@@ -668,6 +688,10 @@ impl QueueData for QueryEntry {
668688
);
669689
self.ctx.set_query_queued_duration(wait_time)
670690
}
691+
692+
fn get_abort_notify(&self) -> Arc<WatchNotify> {
693+
self.abort_watch_notify.clone()
694+
}
671695
}
672696

673697
pub type QueriesQueueManager = QueueManager<QueryEntry>;

0 commit comments

Comments
 (0)