Add arrow flight endpoint hooks #198
Conversation
src/flight_service/service.rs (Outdated)
```rust
#[allow(clippy::type_complexity)]
pub(super) struct ArrowFlightEndpointHooks {
    pub(super) on_plan: Arc<dyn Fn(Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> + Sync + Send>,
}
```
IMO clippy is being a bit overdramatic here. Not sure how this counts as too complex but line 59 doesn't.
geoffreyclaude left a comment:
Super generic hook, which is great! I think the API should be a bit more DataFusion idiomatic though, and, more critically, allow multiple hooks (so that various extensions can add their own dedicated hooks).
Something like:
```rust
let mut endpoint = ArrowFlightEndpoint::try_new(DefaultSessionBuilder);
// appends two "pre-get" hooks
endpoint = endpoint.with_pre_get_hook(tracing_hook).with_pre_get_hook(some_other_hook);
// overwrite all "pre-get" hooks
endpoint.with_pre_get_hooks(vec![tracing_hook, some_other_hook]);
```
Makes sense! Followed your suggestion, with two details:
jayshrivastava left a comment:
I have a suggestion but I don't feel strongly. Looks good overall!
```rust
    }
}
// As many plans as tasks should have been received.
assert_eq!(plans_received.load(Ordering::SeqCst), task_keys.len());
```
Not a blocker, but an independent test where you swap out a plan node (e.g. DataSourceExec with EmptyExec) would be nice.
🤔 But that would fail pretty badly; part of the contract with this hook is that people should not do that. Or do you expect that to pass?
I think swapping out a leaf node like DataSourceExec will work because it preserves the plan structure, as long as the schema matches.
But that's not something we are willing to support, right? The contract is more along the lines of "don't touch anything that changes the plan"; we just don't enforce it at runtime for performance reasons.
```rust
///
/// The callback takes the plan and returns another plan that must be either the same,
/// or equivalent in terms of execution. Mutating the plan by adding nodes or removing them
/// will make the query blow up in unexpected ways.
```
You can add an assertion within the ArrowFlightEndpoint that the final schema matches, so we fail early with a "schema mismatch in hook" error and make it clear that a hook caused this sort of problem.
The harder part is asserting that the plan structure is the same, which is important for metrics. Traversing the plan to assert this would be expensive. Since we only support wrapping (because the plan structure cannot change), can we leverage with_new_children in the hook itself?
For example:

```rust
use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::Result;
use datafusion::physical_plan::ExecutionPlan;

trait Hook {
    /// Whether this hook wants to replace/wrap the given node.
    fn apply(&self, plan: &Arc<dyn ExecutionPlan>) -> bool;
    /// Builds the replacement node for the given node.
    fn new(&self, plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan>;
}

// in the endpoint
fn apply_hook<H: Hook>(hook: H, plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    let transformed = plan.transform_up(|node| {
        if hook.apply(&node) {
            // Re-attach the original children so the plan structure is preserved.
            let children = node.children().into_iter().cloned().collect();
            Ok(Transformed::yes(hook.new(node).with_new_children(children)?))
        } else {
            Ok(Transformed::no(node))
        }
    })?;
    Ok(transformed.data)
}
```

This makes hooks a bit more expensive because we necessarily traverse the whole plan, where previously we were not. But for datafusion-tracing, we would have traversed the whole plan anyways.
This might be more effort than it's worth so I'll leave the decision up to you. If you do implement something like this, then I'm happy to take another look :)
🤔 I imagine that in most cases there will just be no hook, so I think it's worth not penalizing those cases.
The "schema mismatch" check might be a good idea though.
Sure, that makes sense.
I think for now I'm going to keep it simple and not do anything. In case we find it necessary to add some checks we can refer back to this conversation.
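For illustration, a minimal sketch of a hook that stays within this contract: it only observes the plan and returns it unchanged, so the executed plan is exactly equivalent. The name `logging_hook` is hypothetical and not from this PR; the signature assumes the `Fn(Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan>` shape shown in the diff above.

```rust
use std::sync::Arc;

use datafusion::physical_plan::{displayable, ExecutionPlan};

// A contract-compliant hook: it inspects the plan (here, just printing its shape)
// and returns it unchanged, so the executed plan is equivalent to the original.
// Wrapping nodes with schema-preserving wrappers (as datafusion-tracing does) is
// also fine; adding or removing nodes is not.
fn logging_hook(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    println!("{}", displayable(plan.as_ref()).indent(true));
    plan
}
```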
Allow people to pass hooks that trigger on certain situations during the lifetime of an Arrow Flight `do_get` query. This exposes a new API during `ArrowFlightEndpoint` construction that allows people to provide their own callbacks.

I was tempted to introduce more hooks in this PR, but I think it'd be better to wait for use cases rather than trying to guess them now.
The main use case for this right now is for wrapping nodes with https://github.com/datafusion-contrib/datafusion-tracing.
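For reference, registering hooks on the endpoint might look roughly like the sketch below, following the builder-style naming suggested in review (`with_pre_get_hook`); the exact constructor and method signatures in the merged code may differ, and the closure body is only a placeholder.

```rust
// Hypothetical wiring following the naming suggested in review; the exact API
// of the merged PR may differ.
let endpoint = ArrowFlightEndpoint::try_new(DefaultSessionBuilder)?
    // Called with each task's plan during `do_get`, before execution starts.
    .with_pre_get_hook(Arc::new(|plan: Arc<dyn ExecutionPlan>| {
        // Must return an execution-equivalent plan, e.g. after wrapping
        // nodes for tracing.
        plan
    }));
```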