This repository was archived by the owner on Jun 6, 2024. It is now read-only.

Commit 1fd2286
Clean up dead code and make tests run
1 parent dc1bce0

2 files changed: +26 additions, -275 deletions


src/queue.rs (25 additions, 247 deletions)
@@ -30,35 +30,19 @@ impl Eq for TaskId {}
 
 impl Copy for TaskId {}
 
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
-struct QueryKey {
-    pub ft: Duration,
-    pub qid: u64,
-}
-
 #[derive(Debug)]
-pub struct Queue {
-    // The queue used to order queries by executor usage.
-    queue: BTreeSet<QueryKey>,
-    // The startup time of the queue, used to calculate new global passes.
-    start_ts: SystemTime,
-    // Structure that maps query IDs to query keys.
-    query_map: HashMap<u64, (Arc<Mutex<QueryKey>>, Arc<Mutex<QueryGraph>>)>,
-    // table: DashMap<u64, RwLock<QueryGraph>>,
-    // List of currently running tasks.
-    running_task_map: HashMap<TaskId, Task>,
-    // Notify primitive that signals when new tasks are ready.
-    avail: Arc<Notify>,
-}
-
 pub struct State {
     // queue: Mutex<VecDeque<QueryKey>>,
+    // The queue used to order queries by executor usage.
     queue: Mutex<BTreeMap<Duration, u64>>,
     start_ts: SystemTime,
 
     query_id_counter: AtomicU64,
+    // Structure that maps query IDs to query keys.
     table: DashMap<u64, RwLock<QueryGraph>>,
+    // List of currently running tasks.
     running_tasks: DashMap<TaskId, Task>,
+    // Notify primitive that signals when new tasks are ready.
     notify: Arc<Notify>,
 }
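The surviving `State` keeps the same scheduling idea the deleted `Queue` implemented: its `BTreeMap<Duration, u64>` orders pending queries by accumulated executor time, so the least-serviced query is dispatched first. Below is a minimal, self-contained sketch of that ordering, not the crate's actual code; the millisecond values and the `pass` name are invented for illustration:

```rust
use std::collections::BTreeMap;
use std::time::Duration;

fn main() {
    // Pending queries keyed by accumulated executor time ("pass"),
    // mirroring `queue: Mutex<BTreeMap<Duration, u64>>` above.
    let mut queue: BTreeMap<Duration, u64> = BTreeMap::new();

    // Three queries with different amounts of service so far (values invented).
    queue.insert(Duration::from_millis(30), 7);
    queue.insert(Duration::from_millis(5), 2);
    queue.insert(Duration::from_millis(12), 9);

    // Dispatch order: the least-serviced query sorts first.
    let (pass, qid) = queue.pop_first().expect("queue is non-empty");
    assert_eq!(qid, 2);

    // When its task finishes, charge the elapsed time to the query and
    // re-insert it, pushing it behind less-serviced queries.
    queue.insert(pass + Duration::from_millis(20), qid);
    assert_eq!(queue.pop_first().unwrap().1, 9);
}
```

One thing the new key drops relative to the deleted `QueryKey` is the tie-breaking `qid` (the removed `test_query_key_cmp` below asserted that durations compare first, then qids), so two queries with an identical accumulated `Duration` would collide on the same map key.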

@@ -157,238 +141,48 @@ impl State {
             guard.time = new_time;
         }
     }
-}
-
-// Notify variable is shared with scheduler service to control task dispatch.
-impl Queue {
-    pub fn new(avail: Arc<Notify>) -> Self {
-        Self {
-            queue: BTreeSet::new(),
-            start_ts: SystemTime::now(),
-            query_map: HashMap::new(),
-            running_task_map: HashMap::new(),
-            avail,
-        }
-    }
-
-    // Send the status update for a task to its query graph structure.
-    // Based on the query's availability, change its priority.
-    // Used in both add_running_task and remove_task.
-    async fn update_status(
-        &mut self,
-        old_key: QueryKey,
-        new_key: QueryKey,
-        finished_stage_id: u64,
-        finished_stage_status: StageStatus,
-    ) {
-        // Get the graph for this query
-        let graph = Arc::clone(&self.query_map.get(&old_key.qid).unwrap().1);
-        // Temporarily remove query from queue, if present, and get its graph
-        let _ = self.queue.remove(&old_key);
-
-        // If graph has more tasks available, re-insert query and notify
-        let mut guard = graph.lock().await;
-        guard
-            .update_stage_status(finished_stage_id, finished_stage_status)
-            .unwrap();
-        if let QueryQueueStatus::Available = guard.get_queue_status() {
-            self.queue.insert(new_key);
-            self.avail.notify_waiters();
-        }
-    }
-
-    // Mark this task as running.
-    async fn add_running_task(&mut self, mut task: Task, key: QueryKey) {
-        // Change the task's status to running.
-        task.status = Running(SystemTime::now());
-        // Add the task to the list of running tasks.
-        self.running_task_map.insert(task.task_id, task.clone());
-        // Send the update to the query graph and reorder queue.
-        // WARNING: stage_status may not be 'running' if tasks and stages are not 1:1
-        self.update_status(
-            key.clone(),
-            key,
-            task.task_id.stage_id,
-            StageStatus::Running(0),
-        )
-        .await;
-    }
-
-    /* Get the minimum element of the queue, or None if the queue is empty */
-    fn min(&mut self) -> Option<QueryKey> {
-        self.queue.pop_first()
-    }
-
-    #[cfg(test)]
-    pub fn size(&self) -> usize {
-        self.queue.len()
-    }
-
-    // TODO(makototomokiyo): make sure stride actually works
-    pub async fn add_query(&mut self, qid: u64, graph: Arc<Mutex<QueryGraph>>) {
-        let key = QueryKey {
-            // running: 0,
-            ft: SystemTime::now().duration_since(self.start_ts).unwrap(),
-            qid,
-        };
-        self.query_map
-            .insert(qid, (Arc::new(Mutex::new(key)), Arc::clone(&graph)));
-        self.queue.insert(key);
-        self.avail.notify_waiters();
-    }
-
-    /*
-        Remove this task from the list of running tasks and mark it as done.
-        This function forwards task info to the task's query graph,
-        updating it if necessary.
-    */
-    // TODO: handle aborted queries
-    pub async fn remove_task(&mut self, task_id: TaskId, finished_stage_status: StageStatus) {
-        // Remove the task from the running map.
-        let task = self.running_task_map.remove(&task_id).unwrap();
-        debug_assert!(task.task_id == task_id);
-        // Get the query ID.
-        let query = task_id.query_id;
-        // Get the key corresponding to the task's query.
-        let mut key = self.query_map.get(&query).unwrap().0.lock().await;
-        // Ensure the task is running.
-        if let Running(start_ts) = task.status {
-            let old_key = *key;
-            // Increment the query's pass using the task's elapsed time.
-            (*key).ft += SystemTime::now().duration_since(start_ts).unwrap();
-            let new_key = *key;
-            drop(key); // to avoid double mutable borrow
-            self.update_status(old_key, new_key, task_id.stage_id, finished_stage_status)
-                .await;
-        } else {
-            panic!("Task removed but is not running.");
-        }
-    }
-
-    /*
-        Return the next task, or None if the queue is empty.
-        Blocking is handled in the server.
-    */
-    pub async fn next_task(&mut self) -> Option<TaskId> {
-        if let Some(key) = self.min() {
-            // If a query is available, get its next task
-            let graph = &self.query_map.get(&key.qid).unwrap().1;
-            println!("Queue size before getting task: {:#?}", self.queue.len());
-            let new_task = graph.lock().await.next_task();
 
-            debug_assert!(matches!(new_task.status, TaskStatus::Ready));
-
-            self.add_running_task(new_task.clone(), key).await;
-            Some(new_task.task_id)
-        } else {
-            None
-        }
-    }
-
-    pub async fn get_query_status(&mut self, qid: u64) -> QueryStatus {
-        if let Some(query_entry) = self.query_map.get(&qid) {
-            let status = query_entry.1.lock().await.status;
-            let key = *query_entry.0.lock().await;
-            // If query is done, return DONE and delete from table
-            if status == QueryStatus::Done {
-                self.query_map.remove(&key.qid).expect("Query not found.");
-            }
-            return status;
-        } else {
-            return QueryStatus::NotFound;
-        }
-    }
-
-    pub async fn abort_query(&mut self, qid: u64) {
-        if let Some(query_entry) = self.query_map.get(&qid) {
-            query_entry.1.lock().await.abort();
-            self.query_map.remove(&qid);
-        }
-    }
-
-    pub async fn get_plan_bytes(
-        &self,
-        query_id: u64,
-        stage_id: u64,
-    ) -> Result<Vec<u8>, SchedulerError> {
-        let t = &self.query_map;
-        if let Some((_, graph)) = t.get(&query_id) {
-            let plan = Arc::clone(&graph.lock().await.stages[stage_id as usize].plan);
-            Ok(physical_plan_to_bytes(plan)
-                .expect("Failed to serialize physical plan")
-                .to_vec())
-        } else {
-            Err(SchedulerError::Error("Graph not found.".to_string()))
-        }
+    pub async fn size(&self) -> usize {
+        self.queue.lock().await.len()
     }
 }
 
 #[cfg(test)]
 mod tests {
     use rand::Rng;
     use std::fs;
-    use std::time::Duration;
     use tokio::sync::{Mutex, Notify};
-    use tokio::time::sleep;
 
     use crate::parser::ExecutionPlanParser;
-    use crate::{
-        composable_database::TaskId,
-        query_graph::{QueryGraph, StageStatus},
-        queue::{QueryKey, Queue},
-    };
-    use std::{
-        cmp::min,
-        sync::{
-            atomic::{AtomicU64, Ordering},
-            Arc,
-        },
-        time::SystemTime,
-    };
-
-    // Test that query keys compare properly.
-    #[tokio::test]
-    async fn test_query_key_cmp() {
-        let then = SystemTime::now();
-        let now1 = SystemTime::now().duration_since(then).unwrap();
-        sleep(Duration::from_secs(1)).await;
-        let now2 = SystemTime::now().duration_since(then).unwrap();
-
-        let key1 = QueryKey {
-            ft: now1.clone(),
-            qid: 1,
-        };
-        let key2 = QueryKey { ft: now2, qid: 0 };
-        let key3 = QueryKey { ft: now1, qid: 0 };
-        // Make sure durations are compared first
-        assert!(key1 < key2);
-        // Then qids
-        assert!(key3 < key1);
-    }
+    use crate::queue::State;
+    use crate::task::TaskStatus;
+    use std::{cmp::min, sync::Arc};
 
     // Deprecated, use test_queue_conc instead
     #[tokio::test]
     async fn test_queue() {
-        let test_file = concat!(env!("CARGO_MANIFEST_DIR"), "/test_sql/expr.slt");
+        let test_file = concat!(
+            env!("CARGO_MANIFEST_DIR"),
+            "/test_sql/test_select_multiple.sql"
+        );
         let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/");
-        let mut queue = Box::new(Queue::new(Arc::new(Notify::new())));
+        let queue = Box::new(State::new(Arc::new(Notify::new())));
         let parser = ExecutionPlanParser::new(catalog_path).await;
         println!("test_scheduler: Testing file {}", test_file);
         if let Ok(physical_plans) = parser.get_execution_plan_from_file(&test_file).await {
             let mut qid = 0;
             // Add a bunch of queries
             for plan in &physical_plans {
-                let graph = QueryGraph::new(qid, Arc::clone(plan)).await;
-                queue.add_query(qid, Arc::new(Mutex::new(graph))).await;
+                queue.add_query(Arc::clone(plan)).await;
                 qid += 1;
             }
-            let mut tasks: Vec<TaskId> = Vec::new();
+            let mut tasks = Vec::new();
             for _ in 0..qid {
                 tasks.push(queue.next_task().await.expect("No tasks left in queue."));
             }
             for _ in 0..qid {
                 queue
-                    .remove_task(tasks.pop().unwrap(), StageStatus::Finished(0))
+                    .report_task(tasks.pop().unwrap().0, TaskStatus::Finished)
                     .await;
             }
         } else {
@@ -400,7 +194,7 @@ mod tests {
     async fn test_queue_conc() {
         let test_sql_dir = concat!(env!("CARGO_MANIFEST_DIR"), "/test_sql/");
         let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/");
-        let queue = Arc::new(Mutex::new(Queue::new(Arc::new(Notify::new()))));
+        let queue = Arc::new(State::new(Arc::new(Notify::new())));
         let parser = ExecutionPlanParser::new(catalog_path).await;
         println!("test_queue_conc: Testing files in {}", test_sql_dir);
 
@@ -411,29 +205,17 @@ mod tests {
             physical_plans.extend(parser.get_execution_plan_from_file(&path).await.unwrap());
         }
 
-        // let physical_plans = parser
-        //     .get_execution_plan_from_file(&test_file)
-        //     .await
-        //     .unwrap();
         println!("Got {} plans.", physical_plans.len());
         let nplans = min(physical_plans.len(), 400);
         let plans = physical_plans[..nplans].to_vec();
-        let qid: Arc<Mutex<AtomicU64>> = Arc::new(Mutex::new(AtomicU64::new(0)));
 
         // Add a bunch of queries
         let mut jobs = Vec::new();
         for plan in plans {
             let queue_clone = Arc::clone(&queue);
-            let qid_clone = Arc::clone(&qid);
             // Spawn threads that each enqueue a job
             jobs.push(tokio::spawn(async move {
-                let query_id = qid_clone.lock().await.fetch_add(1, Ordering::SeqCst);
-                let graph = QueryGraph::new(query_id, Arc::clone(&plan)).await;
-                queue_clone
-                    .lock()
-                    .await
-                    .add_query(query_id, Arc::new(Mutex::new(graph)))
-                    .await;
+                let _ = queue_clone.add_query(Arc::clone(&plan)).await;
             }));
         }
 
@@ -444,18 +226,14 @@ mod tests {
             jobs.push(tokio::spawn(async move {
                 // Get a plan, looping until one is available
                 loop {
-                    let task_opt = queue_clone.lock().await.next_task().await;
-                    if let Some(task) = task_opt {
-                        queue_clone
-                            .lock()
-                            .await
-                            .remove_task(task, StageStatus::Finished(0))
-                            .await;
+                    let task_opt = queue_clone.next_task().await;
+                    if let Some((task, _plan)) = task_opt {
+                        queue_clone.report_task(task, TaskStatus::Finished).await;
                     }
                     let time = rand::thread_rng().gen_range(200..4000);
                     tokio::time::sleep(tokio::time::Duration::from_millis(time)).await;
                     // Return if no more queries left.
-                    if queue_clone.lock().await.queue.len() == 0 {
+                    if queue_clone.size().await == 0 {
                         return;
                     }
                 }
@@ -468,8 +246,8 @@ mod tests {
         }
         // println!("Queued {} queries.", qid.lock().await.load(Ordering::SeqCst));
        // make sure no more tasks remain
-        assert!(Arc::clone(&queue).lock().await.next_task().await.is_none());
-        assert!(queue.lock().await.size() == 0);
+        assert!(Arc::clone(&queue).next_task().await.is_none());
+        assert!(queue.size().await == 0);
         println!("Finished {:?} tasks.", nplans);
     }
 }
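The rewritten tests also double as usage documentation for the new API: plans go in with `add_query`, workers pull `(task, plan)` pairs with `next_task`, acknowledge completion with `report_task`, and poll `size().await` to detect drain. As a rough mental model only — a synchronous mock with invented types, not the crate's async `State` — the lifecycle looks like this:

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;
use std::time::Duration;

// Invented, synchronous stand-in for the scheduler's `State`, showing the
// add_query -> next_task -> report_task -> size lifecycle the tests drive.
struct MockState {
    // Pending queries ordered by accumulated executor time.
    queue: Mutex<BTreeMap<Duration, u64>>,
    query_id_counter: Mutex<u64>,
}

impl MockState {
    fn new() -> Self {
        Self {
            queue: Mutex::new(BTreeMap::new()),
            query_id_counter: Mutex::new(0),
        }
    }

    // Admit a query; a fresh query gets a near-zero key so it sorts first.
    fn add_query(&self) -> u64 {
        let mut ctr = self.query_id_counter.lock().unwrap();
        let qid = *ctr;
        *ctr += 1;
        // Nanosecond offset by qid keeps keys unique in this sketch.
        self.queue
            .lock()
            .unwrap()
            .insert(Duration::from_nanos(qid), qid);
        qid
    }

    // Hand out the least-serviced query's "task" (here just its id and pass).
    fn next_task(&self) -> Option<(u64, Duration)> {
        self.queue
            .lock()
            .unwrap()
            .pop_first()
            .map(|(pass, qid)| (qid, pass))
    }

    // Report a finished task; if the query has more work, charge the elapsed
    // time to its pass and re-insert it (the caller says whether it is done).
    fn report_task(&self, qid: u64, pass: Duration, done: bool) {
        if !done {
            let charged = pass + Duration::from_millis(10); // pretend elapsed time
            self.queue.lock().unwrap().insert(charged, qid);
        }
    }

    fn size(&self) -> usize {
        self.queue.lock().unwrap().len()
    }
}

fn main() {
    let state = MockState::new();
    for _ in 0..3 {
        state.add_query();
    }
    // Drain the queue the way test_queue does: one task per query.
    while let Some((qid, pass)) = state.next_task() {
        state.report_task(qid, pass, true); // finished, so never re-queued
    }
    assert_eq!(state.size(), 0);
}
```

Compared with the old `Arc<Mutex<Queue>>`, the real `State` is shared as a plain `Arc<State>`: locking moves inside the struct (a `Mutex` around the queue, `DashMap`s for the tables), which is why the test code no longer calls `.lock().await` before every operation.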
