Skip to content
This repository was archived by the owner on Jun 6, 2024. It is now read-only.

Commit 8936b1e

Browse files
committed
try fifo query queue
1 parent 5dafdf5 commit 8936b1e

File tree

1 file changed

+26
-19
lines changed

1 file changed

+26
-19
lines changed

src/queue.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ use crate::task::{
66
};
77
use dashmap::DashMap;
88
use datafusion::physical_plan::ExecutionPlan;
9-
use std::collections::BTreeMap;
9+
use std::collections::VecDeque;
1010
use std::hash::{Hash, Hasher};
1111
use std::sync::atomic::{AtomicU64, Ordering};
1212
use std::sync::Arc;
13-
use std::time::{Duration, SystemTime};
13+
use std::time::SystemTime;
1414
use tokio::sync::{Mutex, Notify, RwLock};
1515

1616
// Must implement here since generated TaskId does not derive Hash.
@@ -30,9 +30,9 @@ impl Copy for TaskId {}
3030

3131
#[derive(Debug)]
3232
pub struct State {
33-
// queue: Mutex<VecDeque<QueryKey>>,
3433
// The queue used to order queries by executor usage.
35-
queue: Mutex<BTreeMap<Duration, u64>>,
34+
// queue: Mutex<BTreeMap<Duration, u64>>,
35+
queue: Mutex<VecDeque<u64>>,
3636
start_ts: SystemTime,
3737

3838
query_id_counter: AtomicU64,
@@ -47,7 +47,8 @@ pub struct State {
4747
impl State {
4848
pub fn new(notify: Arc<Notify>) -> Self {
4949
Self {
50-
queue: Mutex::new(BTreeMap::new()),
50+
// queue: Mutex::new(BTreeMap::new()),
51+
queue: Mutex::new(VecDeque::new()),
5152
start_ts: SystemTime::now(),
5253
query_id_counter: AtomicU64::new(0),
5354
table: DashMap::new(),
@@ -67,7 +68,8 @@ impl State {
6768
query.time = time;
6869

6970
self.table.insert(id, RwLock::new(query));
70-
self.queue.lock().await.insert(time, id);
71+
// self.queue.lock().await.insert(time, id);
72+
self.queue.lock().await.push_back(id);
7173

7274
self.notify.notify_waiters();
7375
id
@@ -93,7 +95,8 @@ impl State {
9395
}
9496

9597
pub async fn next_task(&self) -> Option<(TaskId, Arc<dyn ExecutionPlan>)> {
96-
let Some((duration, query_id)) = self.queue.lock().await.pop_first() else {
98+
// let Some((duration, query_id)) = self.queue.lock().await.pop_first() else {
99+
let Some(query_id) = self.queue.lock().await.pop_front() else {
97100
return None;
98101
};
99102
let query = self.table.get(&query_id).unwrap();
@@ -109,7 +112,8 @@ impl State {
109112
.update_stage_status(task.task_id.stage_id, StageStatus::Running(0))
110113
.unwrap();
111114
if let QueryQueueStatus::Available = guard.get_queue_status() {
112-
self.queue.lock().await.insert(duration, query_id);
115+
// self.queue.lock().await.insert(duration, query_id);
116+
self.queue.lock().await.push_back(query_id);
113117
self.notify.notify_waiters();
114118
}
115119

@@ -120,7 +124,7 @@ impl State {
120124
pub async fn report_task(&self, task_id: TaskId, status: TaskStatus) {
121125
if let Some((_, task)) = self.running_tasks.remove(&task_id) {
122126
println!("Updating {:?} status to {:?}", task_id, status);
123-
let TaskStatus::Running(ts) = task.status else {
127+
let TaskStatus::Running(_ts) = task.status else {
124128
println!("Task removed with status {:?}", task.status);
125129
panic!("Task removed but is not running.");
126130
};
@@ -136,14 +140,19 @@ impl State {
136140
_ => unreachable!(),
137141
}
138142

139-
let new_time = guard.time + SystemTime::now().duration_since(ts).unwrap();
140143
let mut queue = self.queue.lock().await;
141-
let _ = queue.remove(&guard.time);
142-
if let QueryQueueStatus::Available = guard.get_queue_status() {
143-
queue.insert(new_time, task_id.query_id);
144-
self.notify.notify_waiters();
144+
// let new_time = guard.time + SystemTime::now().duration_since(ts).unwrap();
145+
// let _ = queue.remove(&guard.time);
146+
// if let QueryQueueStatus::Available = guard.get_queue_status() {
147+
// queue.insert(new_time, task_id.query_id);
148+
// self.notify.notify_waiters();
149+
// }
150+
// guard.time = new_time;
151+
if QueryQueueStatus::Available == guard.get_queue_status()
152+
&& !queue.contains(&task_id.query_id)
153+
{
154+
queue.push_back(task_id.query_id);
145155
}
146-
guard.time = new_time;
147156
}
148157
}
149158

@@ -159,9 +168,9 @@ mod tests {
159168
use std::{fs, time::{Duration, SystemTime}};
160169
use tokio::{sync::Notify, time::sleep};
161170

162-
use crate::{parser::ExecutionPlanParser, query_graph::QueryGraph};
163171
use crate::queue::State;
164172
use crate::task::TaskStatus;
173+
use crate::{parser::ExecutionPlanParser, query_graph::QueryGraph};
165174
use std::{cmp::min, sync::Arc};
166175

167176
// Deprecated, use test_queue_conc instead
@@ -290,9 +299,7 @@ mod tests {
290299
// Add a bunch of queries with staggered submission time
291300
let start_enqueue = SystemTime::now();
292301
for plan in long_plans {
293-
queue
294-
.add_query(Arc::clone(&plan))
295-
.await;
302+
queue.add_query(Arc::clone(&plan)).await;
296303
sleep(Duration::from_millis(10)).await;
297304
}
298305
let enq_time = SystemTime::now().duration_since(start_enqueue).unwrap();

0 commit comments

Comments
 (0)