diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs index 785d3b0cb..855d45786 100644 --- a/ballista/core/src/execution_plans/distributed_query.rs +++ b/ballista/core/src/execution_plans/distributed_query.rs @@ -105,26 +105,6 @@ impl DistributedQueryExec { } } - pub fn with_repr( - scheduler_url: String, - config: BallistaConfig, - plan: LogicalPlan, - extension_codec: Arc, - plan_repr: PhantomData, - session_id: String, - ) -> Self { - let properties = Self::compute_properties(plan.schema().as_ref().clone().into()); - Self { - scheduler_url, - config, - plan, - extension_codec, - plan_repr, - session_id, - properties, - } - } - fn compute_properties(schema: SchemaRef) -> PlanProperties { PlanProperties::new( EquivalenceProperties::new(schema), diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs index 25bdbad92..1ced35e4b 100644 --- a/ballista/core/src/extension.rs +++ b/ballista/core/src/extension.rs @@ -19,9 +19,9 @@ use crate::config::{ BallistaConfig, BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME, BALLISTA_STANDALONE_PARALLELISM, }; +use crate::planner::BallistaQueryPlanner; use crate::serde::protobuf::KeyValuePair; use crate::serde::{BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec}; -use crate::utils::BallistaQueryPlanner; use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::execution::session_state::SessionStateBuilder; diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs index 7864d56ec..5d55b9ec9 100644 --- a/ballista/core/src/lib.rs +++ b/ballista/core/src/lib.rs @@ -34,6 +34,7 @@ pub mod error; pub mod event_loop; pub mod execution_plans; pub mod extension; +pub mod planner; pub mod registry; pub mod serde; pub mod utils; diff --git a/ballista/core/src/planner.rs b/ballista/core/src/planner.rs new file mode 
100644 index 000000000..75c659666 --- /dev/null +++ b/ballista/core/src/planner.rs @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::config::BallistaConfig; +use crate::execution_plans::DistributedQueryExec; +use crate::serde::BallistaLogicalExtensionCodec; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::Schema; +use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor}; +use datafusion::error::DataFusionError; +use datafusion::execution::context::{QueryPlanner, SessionState}; +use datafusion::logical_expr::{LogicalPlan, TableScan}; +use datafusion::physical_plan::empty::EmptyExec; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; +use datafusion_proto::logical_plan::{AsLogicalPlan, LogicalExtensionCodec}; +use std::marker::PhantomData; +use std::sync::Arc; + +/// [BallistaQueryPlanner] takes a logical plan +/// and executes it remotely on the scheduler. +/// +/// Under the hood it will create a [DistributedQueryExec] +/// which will establish a gRPC connection with the scheduler.
+/// +pub struct BallistaQueryPlanner { + scheduler_url: String, + config: BallistaConfig, + extension_codec: Arc, + local_planner: DefaultPhysicalPlanner, + _plan_type: PhantomData, +} + +impl std::fmt::Debug for BallistaQueryPlanner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BallistaQueryPlanner") + .field("scheduler_url", &self.scheduler_url) + .field("config", &self.config) + .field("extension_codec", &self.extension_codec) + .field("_plan_type", &self._plan_type) + .finish() + } +} + +impl BallistaQueryPlanner { + pub fn new(scheduler_url: String, config: BallistaConfig) -> Self { + Self { + scheduler_url, + config, + extension_codec: Arc::new(BallistaLogicalExtensionCodec::default()), + local_planner: DefaultPhysicalPlanner::default(), + _plan_type: PhantomData, + } + } + + pub fn with_extension( + scheduler_url: String, + config: BallistaConfig, + extension_codec: Arc, + ) -> Self { + Self { + scheduler_url, + config, + extension_codec, + local_planner: DefaultPhysicalPlanner::default(), + _plan_type: PhantomData, + } + } + + pub fn with_local_planner( + scheduler_url: String, + config: BallistaConfig, + extension_codec: Arc, + local_planner: DefaultPhysicalPlanner, + ) -> Self { + Self { + scheduler_url, + config, + extension_codec, + _plan_type: PhantomData, + local_planner, + } + } +} + +#[async_trait] +impl QueryPlanner for BallistaQueryPlanner { + async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + session_state: &SessionState, + ) -> std::result::Result, DataFusionError> { + log::debug!("create_physical_plan - plan: {:?}", logical_plan); + // we inspect if plan scans local tables only, + // like tables located in information_schema, + // if that is the case, we run that plan + // on this same context, not on cluster + let mut local_run = LocalRun::default(); + let _ = logical_plan.visit(&mut local_run); + + if local_run.can_be_local { + log::debug!("create_physical_plan - plan can be 
executed locally"); + + self.local_planner + .create_physical_plan(logical_plan, session_state) + .await + } else { + match logical_plan { + LogicalPlan::EmptyRelation(_) => { + log::debug!("create_physical_plan - handling empty exec"); + Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))) + } + _ => { + log::debug!("create_physical_plan - handling general statement"); + + Ok(Arc::new(DistributedQueryExec::::with_extension( + self.scheduler_url.clone(), + self.config.clone(), + logical_plan.clone(), + self.extension_codec.clone(), + session_state.session_id().to_string(), + ))) + } + } + } + } +} + +/// A visitor which detects if a query is using local tables, +/// such as tables located in `information_schema`, and returns true +/// only if all scans are from local tables +#[derive(Debug, Default)] +struct LocalRun { + can_be_local: bool, +} + +impl<'n> TreeNodeVisitor<'n> for LocalRun { + type Node = LogicalPlan; + + fn f_down( + &mut self, + node: &'n Self::Node, + ) -> datafusion::error::Result { + match node { + LogicalPlan::TableScan(TableScan { table_name, .. }) => match table_name { + datafusion::sql::TableReference::Partial { schema, .. } + | datafusion::sql::TableReference::Full { schema, .. 
} + if schema.as_ref() == "information_schema" => + { + self.can_be_local = true; + Ok(datafusion::common::tree_node::TreeNodeRecursion::Continue) + } + _ => { + self.can_be_local = false; + Ok(datafusion::common::tree_node::TreeNodeRecursion::Stop) + } + }, + _ => Ok(datafusion::common::tree_node::TreeNodeRecursion::Continue), + } + } +} + +#[cfg(test)] +mod test { + use datafusion::{ + common::tree_node::TreeNode, + error::Result, + execution::{ + runtime_env::{RuntimeConfig, RuntimeEnv}, + SessionStateBuilder, + }, + prelude::{SessionConfig, SessionContext}, + }; + + use super::LocalRun; + + fn context() -> SessionContext { + let runtime_environment = RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(); + + let session_config = SessionConfig::new().with_information_schema(true); + + let state = SessionStateBuilder::new() + .with_config(session_config) + .with_runtime_env(runtime_environment.into()) + .with_default_features() + .build(); + + SessionContext::new_with_state(state) + } + + #[tokio::test] + async fn should_detect_show_table_as_local_plan() -> Result<()> { + let ctx = context(); + let df = ctx.sql("SHOW TABLES").await?; + let lp = df.logical_plan(); + let mut local_run = LocalRun::default(); + + lp.visit(&mut local_run).unwrap(); + + assert!(local_run.can_be_local); + + Ok(()) + } + + #[tokio::test] + async fn should_detect_select_from_information_schema_as_local_plan() -> Result<()> { + let ctx = context(); + let df = ctx.sql("SELECT * FROM information_schema.df_settings WHERE NAME LIKE 'ballista%'").await?; + let lp = df.logical_plan(); + let mut local_run = LocalRun::default(); + + lp.visit(&mut local_run).unwrap(); + + assert!(local_run.can_be_local); + + Ok(()) + } + + #[tokio::test] + async fn should_not_detect_local_table() -> Result<()> { + let ctx = context(); + ctx.sql("CREATE TABLE tt (c0 INT, c1 INT)") + .await? 
+ .show() + .await?; + let df = ctx.sql("SELECT * FROM tt").await?; + let lp = df.logical_plan(); + let mut local_run = LocalRun::default(); + + lp.visit(&mut local_run).unwrap(); + + assert!(!local_run.can_be_local); + + Ok(()) + } + + #[tokio::test] + async fn should_not_detect_external_table() -> Result<()> { + let ctx = context(); + ctx.register_csv("tt", "tests/customer.csv", Default::default()) + .await?; + let df = ctx.sql("SELECT * FROM tt").await?; + let lp = df.logical_plan(); + let mut local_run = LocalRun::default(); + + lp.visit(&mut local_run).unwrap(); + + assert!(!local_run.can_be_local); + + Ok(()) + } +} diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index 913e955d3..55d4a9951 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -15,34 +15,21 @@ // specific language governing permissions and limitations // under the License. -use crate::config::BallistaConfig; use crate::error::{BallistaError, Result}; -use crate::execution_plans::DistributedQueryExec; - use crate::extension::SessionConfigExt; use crate::serde::scheduler::PartitionStats; -use crate::serde::BallistaLogicalExtensionCodec; -use async_trait::async_trait; -use datafusion::arrow::datatypes::Schema; use datafusion::arrow::ipc::writer::IpcWriteOptions; use datafusion::arrow::ipc::writer::StreamWriter; use datafusion::arrow::ipc::CompressionType; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor}; -use datafusion::error::DataFusionError; -use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState}; +use datafusion::execution::context::{SessionConfig, SessionState}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::execution::session_state::SessionStateBuilder; -use datafusion::logical_expr::{DdlStatement, LogicalPlan, TableScan}; -use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::metrics::MetricsSet; 
use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream}; -use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; -use datafusion_proto::logical_plan::{AsLogicalPlan, LogicalExtensionCodec}; use futures::StreamExt; use log::error; -use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::{fs::File, pin::Pin}; @@ -117,114 +104,6 @@ pub async fn collect_stream( Ok(batches) } -pub struct BallistaQueryPlanner { - scheduler_url: String, - config: BallistaConfig, - extension_codec: Arc, - local_planner: DefaultPhysicalPlanner, - plan_repr: PhantomData, -} - -impl std::fmt::Debug for BallistaQueryPlanner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("BallistaQueryPlanner") - .field("scheduler_url", &self.scheduler_url) - .field("config", &self.config) - .field("extension_codec", &self.extension_codec) - .field("plan_repr", &self.plan_repr) - .finish() - } -} - -impl BallistaQueryPlanner { - pub fn new(scheduler_url: String, config: BallistaConfig) -> Self { - Self { - scheduler_url, - config, - extension_codec: Arc::new(BallistaLogicalExtensionCodec::default()), - local_planner: DefaultPhysicalPlanner::default(), - plan_repr: PhantomData, - } - } - - pub fn with_extension( - scheduler_url: String, - config: BallistaConfig, - extension_codec: Arc, - ) -> Self { - Self { - scheduler_url, - config, - extension_codec, - local_planner: DefaultPhysicalPlanner::default(), - plan_repr: PhantomData, - } - } - - pub fn with_repr( - scheduler_url: String, - config: BallistaConfig, - extension_codec: Arc, - plan_repr: PhantomData, - ) -> Self { - Self { - scheduler_url, - config, - extension_codec, - plan_repr, - local_planner: DefaultPhysicalPlanner::default(), - } - } -} - -#[async_trait] -impl QueryPlanner for BallistaQueryPlanner { - async fn create_physical_plan( - &self, - logical_plan: &LogicalPlan, - session_state: &SessionState, - ) -> 
std::result::Result, DataFusionError> { - log::debug!("create_physical_plan - plan: {:?}", logical_plan); - // we inspect if plan scans local tables only, - // like tables located in information_schema, - // if that is the case, we run that plan - // on this same context, not on cluster - let mut local_run = LocalRun::default(); - let _ = logical_plan.visit(&mut local_run); - - if local_run.can_be_local { - log::debug!("create_physical_plan - local run"); - - self.local_planner - .create_physical_plan(logical_plan, session_state) - .await - } else { - match logical_plan { - LogicalPlan::Ddl(DdlStatement::CreateExternalTable(_t)) => { - log::debug!("create_physical_plan - handling ddl statement"); - Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))) - } - LogicalPlan::EmptyRelation(_) => { - log::debug!("create_physical_plan - handling empty exec"); - Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))) - } - _ => { - log::debug!("create_physical_plan - handling general statement"); - - Ok(Arc::new(DistributedQueryExec::with_repr( - self.scheduler_url.clone(), - self.config.clone(), - logical_plan.clone(), - self.extension_codec.clone(), - self.plan_repr, - session_state.session_id().to_string(), - ))) - } - } - } - } -} - pub async fn create_grpc_client_connection( dst: D, ) -> std::result::Result @@ -277,128 +156,3 @@ pub fn get_time_before(interval_seconds: u64) -> u64 { .unwrap_or_else(|| Duration::from_secs(0)) .as_secs() } - -/// A Visitor which detect if query is using local tables, -/// such as tables located in `information_schema` and returns true -/// only if all scans are in from local tables -#[derive(Debug, Default)] -struct LocalRun { - can_be_local: bool, -} - -impl<'n> TreeNodeVisitor<'n> for LocalRun { - type Node = LogicalPlan; - - fn f_down( - &mut self, - node: &'n Self::Node, - ) -> datafusion::error::Result { - match node { - LogicalPlan::TableScan(TableScan { table_name, .. 
}) => match table_name { - datafusion::sql::TableReference::Partial { schema, .. } - | datafusion::sql::TableReference::Full { schema, .. } - if schema.as_ref() == "information_schema" => - { - self.can_be_local = true; - Ok(datafusion::common::tree_node::TreeNodeRecursion::Continue) - } - _ => { - self.can_be_local = false; - Ok(datafusion::common::tree_node::TreeNodeRecursion::Stop) - } - }, - _ => Ok(datafusion::common::tree_node::TreeNodeRecursion::Continue), - } - } -} - -#[cfg(test)] -mod test { - use datafusion::{ - common::tree_node::TreeNode, - error::Result, - execution::{ - runtime_env::{RuntimeConfig, RuntimeEnv}, - SessionStateBuilder, - }, - prelude::{SessionConfig, SessionContext}, - }; - - use crate::utils::LocalRun; - - fn context() -> SessionContext { - let runtime_environment = RuntimeEnv::try_new(RuntimeConfig::new()).unwrap(); - - let session_config = SessionConfig::new().with_information_schema(true); - - let state = SessionStateBuilder::new() - .with_config(session_config) - .with_runtime_env(runtime_environment.into()) - .with_default_features() - .build(); - - SessionContext::new_with_state(state) - } - - #[tokio::test] - async fn should_detect_show_table_as_local_plan() -> Result<()> { - let ctx = context(); - let df = ctx.sql("SHOW TABLES").await?; - let lp = df.logical_plan(); - let mut local_run = LocalRun::default(); - - lp.visit(&mut local_run).unwrap(); - - assert!(local_run.can_be_local); - - Ok(()) - } - - #[tokio::test] - async fn should_detect_select_from_information_schema_as_local_plan() -> Result<()> { - let ctx = context(); - let df = ctx.sql("SELECT * FROM information_schema.df_settings WHERE NAME LIKE 'ballista%'").await?; - let lp = df.logical_plan(); - let mut local_run = LocalRun::default(); - - lp.visit(&mut local_run).unwrap(); - - assert!(local_run.can_be_local); - - Ok(()) - } - - #[tokio::test] - async fn should_not_detect_local_table() -> Result<()> { - let ctx = context(); - ctx.sql("CREATE TABLE tt (c0 INT, c1 
INT)") - .await? - .show() - .await?; - let df = ctx.sql("SELECT * FROM tt").await?; - let lp = df.logical_plan(); - let mut local_run = LocalRun::default(); - - lp.visit(&mut local_run).unwrap(); - - assert!(!local_run.can_be_local); - - Ok(()) - } - - #[tokio::test] - async fn should_not_detect_external_table() -> Result<()> { - let ctx = context(); - ctx.register_csv("tt", "tests/customer.csv", Default::default()) - .await?; - let df = ctx.sql("SELECT * FROM tt").await?; - let lp = df.logical_plan(); - let mut local_run = LocalRun::default(); - - lp.visit(&mut local_run).unwrap(); - - assert!(!local_run.can_be_local); - - Ok(()) - } -}