Skip to content
This repository was archived by the owner on Jun 6, 2024. It is now read-only.

Commit 5dafdf5

Browse files
committed
remove warnings
1 parent 53f02b3 commit 5dafdf5

File tree

11 files changed

+31
-42
lines changed

11 files changed

+31
-42
lines changed

src/bin/mock.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ async fn main() {
122122

123123
const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");
124124
const CATALOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data");
125-
const LOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executor_logs");
126125
const POLL_INTERVAL: Duration = Duration::from_millis(100);
127126

128127
// creates server, executors, and the frontend
@@ -165,7 +164,6 @@ pub async fn run_single_query(
165164
drop(frontend_lock);
166165
return Ok(());
167166
}
168-
unreachable!();
169167
}
170168

171169
async fn interactive_mode() {

src/executor_client.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,13 @@
3232
3333
use crate::composable_database::scheduler_api_client::SchedulerApiClient;
3434
use crate::composable_database::QueryStatus::InProgress;
35-
use crate::composable_database::{NotifyTaskStateArgs, NotifyTaskStateRet, QueryStatus, TaskId};
35+
use crate::composable_database::{NotifyTaskStateArgs, NotifyTaskStateRet, QueryStatus};
3636
use crate::frontend::JobInfo;
3737
use crate::intermediate_results::{insert_results, rewrite_query, TaskKey};
3838
use crate::mock_catalog::load_catalog;
3939
use crate::mock_executor::MockExecutor;
4040
use chrono::Utc;
4141
use datafusion::execution::context::SessionContext;
42-
use datafusion::physical_plan::ExecutionPlan;
4342
use datafusion_proto::bytes::physical_plan_from_bytes;
4443
use std::path::Path;
4544
use std::path::PathBuf;

src/frontend.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ impl MockFrontend {
230230
let existing_value = self.jobs.insert(
231231
query_id,
232232
JobInfo {
233-
query_id: query_id,
233+
query_id,
234234
submitted_at: Utc::now(),
235235
sql_string: sql_string.to_string(),
236236
result: None,
@@ -280,7 +280,7 @@ impl MockFrontend {
280280
// eprintln!("Polling!");
281281
assert!(self.scheduler_api_client.is_some());
282282

283-
let mut client = self.scheduler_api_client.as_mut().unwrap();
283+
let client = self.scheduler_api_client.as_mut().unwrap();
284284

285285
let keys: Vec<u64> = self.jobs.keys().cloned().collect();
286286
for query_id in keys {

src/integration_test.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,18 @@ use crate::server::SchedulerService;
88
use datafusion::arrow::array::RecordBatch;
99
use datafusion::error::DataFusionError;
1010
use datafusion::logical_expr::{col, Expr};
11-
use datafusion::prelude::{concat, SessionContext};
12-
use serde::{Deserialize, Serialize};
11+
use datafusion::prelude::SessionContext;
1312
use std::sync::Arc;
1413
use tokio::sync::Mutex;
1514
use tonic::transport::Server;
1615

1716
pub struct IntegrationTest {
1817
catalog_path: String,
19-
config_path: String,
2018
ctx: Arc<SessionContext>,
2119
config: Config,
2220
pub frontend: Arc<Mutex<MockFrontend>>,
2321
}
2422

25-
const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");
2623
pub const CATALOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data");
2724
const LOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executor_logs");
2825

@@ -37,7 +34,6 @@ impl IntegrationTest {
3734
ctx,
3835
config,
3936
catalog_path,
40-
config_path,
4137
frontend: Arc::new(Mutex::new(frontend)),
4238
}
4339
}
@@ -195,20 +191,22 @@ impl IntegrationTest {
195191
mod tests {
196192
use crate::integration_test::IntegrationTest;
197193
use crate::parser::ExecutionPlanParser;
198-
// use crate::CATALOG_PATH;
199194
use super::*;
200195
use datafusion::arrow::array::{Int32Array, RecordBatch};
201196
use datafusion::arrow::datatypes::{DataType, Field, Schema};
202197
use std::path::PathBuf;
203198
use std::sync::Arc;
204199
use tokio::fs;
205200

201+
const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");
202+
206203
async fn initialize_integration_test() -> IntegrationTest {
207204
let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data");
208-
let config_path = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml");
205+
let config_path = CONFIG_PATH;
209206
IntegrationTest::new(catalog_path.to_string(), config_path.to_string()).await
210207
}
211208

209+
#[allow(dead_code)]
212210
pub async fn get_all_tpch_queries_test() -> Vec<String> {
213211
let parser = ExecutionPlanParser::new(CATALOG_PATH).await;
214212
let mut res = Vec::new();

src/mock_optimizer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
use crate::mock_catalog::load_catalog;
1818
use datafusion::error::DataFusionError;
19-
use datafusion::execution::context::{SessionContext, SessionState};
19+
use datafusion::execution::context::SessionContext;
2020
use datafusion::logical_expr::LogicalPlan;
2121
use datafusion::physical_plan::ExecutionPlan;
2222
use std::sync::Arc;

src/parser.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,14 @@
2626
use crate::mock_catalog::load_catalog;
2727
use datafusion::{
2828
arrow::{
29-
array::{RecordBatch, RecordBatchReader},
29+
array::RecordBatch,
3030
ipc::{reader::FileReader, writer::FileWriter},
3131
},
3232
error::{DataFusionError, Result},
3333
execution::context::SessionContext,
3434
physical_plan::ExecutionPlan,
35-
physical_planner::PhysicalPlanner,
3635
};
3736
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};
38-
use futures::TryFutureExt;
3937
use sqlparser::{dialect::GenericDialect, parser::Parser};
4038
use std::{fmt, io::Cursor, sync::Arc};
4139
use tokio::{fs::File, io::AsyncReadExt};

src/query_graph.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
#![allow(dead_code)]
2-
use crate::composable_database::{QueryStatus, TaskId};
3-
use crate::task::{Task, TaskStatus};
2+
use crate::composable_database::QueryStatus;
3+
use crate::task::Task;
44
use crate::task_queue::TaskQueue;
55
use datafusion::arrow::datatypes::Schema;
6-
use datafusion::physical_plan::aggregates::AggregateExec;
7-
use datafusion::physical_plan::joins::{
8-
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, SymmetricHashJoinExec,
9-
};
10-
use datafusion::physical_plan::limit::GlobalLimitExec;
6+
// use datafusion::physical_plan::joins::{
7+
// use datafusion::physical_plan::aggregates::AggregateExec;
8+
// CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, SymmetricHashJoinExec,
9+
// };
10+
// use datafusion::physical_plan::limit::GlobalLimitExec;
11+
// use datafusion::physical_plan::sorts::sort::SortExec;
1112
use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
12-
use datafusion::physical_plan::sorts::sort::SortExec;
1313
use datafusion::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
1414
use std::collections::HashMap;
1515
use std::sync::atomic::{AtomicU64, Ordering};

src/queue.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,15 @@ impl State {
8181
Some(status)
8282
}
8383

84+
// TODO: Graceful abort.
8485
pub async fn abort_query(&self, query_id: u64) {
85-
todo!()
86+
if let Some(query) = self.table.get(&query_id) {
87+
{
88+
let mut guard = query.write().await;
89+
guard.abort();
90+
}
91+
self.table.remove(&query_id);
92+
}
8693
}
8794

8895
pub async fn next_task(&self) -> Option<(TaskId, Arc<dyn ExecutionPlan>)> {
@@ -140,6 +147,7 @@ impl State {
140147
}
141148
}
142149

150+
#[allow(dead_code)]
143151
pub async fn size(&self) -> usize {
144152
self.queue.lock().await.len()
145153
}
@@ -149,7 +157,7 @@ impl State {
149157
mod tests {
150158
use rand::Rng;
151159
use std::{fs, time::{Duration, SystemTime}};
152-
use tokio::{sync::{Mutex, Notify}, time::sleep};
160+
use tokio::{sync::Notify, time::sleep};
153161

154162
use crate::{parser::ExecutionPlanParser, query_graph::QueryGraph};
155163
use crate::queue::State;

src/server.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
1-
use crate::composable_database::scheduler_api_server::{SchedulerApi, SchedulerApiServer};
1+
use crate::composable_database::scheduler_api_server::SchedulerApi;
22
use crate::composable_database::{
33
AbortQueryArgs, AbortQueryRet, NotifyTaskStateArgs, NotifyTaskStateRet, QueryInfo,
44
QueryJobStatusArgs, QueryJobStatusRet, QueryStatus, ScheduleQueryArgs, ScheduleQueryRet,
55
TaskId,
66
};
77
use crate::mock_catalog::load_catalog;
8-
use crate::query_graph::{QueryGraph, StageStatus};
98
use crate::queue::State;
109
use crate::task::TaskStatus;
1110
use crate::SchedulerError;
1211
use datafusion::execution::context::SessionContext;
1312
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};
1413
use std::fmt;
15-
use std::sync::atomic::{AtomicU64, Ordering};
1614
use std::sync::Arc;
17-
use tokio::sync::{Mutex, Notify};
15+
use tokio::sync::Notify;
1816
use tonic::{Request, Response, Status};
1917

2018
pub struct SchedulerService {

src/task.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::time::SystemTime;
33

44
// TODO: some of these don't do anything since
55
// the task is only created when it is ready
6+
#[allow(dead_code)]
67
#[derive(Debug, Clone)]
78
pub enum TaskStatus {
89
Ready,

0 commit comments

Comments
 (0)