Commit 373b0ad

fix: remove WorkTableExec special case in reset_plan_states (#18803)
## Which issue does this PR close?

This PR fixes an oversight in #16469 that broke wrapper nodes in recursive queries. When `with_new_state()` was added as a generic state injection mechanism, `reset_plan_states()` kept a concrete type check for `WorkTableExec`. This means wrapper nodes that delegate `as_any()` to their inner node won't be recognized, causing them to keep stale state across iterations. This breaks recursive queries for external crates that wrap execution plans (like tracing or monitoring tools).

## Rationale for this change

The `reset_plan_states()` function uses `plan.as_any().is::<WorkTableExec>()` to decide which nodes to skip resetting. This works fine for bare `WorkTableExec` but fails when it's wrapped by custom nodes as the type check can't see through the wrapper. This defeats the whole point of `with_new_state()`, which was designed to let wrapper and third-party nodes participate in recursive queries without concrete type checks.

The special case was added to save one `Arc::clone()` per iteration, but it's not worth it since `WorkTableExec::with_new_children()` already just returns `Arc::clone(&self)` anyway. The shared `WorkTable` state is preserved through the `Arc<WorkTable>` reference, so the optimization doesn't buy us much while breaking wrapper nodes.

## What changes are included in this PR?

- Remove the `WorkTableExec` type check in `reset_plan_states()` - just reset all nodes uniformly via `reset_state()`.
- Simplify the function from 9 lines to 4 lines.
- The shared `WorkTable` state stays correct because `WorkTableExec::with_new_children()` returns `Arc::clone(&self)`.

## Are these changes tested?

Yes, covered by existing tests:

- All recursive query tests pass, so backward compatibility is maintained.
- The behavior for bare `WorkTableExec` is identical.
- This fixes wrapper nodes that were broken before.

Since this restores functionality for wrapper nodes rather than changing core DataFusion behavior, it fixes test failures in external crates (like `datafusion-tracing`) without needing new tests here.

## Are there any user-facing changes?

**No Breaking Changes:**

- Everything that worked before still works.
- This is purely a bug fix.

**Benefits:**

- External crates can now wrap `WorkTableExec` and implement `with_new_state()` without breaking recursive queries.
- Tracing, monitoring, and instrumentation wrappers now work correctly with recursive queries.
- Restores the extensibility that #16469 was designed to provide.
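
The claim that a concrete type check "can't see through the wrapper" can be illustrated with a minimal, standard-library-only sketch. Everything here is a hypothetical stand-in: `Node`, `WorkTableLike`, `TracingWrapper`, and `is_work_table` are not DataFusion types, and the wrapper shown returns itself from `as_any()`, which is only one of the ways a real wrapper might behave.

```rust
use std::any::Any;
use std::sync::Arc;

// Hypothetical stand-in for an execution plan trait (not DataFusion's API).
trait Node {
    fn as_any(&self) -> &dyn Any;
}

// Hypothetical stand-in for a work-table leaf node.
struct WorkTableLike;

impl Node for WorkTableLike {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Hypothetical wrapper, e.g. an instrumentation layer around another node.
struct TracingWrapper {
    inner: Arc<dyn Node>,
}

impl Node for TracingWrapper {
    // Assumption: this wrapper exposes itself, not its inner node.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// A concrete type check in the style of `as_any().is::<T>()`.
fn is_work_table(node: &dyn Node) -> bool {
    node.as_any().is::<WorkTableLike>()
}

// Returns (check on bare node, check on wrapper, check on wrapper's inner node).
fn demo() -> (bool, bool, bool) {
    let bare = WorkTableLike;
    let wrapped = TracingWrapper {
        inner: Arc::new(WorkTableLike),
    };
    (
        is_work_table(&bare),
        is_work_table(&wrapped),
        is_work_table(wrapped.inner.as_ref()),
    )
}
```

In this sketch the check succeeds on the bare node and on the wrapper's inner node, but not on the wrapper itself, so any logic keyed on the check treats the wrapped and unwrapped cases differently.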
1 parent 2dd17b9 commit 373b0ad

datafusion/physical-plan/src/recursive_query.rs

Lines changed: 3 additions & 8 deletions
@@ -21,7 +21,7 @@ use std::any::Any;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use super::work_table::{ReservedBatches, WorkTable, WorkTableExec};
+use super::work_table::{ReservedBatches, WorkTable};
 use crate::execution_plan::{Boundedness, EmissionType};
 use crate::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
@@ -378,13 +378,8 @@ fn assign_work_table(
 /// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
 fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
     plan.transform_up(|plan| {
-        // WorkTableExec's states have already been updated correctly.
-        if plan.as_any().is::<WorkTableExec>() {
-            Ok(Transformed::no(plan))
-        } else {
-            let new_plan = Arc::clone(&plan).reset_state()?;
-            Ok(Transformed::yes(new_plan))
-        }
+        let new_plan = Arc::clone(&plan).reset_state()?;
+        Ok(Transformed::yes(new_plan))
     })
     .data()
 }
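
Why resetting every node uniformly can still leave shared state intact is easier to see in a toy model. The sketch below uses only the standard library; `Node`, `WorkTableLike`, `CachingNode`, `rows`, and `reset_all` are hypothetical names, not DataFusion's API. It assumes a work-table-like node whose reset hands back the same `Arc`, and a second node that drops its per-iteration cache on reset.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for a plan node (not DataFusion's ExecutionPlan).
trait Node {
    // Returns a node with per-iteration state cleared.
    fn reset_state(self: Arc<Self>) -> Arc<dyn Node>;
    // Number of rows this node currently "remembers".
    fn rows(&self) -> usize;
}

// Work-table-like node: its state lives behind a shared Arc.
struct WorkTableLike {
    table: Arc<Mutex<Vec<i32>>>,
}

impl Node for WorkTableLike {
    // Assumption: reset returns the same Arc, so shared state is untouched.
    fn reset_state(self: Arc<Self>) -> Arc<dyn Node> {
        self
    }
    fn rows(&self) -> usize {
        self.table.lock().unwrap().len()
    }
}

// Node holding per-iteration state that must be cleared between iterations.
struct CachingNode {
    cached_rows: Option<usize>,
}

impl Node for CachingNode {
    fn reset_state(self: Arc<Self>) -> Arc<dyn Node> {
        Arc::new(CachingNode { cached_rows: None })
    }
    fn rows(&self) -> usize {
        self.cached_rows.unwrap_or(0)
    }
}

// Uniform reset: no type checks, every node is treated the same way.
fn reset_all(nodes: Vec<Arc<dyn Node>>) -> Vec<Arc<dyn Node>> {
    nodes.into_iter().map(|node| node.reset_state()).collect()
}
```

With these definitions, calling `reset_all` on a `WorkTableLike` and a `CachingNode` clears the cache while the work-table-like node keeps observing the same shared table, including rows pushed after the reset.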