This repository was archived by the owner on Jun 6, 2024. It is now read-only.

Commit dc1bce0: Initial working 'State'

1 parent 72d54a9

File tree: 3 files changed (+168, -39 lines)

src/query_graph.rs

Lines changed: 9 additions & 2 deletions
@@ -3,7 +3,6 @@ use crate::composable_database::{QueryStatus, TaskId};
 use crate::task::{Task, TaskStatus};
 use crate::task_queue::TaskQueue;
 use datafusion::arrow::datatypes::Schema;
-use datafusion::common::JoinSide;
 use datafusion::physical_plan::aggregates::AggregateExec;
 use datafusion::physical_plan::joins::{
     CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, SymmetricHashJoinExec,
@@ -15,6 +14,7 @@ use datafusion::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 use std::collections::HashMap;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
+use std::time::Duration;
 
 // TODO Change to Waiting, Ready, Running(vec[taskid]), Finished(vec[locations?])
 #[derive(Clone, Debug, Default)]
@@ -48,10 +48,11 @@ pub struct QueryGraph {
     tid_counter: AtomicU64, // TODO: add mutex to stages and make elements pointers to avoid copying
     pub stages: Vec<QueryStage>, // Can be a vec since all stages in a query are enumerated from 0.
     task_queue: TaskQueue, // Ready tasks in this graph
+    pub time: Duration,
 }
 
 impl QueryGraph {
-    pub async fn new(query_id: u64, plan: Arc<dyn ExecutionPlan>) -> Self {
+    pub fn new(query_id: u64, plan: Arc<dyn ExecutionPlan>) -> Self {
         // Build stages.
         let mut builder = GraphBuilder::new();
         let stages = builder.build(plan.clone());
@@ -63,6 +64,7 @@ impl QueryGraph {
             tid_counter: AtomicU64::new(0),
             stages,
             task_queue: TaskQueue::new(),
+            time: Duration::new(0, 0),
         };
 
         // Build tasks for leaf stages.
@@ -121,6 +123,7 @@ impl QueryGraph {
         let outputs = stage.outputs.clone();
 
         if outputs.is_empty() {
+            println!("QueryGraph::update_stage_status: Query {} is done.", self.query_id);
            self.status = QueryStatus::Done;
            return Ok(());
        }
@@ -161,6 +164,10 @@ impl QueryGraph {
     }
 
     // fn build_tasks(&mut self)
+    pub fn get_plan(&self, stage_id: u64) -> Arc<dyn ExecutionPlan> {
+        let plan = self.stages[stage_id as usize].plan.clone();
+        plan
+    }
 }
 
 #[derive(Clone, Debug)]
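
The new get_plan accessor is a Vec index by stage_id plus an Arc clone, so callers get a cheap shared handle to the stage's plan rather than a copy of the plan tree. A minimal standalone sketch of that pattern, using toy types (Stage, Graph, and a String plan are stand-ins, not the crate's ExecutionPlan):

use std::sync::Arc;

struct Stage {
    plan: Arc<String>, // stand-in for Arc<dyn ExecutionPlan>
}

struct Graph {
    stages: Vec<Stage>, // indexed by stage_id, like QueryGraph::stages
}

impl Graph {
    fn get_plan(&self, stage_id: u64) -> Arc<String> {
        // Cloning an Arc only bumps the reference count; the plan itself is shared.
        self.stages[stage_id as usize].plan.clone()
    }
}

fn main() {
    let graph = Graph {
        stages: vec![Stage { plan: Arc::new("scan -> filter".to_string()) }],
    };
    let plan = graph.get_plan(0);
    assert_eq!(Arc::strong_count(&plan), 2); // one handle in the graph, one here
    println!("stage 0 plan: {plan}");
}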

src/queue.rs

Lines changed: 114 additions & 3 deletions
@@ -5,12 +5,15 @@ use crate::task::{
     TaskStatus::{self, *},
 };
 use crate::SchedulerError;
-use std::collections::{BTreeSet, HashMap};
+use dashmap::DashMap;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_proto::bytes::physical_plan_to_bytes;
+use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::hash::{Hash, Hasher};
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
-use datafusion_proto::bytes::physical_plan_to_bytes;
-use tokio::sync::{Mutex, Notify};
+use tokio::sync::{Mutex, Notify, RwLock};
 
 // Must implement here since generated TaskId does not derive Hash.
 impl Hash for TaskId {
@@ -48,6 +51,114 @@ pub struct Queue {
     avail: Arc<Notify>,
 }
 
+pub struct State {
+    // queue: Mutex<VecDeque<QueryKey>>,
+    queue: Mutex<BTreeMap<Duration, u64>>,
+    start_ts: SystemTime,
+
+    query_id_counter: AtomicU64,
+    table: DashMap<u64, RwLock<QueryGraph>>,
+    running_tasks: DashMap<TaskId, Task>,
+    notify: Arc<Notify>,
+}
+
+impl State {
+    pub fn new(notify: Arc<Notify>) -> Self {
+        Self {
+            queue: Mutex::new(BTreeMap::new()),
+            start_ts: SystemTime::now(),
+            query_id_counter: AtomicU64::new(0),
+            table: DashMap::new(),
+            running_tasks: DashMap::new(),
+            notify,
+        }
+    }
+
+    fn next_query_id(&self) -> u64 {
+        self.query_id_counter.fetch_add(1, Ordering::SeqCst)
+    }
+
+    pub async fn add_query(&self, plan: Arc<dyn ExecutionPlan>) -> u64 {
+        let id = self.next_query_id();
+        let mut query = QueryGraph::new(id, plan);
+        let time = SystemTime::now().duration_since(self.start_ts).unwrap();
+        query.time = time;
+
+        self.table.insert(id, RwLock::new(query));
+        self.queue.lock().await.insert(time, id);
+
+        self.notify.notify_waiters();
+        id
+    }
+
+    pub async fn get_query_status(&self, query_id: u64) -> Option<QueryStatus> {
+        let status = self.table.get(&query_id)?.read().await.status;
+        if status == QueryStatus::Done {
+            self.table.remove(&query_id);
+        }
+        Some(status)
+    }
+
+    pub async fn abort_query(&self, query_id: u64) {
+        todo!()
+    }
+
+    pub async fn next_task(&self) -> Option<(TaskId, Arc<dyn ExecutionPlan>)> {
+        let Some((duration, query_id)) = self.queue.lock().await.pop_first() else {
+            return None;
+        };
+        let query = self.table.get(&query_id).unwrap();
+        let mut guard = query.write().await;
+
+        let mut task = guard.next_task();
+        task.status = Running(SystemTime::now());
+        let task_id = task.task_id;
+        let plan = guard.get_plan(task.task_id.stage_id);
+
+        // Update query to reflect running task. Requeue if more tasks are available.
+        guard
+            .update_stage_status(task.task_id.stage_id, StageStatus::Running(0))
+            .unwrap();
+        if let QueryQueueStatus::Available = guard.get_queue_status() {
+            self.queue.lock().await.insert(duration, query_id);
+            self.notify.notify_waiters();
+        }
+
+        self.running_tasks.insert(task_id, task);
+        Some((task_id, plan))
+    }
+
+    pub async fn report_task(&self, task_id: TaskId, status: TaskStatus) {
+        if let Some((_, task)) = self.running_tasks.remove(&task_id) {
+            println!("Updating {:?} status to {:?}", task_id, status);
+            let TaskStatus::Running(ts) = task.status else {
+                println!("Task removed with status {:?}", task.status);
+                panic!("Task removed but is not running.");
+            };
+            let query = self.table.get(&task_id.query_id).unwrap();
+            let mut guard = query.write().await;
+
+            match status {
+                TaskStatus::Finished => guard
+                    .update_stage_status(task_id.stage_id, StageStatus::Finished(0))
+                    .unwrap(),
+                TaskStatus::Failed => todo!(),
+                TaskStatus::Aborted => todo!(),
+                _ => unreachable!(),
+            }
+
+            let new_time = guard.time + SystemTime::now().duration_since(ts).unwrap();
+            let mut queue = self.queue.lock().await;
+            let _ = queue.remove(&guard.time);
+            if let QueryQueueStatus::Available = guard.get_queue_status() {
+                queue.insert(new_time, task_id.query_id);
+                self.notify.notify_waiters();
+            }
+            guard.time = new_time;
+        }
+    }
+}
+
 // Notify variable is shared with scheduler service to control task dispatch.
 impl Queue {
     pub fn new(avail: Arc<Notify>) -> Self {
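
The core of the new State is its queue: a BTreeMap keyed by each query's accumulated running time, so pop_first() in next_task always dispatches from the least-served query, and report_task re-inserts the query under its updated total. Below is a minimal self-contained sketch of that policy using plain std types only; the locking, Notify wake-ups, and QueryGraph bookkeeping from the diff are deliberately left out.

use std::collections::BTreeMap;
use std::time::Duration;

fn main() {
    // Accumulated service time -> query id, mirroring Mutex<BTreeMap<Duration, u64>>.
    let mut queue: BTreeMap<Duration, u64> = BTreeMap::new();
    queue.insert(Duration::from_millis(0), 1); // query 1 has had no service yet
    queue.insert(Duration::from_millis(40), 2); // query 2 already ran for 40ms

    // Dispatch: the least-served query comes out first (query 1).
    let (served, qid) = queue.pop_first().unwrap();
    assert_eq!(qid, 1);

    // Suppose the dispatched task ran for 25ms; requeue the query under its new
    // total. With 25ms of service, query 1 still sorts ahead of query 2's 40ms.
    queue.insert(served + Duration::from_millis(25), qid);
    assert_eq!(queue.pop_first().unwrap().1, 1);
}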

src/server.rs

Lines changed: 45 additions & 34 deletions
@@ -4,24 +4,22 @@ use crate::composable_database::{
     QueryJobStatusArgs, QueryJobStatusRet, QueryStatus, ScheduleQueryArgs, ScheduleQueryRet,
     TaskId,
 };
-use crate::intermediate_results::{get_results, TaskKey};
 use crate::mock_catalog::load_catalog;
-use crate::parser::ExecutionPlanParser;
 use crate::query_graph::{QueryGraph, StageStatus};
 use crate::queue::Queue;
+use crate::queue::State;
+use crate::task::TaskStatus;
 use crate::SchedulerError;
-use datafusion::arrow::util::pretty::print_batches;
 use datafusion::execution::context::SessionContext;
-use datafusion_proto::bytes::physical_plan_from_bytes;
+use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};
 use std::fmt;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 use tokio::sync::{Mutex, Notify};
-use tokio::time::{sleep, Duration};
-use tonic::transport::Server;
 use tonic::{Request, Response, Status};
 
 pub struct SchedulerService {
+    state: Arc<State>,
     queue: Arc<Mutex<Queue>>,
     ctx: Arc<SessionContext>, // If we support changing the catalog at runtime, this should be a RwLock.
     query_id_counter: AtomicU64,
@@ -30,18 +28,15 @@ pub struct SchedulerService {
 
 impl fmt::Debug for SchedulerService {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(
-            f,
-            "SchedulerService {{ queue: {:?} }}",
-            self.queue,
-        )
+        write!(f, "SchedulerService {{ queue: {:?} }}", self.queue,)
     }
 }
 
 impl SchedulerService {
     pub async fn new(catalog_path: &str) -> Self {
         let avail = Arc::new(Notify::new());
         Self {
+            state: Arc::new(State::new(avail.clone())),
             queue: Arc::new(Mutex::new(Queue::new(Arc::clone(&avail)))),
             ctx: load_catalog(catalog_path).await,
             query_id_counter: AtomicU64::new(0),
@@ -59,19 +54,28 @@
         task_id_opt: Option<TaskId>,
     ) -> Result<(TaskId, Vec<u8>), SchedulerError> {
         if let Some(task_id) = task_id_opt {
-            let mut queue = self.queue.lock().await;
-            // Remove the current task from the queue.
-            queue.remove_task(task_id, StageStatus::Finished(0)).await;
+            // let mut queue = self.queue.lock().await;
+            // // Remove the current task from the queue.
+            // queue.remove_task(task_id, StageStatus::Finished(0)).await;
+            self.state.report_task(task_id, TaskStatus::Finished).await;
         }
         loop {
-            let mut queue = self.queue.lock().await;
-            if let Some(new_task_id) = queue.next_task().await {
-                let stage = queue
-                    .get_plan_bytes(new_task_id.query_id, new_task_id.stage_id)
-                    .await?;
-                return Ok((new_task_id, stage));
+            // let mut queue = self.queue.lock().await;
+            // if let Some(new_task_id) = queue.next_task().await {
+            // let stage = queue
+            // .get_plan_bytes(new_task_id.query_id, new_task_id.stage_id)
+            // .await?;
+            // return Ok((new_task_id, stage));
+            // }
+            // drop(queue);
+            if let Some((task_id, plan)) = self.state.next_task().await {
+                let bytes = physical_plan_to_bytes(plan)
+                    .expect("Failed to serialize physical plan")
+                    .to_vec();
+                println!("SchedulerService: Sending task {:?}", task_id);
+                return Ok((task_id, bytes));
             }
-            drop(queue);
+            println!("SchedulerService: Waiting for new tasks.");
             self.avail.notified().await;
         }
     }
@@ -94,13 +98,13 @@ impl SchedulerApi for SchedulerService {
 
         let plan = physical_plan_from_bytes(bytes.as_slice(), &self.ctx)
             .expect("Failed to deserialize physical plan");
-        // println!("schedule_query: received plan {:?}", plan);
 
-        // Build a query graph, store in query table, enqueue new tasks.
-        let qid = self.next_query_id();
-        let query = QueryGraph::new(qid, plan).await;
-        self.queue.lock().await.add_query(qid, Arc::new(Mutex::new(query))).await;
+        let qid = self.state.add_query(plan).await;
 
+        // Build a query graph, store in query table, enqueue new tasks.
+        // let qid = self.next_query_id();
+        // let query = QueryGraph::new(qid, plan);
+        // self.queue.lock().await.add_query(qid, Arc::new(Mutex::new(query))).await;
 
         let response = ScheduleQueryRet { query_id: qid };
         Ok(Response::new(response))
@@ -113,8 +117,14 @@ impl SchedulerApi for SchedulerService {
     ) -> Result<Response<QueryJobStatusRet>, Status> {
         let QueryJobStatusArgs { query_id } = request.into_inner();
 
-        let status = self.queue.lock().await.get_query_status(query_id).await;
+        let status = self
+            .state
+            .get_query_status(query_id)
+            .await
+            .unwrap_or(QueryStatus::NotFound);
+        // let status = self.queue.lock().await.get_query_status(query_id).await;
         if status == QueryStatus::Done {
+            println!("SchedulerService: Query {} is done.", query_id);
            let stage_id = 0;
            // let final_result = get_results(&TaskKey { stage_id, query_id })
            // .await
@@ -127,8 +137,8 @@ impl SchedulerApi for SchedulerService {
 
            return Ok(Response::new(QueryJobStatusRet {
                query_status: QueryStatus::Done.into(),
-               stage_id: stage_id,
-               query_id: query_id
+               stage_id,
+               query_id,
            }));
            // ****************** END CHANGES FROM INTEGRATION TESTING****************//
        }
@@ -145,7 +155,8 @@ impl SchedulerApi for SchedulerService {
     ) -> Result<Response<AbortQueryRet>, Status> {
         // TODO: Actually call executor API to abort query.
         let AbortQueryArgs { query_id } = request.into_inner();
-        self.queue.lock().await.abort_query(query_id).await;
+        // self.queue.lock().await.abort_query(query_id).await;
+        self.state.abort_query(query_id).await;
         let response = AbortQueryRet { aborted: true };
         Ok(Response::new(response))
     }
@@ -243,10 +254,10 @@ mod tests {
                 test_file
             );
        }
-       println!(
-           "test_scheduler: queued {} tasks.",
-           scheduler_service.queue.lock().await.size()
-       );
+       // println!(
+       // "test_scheduler: queued {} tasks.",
+       // scheduler_service.queue.lock().await.size()
+       // );
 
        // TODO: add concurrent test eventually
        let mut send_task = NotifyTaskStateArgs {
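
Both the old Queue path and the new State share a single Notify with the dispatch loop: next_task parks on avail.notified() when nothing is ready, and add_query / report_task call notify_waiters() after enqueuing work. The sketch below shows that handshake in isolation with a toy task list; it is not the crate's code, and it uses notify_one() instead of notify_waiters() so that a wake-up sent before the consumer parks is stored as a permit rather than lost (it assumes tokio with the rt, macros, and sync features).

use std::sync::Arc;
use tokio::sync::{Mutex, Notify};

#[tokio::main]
async fn main() {
    let avail = Arc::new(Notify::new());
    let tasks: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));

    // Dispatch loop: pop a task if one is queued, otherwise sleep on the Notify.
    let consumer = {
        let (avail, tasks) = (avail.clone(), tasks.clone());
        tokio::spawn(async move {
            loop {
                if let Some(task) = tasks.lock().await.pop() {
                    println!("dispatching task {task}");
                    return;
                }
                println!("no tasks ready; waiting");
                avail.notified().await;
            }
        })
    };

    // Producer: enqueue work, then wake the dispatch loop (as add_query does).
    tasks.lock().await.push(42);
    avail.notify_one(); // keeps a permit even if the consumer is not parked yet

    consumer.await.unwrap();
}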
