Commit 5b6035c

feat: support stateless physical plans
This patch introduces the stateless physical plan feature. Currently, the physical-plan crate is fully supported. This feature allows for the reuse of physical plans and their concurrent execution. The feature is implemented by adding a separate Cargo feature named "stateless_plan".

The implementation consists of several parts:

* State tree

  With the "stateless_plan" feature enabled, the plans themselves do not store state. The state is stored in a separate tree composed of PlanStateNodes, which is built lazily during plan execution. Each node of the tree stores not only the shared state of the plan but also its metrics. The shape of the state tree matches the shape of the execution plan tree.

* Metrics

  Metrics are stored in the nodes of the state tree and can be accessed after plan execution. Support is provided for performing EXPLAIN using the state.

* Dynamic Filters

  In the case of stateless plans, dynamic filters cannot simply be stored inside the plans, as the same plan can be executed concurrently. To overcome this, a dynamic filter is split into two parts: a planning-time version and an execution-time version. The plans contain the planning-time version, which is transformed into the execution version during the execution phase and then passed from parent nodes to child nodes using the state tree.

* WorkTable

  Instead of explicitly injecting the WorkTable into nodes, RecursiveExec exposes the WorkTable in the state stored within the State Tree. Then, a node interested in obtaining the WorkTable traverses up the State Tree and thus retrieves the current WorkTable.

Planned following work:

- Support stateless plan for all other DataFusion crates.
- Enable running tests with this feature in CI.
- Deprecate stateful plans to eventually transition completely to the stateless version.
- Add `fmt_as_with_state` to allow plans to include state-specific details in the EXPLAIN output, such as dynamic filters.

Closes apache#19351
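To make the state-tree idea in the commit message concrete, here is a minimal sketch using only the standard library. It is not the code from this commit: `PlanNode`, `StateNode`, `execute`, and the `output_rows` counter are hypothetical names chosen for illustration. It shows an immutable plan shared between executions, with a per-execution state tree that mirrors the plan's shape and is built lazily.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Immutable plan node: holds no execution state, so it can be shared.
/// (Hypothetical stand-in for an execution plan.)
pub struct PlanNode {
    pub name: &'static str,
    pub children: Vec<Arc<PlanNode>>,
}

/// Per-execution state node mirroring exactly one `PlanNode`.
/// (Hypothetical; loosely modeled on the PlanStateNode described above.)
pub struct StateNode {
    pub plan: Arc<PlanNode>,
    /// Stand-in for metrics: rows produced by this node in this execution.
    pub output_rows: AtomicUsize,
    /// One slot per plan child, filled on first access.
    children: Vec<OnceLock<Arc<StateNode>>>,
}

impl StateNode {
    /// Create the state for `plan` without creating any child state yet.
    pub fn new(plan: Arc<PlanNode>) -> Arc<Self> {
        let children = plan.children.iter().map(|_| OnceLock::new()).collect();
        Arc::new(Self { plan, output_rows: AtomicUsize::new(0), children })
    }

    /// Return the state of the `i`-th child, creating it on first use.
    pub fn child(&self, i: usize) -> Arc<StateNode> {
        let slot = &self.children[i];
        Arc::clone(slot.get_or_init(|| StateNode::new(Arc::clone(&self.plan.children[i]))))
    }

    /// Number of child state nodes materialized so far.
    pub fn materialized_children(&self) -> usize {
        self.children.iter().filter(|c| c.get().is_some()).count()
    }
}

/// Pretend execution: each leaf "produces" `rows` rows, parents sum their children.
/// The row count is recorded in the state tree, never in the plan.
pub fn execute(state: &StateNode, rows: usize) -> usize {
    let produced = if state.plan.children.is_empty() {
        rows
    } else {
        (0..state.plan.children.len())
            .map(|i| execute(&state.child(i), rows))
            .sum()
    };
    state.output_rows.fetch_add(produced, Ordering::Relaxed);
    produced
}
```

In this sketch, two `StateNode` roots created from the same `Arc<PlanNode>` keep their counters independent, which is the property that lets one plan be reused or run concurrently.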
1 parent 1acaf7a commit 5b6035c

52 files changed, +3226 -910 lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 2 additions & 4 deletions
@@ -1303,10 +1303,8 @@ mod test {
     }

     fn make_dynamic_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
-        Arc::new(DynamicFilterPhysicalExpr::new(
-            expr.children().into_iter().map(Arc::clone).collect(),
-            expr,
-        ))
+        let children = expr.children().into_iter().map(Arc::clone).collect();
+        Arc::new(DynamicFilterPhysicalExpr::new(expr, children))
     }

     #[tokio::test]
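The hunk above only swaps the argument order of `DynamicFilterPhysicalExpr::new`, but the commit message describes a larger change to dynamic filters: a planning-time part stored in the plan and an execution-time part created per run. A minimal sketch of that split follows, assuming a simple "minimum bound" filter. `PlannedDynamicFilter`, `ExecDynamicFilter`, `instantiate`, `update`, and `keeps` are hypothetical names, not DataFusion's API.

```rust
use std::sync::{Arc, RwLock};

/// Planning-time half: an immutable description that lives in the shared plan.
/// (Hypothetical type, for illustration only.)
pub struct PlannedDynamicFilter {
    pub column: &'static str,
}

/// Execution-time half: the mutable bound owned by a single execution.
/// (Hypothetical type, for illustration only.)
pub struct ExecDynamicFilter {
    pub column: &'static str,
    min_bound: RwLock<Option<i64>>,
}

impl PlannedDynamicFilter {
    /// Create a fresh execution-time filter for one run of the plan.
    pub fn instantiate(&self) -> Arc<ExecDynamicFilter> {
        Arc::new(ExecDynamicFilter {
            column: self.column,
            min_bound: RwLock::new(None),
        })
    }
}

impl ExecDynamicFilter {
    /// Tighten the bound; a looser bound than the current one is ignored.
    pub fn update(&self, bound: i64) {
        let mut guard = self.min_bound.write().unwrap();
        let current = *guard;
        *guard = Some(current.map_or(bound, |old| old.max(bound)));
    }

    /// Whether `value` passes the filter as it currently stands.
    pub fn keeps(&self, value: i64) -> bool {
        let bound = *self.min_bound.read().unwrap();
        bound.map_or(true, |b| value >= b)
    }
}
```

Because each call to `instantiate` returns an independent `ExecDynamicFilter`, an update made during one execution cannot leak into another execution of the same plan.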

datafusion/execution/src/metrics/mod.rs

Lines changed: 13 additions & 3 deletions
@@ -89,9 +89,9 @@ pub struct Metric {
 /// will be shown.
 /// - When set to `summary`, only metrics with type `MetricType::Summary` are shown.
 ///
-/// # Difference from `EXPLAIN ANALYZE VERBOSE`:
-/// The `VERBOSE` keyword controls whether per-partition metrics are shown (when specified),
-/// or aggregated metrics are displayed (when omitted).
+/// # Difference from `EXPLAIN ANALYZE VERBOSE`:
+/// The `VERBOSE` keyword controls whether per-partition metrics are shown (when specified),
+/// or aggregated metrics are displayed (when omitted).
 /// In contrast, the `analyze_level` configuration determines which categories or
 /// levels of metrics are displayed.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@@ -213,6 +213,16 @@ impl MetricsSet {
         Default::default()
     }

+    /// Return a number of metrics.
+    pub fn len(&self) -> usize {
+        self.metrics.len()
+    }
+
+    /// Check if the set is empty.
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
     /// Add the specified metric
     pub fn push(&mut self, metric: Arc<Metric>) {
         self.metrics.push(metric)
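As a rough illustration of how the two new accessors behave, the following self-contained sketch re-creates a simplified `MetricsSet`. The `Metric` struct here is a hypothetical stand-in with made-up fields; only `new`, `len`, `is_empty`, and `push` mirror the methods shown in the hunk.

```rust
use std::sync::Arc;

/// Simplified stand-in for a metric (hypothetical fields; the real type differs).
#[derive(Debug)]
pub struct Metric {
    pub name: String,
    pub value: usize,
}

/// Simplified stand-in for the metrics collection touched by the hunk above.
#[derive(Debug, Default)]
pub struct MetricsSet {
    metrics: Vec<Arc<Metric>>,
}

impl MetricsSet {
    /// Create an empty set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Number of metrics currently stored.
    pub fn len(&self) -> usize {
        self.metrics.len()
    }

    /// True when no metric has been pushed yet.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Add the specified metric.
    pub fn push(&mut self, metric: Arc<Metric>) {
        self.metrics.push(metric)
    }
}
```

Defining `is_empty` alongside `len` follows the usual Rust convention (Clippy's `len_without_is_empty` lint flags a public `len` without it). Whether that lint was the motivation for adding both methods in this commit is an assumption.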
