Add arrow flight endpoint hooks #198
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,15 +10,24 @@ use arrow_flight::{ | |
| use async_trait::async_trait; | ||
| use datafusion::error::DataFusionError; | ||
| use datafusion::execution::runtime_env::RuntimeEnv; | ||
| use datafusion::physical_plan::ExecutionPlan; | ||
| use futures::stream::BoxStream; | ||
| use std::sync::Arc; | ||
| use tokio::sync::OnceCell; | ||
| use tonic::{Request, Response, Status, Streaming}; | ||
|
|
||
| #[allow(clippy::type_complexity)] | ||
| #[derive(Default)] | ||
| pub(super) struct ArrowFlightEndpointHooks { | ||
| pub(super) on_plan: | ||
| Vec<Arc<dyn Fn(Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> + Sync + Send>>, | ||
| } | ||
|
|
||
| pub struct ArrowFlightEndpoint { | ||
| pub(super) runtime: Arc<RuntimeEnv>, | ||
| pub(super) task_data_entries: Arc<TTLMap<StageKey, Arc<OnceCell<TaskData>>>>, | ||
| pub(super) session_builder: Arc<dyn DistributedSessionBuilder + Send + Sync>, | ||
| pub(super) hooks: ArrowFlightEndpointHooks, | ||
| } | ||
|
|
||
| impl ArrowFlightEndpoint { | ||
|
|
@@ -30,8 +39,21 @@ impl ArrowFlightEndpoint { | |
| runtime: Arc::new(RuntimeEnv::default()), | ||
| task_data_entries: Arc::new(ttl_map), | ||
| session_builder: Arc::new(session_builder), | ||
| hooks: ArrowFlightEndpointHooks::default(), | ||
| }) | ||
| } | ||
|
|
||
| /// Adds a callback for when an [ExecutionPlan] is received in the `do_get` call. | ||
| /// | ||
| /// The callback takes the plan and returns another plan that must be either the same, | ||
| /// or equivalent in terms of execution. Mutating the plan by adding nodes or removing them | ||
| /// will make the query blow up in unexpected ways. | ||
|
Collaborator
You can add an assertion that the final schema matches within the
The harder part is asserting that the plan structure is the same, which is important for metrics. Traversing the plan to assert this would be expensive. Since we only support wrapping (because the plan structure cannot change), can we leverage
ex.
    trait Hook {
        fn apply(Arc<dyn ExecutionPlan>) -> bool
        fn new(Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan>
    }
    // in the endpoint
    fn apply_hook<H: Hook>(hook: H, plan: Arc<dyn ExecutionPlan>) {
        plan.transform_up(
            |node| {
                if hook.apply(node) {
                    Transformed::yes(hook.new(node).with_new_children(node.children()))
                }
            }
        )
    }
This makes hooks a bit more expensive because we necessarily traverse the whole plan, where previously we were not. But for
This might be more effort than it's worth, so I'll leave the decision up to you. If you do implement something like this, then I'm happy to take another look :)
Collaborator
Author
🤔 I imagine in most cases there's just going to be no hook; I think it's worth not penalizing those cases. The "schema mismatch" check might be a good idea though.
Collaborator
Sure, that makes sense.
Collaborator
Author
I think for now I'm going to keep it simple and not do anything. In case we find it necessary to add some checks, we can refer back to this conversation.
||
| pub fn add_on_plan_hook( | ||
| &mut self, | ||
| hook: impl Fn(Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> + Sync + Send + 'static, | ||
| ) { | ||
| self.hooks.on_plan.push(Arc::new(hook)); | ||
| } | ||
| } | ||
|
|
||
| #[async_trait] | ||
|
|
||
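The diff above only shows how hooks are registered; the place where they run inside `do_get` is not part of this excerpt. A minimal sketch of one way a list of `on_plan` hooks could be folded over an incoming plan follows. It assumes hooks run in registration order, and it uses a toy `Plan` trait as a stand-in for `ExecutionPlan` so that it compiles on its own. `Plan`, `Scan`, `Instrumented`, `Hooks`, `apply`, and `demo` are hypothetical names, not part of the PR.

```rust
use std::sync::Arc;

/// Toy stand-in for `ExecutionPlan` (hypothetical, for illustration only).
trait Plan: Send + Sync {
    fn name(&self) -> String;
}

/// A leaf node.
struct Scan;
impl Plan for Scan {
    fn name(&self) -> String {
        "Scan".to_string()
    }
}

/// A wrapper node that delegates to the plan it wraps.
struct Instrumented {
    inner: Arc<dyn Plan>,
}
impl Plan for Instrumented {
    fn name(&self) -> String {
        format!("Instrumented({})", self.inner.name())
    }
}

/// Same shape as the hook type in the diff, with `Plan` in place of `ExecutionPlan`.
type OnPlanHook = Arc<dyn Fn(Arc<dyn Plan>) -> Arc<dyn Plan> + Sync + Send>;

#[derive(Default)]
struct Hooks {
    on_plan: Vec<OnPlanHook>,
}

impl Hooks {
    /// Mirrors `add_on_plan_hook` from the diff.
    fn add_on_plan_hook(
        &mut self,
        hook: impl Fn(Arc<dyn Plan>) -> Arc<dyn Plan> + Sync + Send + 'static,
    ) {
        self.on_plan.push(Arc::new(hook));
    }

    /// Feeds the plan through every hook in registration order (an assumption).
    /// With no hooks registered, the plan is returned unchanged.
    fn apply(&self, plan: Arc<dyn Plan>) -> Arc<dyn Plan> {
        self.on_plan.iter().fold(plan, |plan, hook| hook(plan))
    }
}

/// Registers an identity hook and a wrapping hook, then applies both to a leaf.
fn demo() -> String {
    let mut hooks = Hooks::default();
    hooks.add_on_plan_hook(|plan| plan);
    hooks.add_on_plan_hook(|plan| Arc::new(Instrumented { inner: plan }) as Arc<dyn Plan>);
    hooks.apply(Arc::new(Scan)).name()
}
```

In this sketch the no-hook case costs a single pass over an empty `Vec`, which is in line with the author's wish not to penalize endpoints that register no hooks.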
Not a blocker, but an independent test where you swap out a plan node (e.g. `DataSourceExec` with `EmptyExec`) would be nice.
🤔 But that would fail pretty badly; part of the contract with this hook is that people should not do that. Or do you expect that to pass?
I think swapping out a leaf node like `DataSourceExec` will work because it preserves the plan structure, as long as the schema matches.
But that's not something we are willing to support, right? The contract is more like "don't touch anything that changes the plan"; we just don't enforce it at runtime for performance reasons.
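For reference, the "schema mismatch" assertion mentioned in the review thread was left unimplemented in this PR. If it were added later, a minimal sketch could look like the following. It uses a toy `ToyPlan` struct instead of DataFusion's `ExecutionPlan`, and a list of (column, type) pairs instead of an Arrow schema; `ToyPlan`, `apply_hook_checked`, and `demo` are hypothetical names. It only compares the top-level schema, so it does not verify that the plan structure is preserved, which is the harder part raised in the thread.

```rust
use std::sync::Arc;

/// Toy stand-in for a plan node (hypothetical; not the DataFusion trait).
struct ToyPlan {
    name: &'static str,
    /// (column name, type name) pairs standing in for an Arrow schema.
    schema: Vec<(&'static str, &'static str)>,
}

/// Same shape as the hook type in the diff, with `ToyPlan` as the plan type.
type OnPlanHook = dyn Fn(Arc<ToyPlan>) -> Arc<ToyPlan> + Sync + Send;

/// Runs the hook, then rejects its output if the schema differs from the input's.
fn apply_hook_checked(hook: &OnPlanHook, plan: Arc<ToyPlan>) -> Result<Arc<ToyPlan>, String> {
    let expected = plan.schema.clone();
    let new_plan = hook(plan);
    if new_plan.schema != expected {
        return Err(format!(
            "hook output `{}` changed the schema: expected {:?}, got {:?}",
            new_plan.name, expected, new_plan.schema
        ));
    }
    Ok(new_plan)
}

/// Returns (same-schema swap accepted, different-schema swap rejected).
fn demo() -> (bool, bool) {
    let scan = || {
        Arc::new(ToyPlan {
            name: "DataSourceExec",
            schema: vec![("id", "Int64")],
        })
    };

    // Leaf swapped for another leaf with an identical schema: the check passes.
    let same_schema = apply_hook_checked(
        &|_plan: Arc<ToyPlan>| {
            Arc::new(ToyPlan {
                name: "EmptyExec",
                schema: vec![("id", "Int64")],
            })
        },
        scan(),
    );

    // Leaf swapped for one with a different column type: the check fails.
    let other_schema = apply_hook_checked(
        &|_plan: Arc<ToyPlan>| {
            Arc::new(ToyPlan {
                name: "EmptyExec",
                schema: vec![("id", "Utf8")],
            })
        },
        scan(),
    );

    (same_schema.is_ok(), other_schema.is_err())
}
```

A check like this costs one schema comparison per hook and avoids a full plan traversal. Whether a leaf swap that passes it should count as supported is the open question in the comments above.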