Skip to content

Commit c657d19

Browse files
committed
Store multiple hooks
1 parent 60e3208 commit c657d19

File tree

2 files changed

+11
-15
lines changed

2 files changed

+11
-15
lines changed

src/flight_service/do_get.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,10 @@ impl ArrowFlightEndpoint {
9494
let stage_data = once
9595
.get_or_try_init(|| async {
9696
let proto_node = PhysicalPlanNode::try_decode(doget.plan_proto.as_ref())?;
97-
let plan = proto_node.try_into_physical_plan(&ctx, &self.runtime, &codec)?;
98-
let plan = (self.hooks.on_plan)(plan);
97+
let mut plan = proto_node.try_into_physical_plan(&ctx, &self.runtime, &codec)?;
98+
for hook in self.hooks.on_plan.iter() {
99+
plan = hook(plan)
100+
}
99101

100102
// Initialize partition count to the number of partitions in the stage
101103
let total_partitions = plan.properties().partitioning.partition_count();
@@ -232,7 +234,7 @@ mod tests {
232234
let plans_received = Arc::new(AtomicUsize::default());
233235
{
234236
let plans_received = Arc::clone(&plans_received);
235-
endpoint.on_plan(move |plan| {
237+
endpoint.add_on_plan_hook(move |plan| {
236238
plans_received.fetch_add(1, Ordering::SeqCst);
237239
plan
238240
});

src/flight_service/service.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,10 @@ use tokio::sync::OnceCell;
1717
use tonic::{Request, Response, Status, Streaming};
1818

1919
#[allow(clippy::type_complexity)]
20+
#[derive(Default)]
2021
pub(super) struct ArrowFlightEndpointHooks {
21-
pub(super) on_plan: Arc<dyn Fn(Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> + Sync + Send>,
22-
}
23-
24-
impl Default for ArrowFlightEndpointHooks {
25-
fn default() -> Self {
26-
Self {
27-
on_plan: Arc::new(|plan| plan),
28-
}
29-
}
22+
pub(super) on_plan:
23+
Vec<Arc<dyn Fn(Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> + Sync + Send>>,
3024
}
3125

3226
pub struct ArrowFlightEndpoint {
@@ -54,11 +48,11 @@ impl ArrowFlightEndpoint {
5448
/// The callback takes the plan and returns another plan that must be either the same,
5549
/// or equivalent in terms of execution. Mutating the plan by adding nodes or removing them
5650
/// will make the query blow up in unexpected ways.
57-
pub fn on_plan(
51+
pub fn add_on_plan_hook(
5852
&mut self,
59-
cbk: impl Fn(Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> + Sync + Send + 'static,
53+
hook: impl Fn(Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> + Sync + Send + 'static,
6054
) {
61-
self.hooks.on_plan = Arc::new(cbk);
55+
self.hooks.on_plan.push(Arc::new(hook));
6256
}
6357
}
6458

0 commit comments

Comments
 (0)