|
19 | 19 |
|
20 | 20 | # Extending DataFusion's operators: custom LogicalPlan and Execution Plans |
21 | 21 |
|
22 | | -Coming soon |
| 22 | +DataFusion lets you extend its operators by rewriting the logical plan and the execution plan with custom [optimizer rules](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html). This section uses the µWheel project to illustrate these capabilities. |
| 23 | + |
| 24 | +## About DataFusion µWheel |
| 25 | + |
| 26 | +[DataFusion µWheel](https://github.com/uwheel/datafusion-uwheel/tree/main) is a native DataFusion optimizer which improves query performance for time-based analytics through fast temporal aggregation and pruning using custom indices. The integration of µWheel into DataFusion is a joint effort with the DataFusion community. |
| 27 | + |
| 28 | +### Optimizing the Logical Plan |
| 29 | + |
| 30 | +The `rewrite` function transforms logical plans by identifying temporal patterns and aggregation functions that match the stored wheel indices. When a match is found, it queries the corresponding index to retrieve pre-computed aggregate values, stores the results in a [MemTable](https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html), and returns a new `LogicalPlan::TableScan` over that table. If no match is found, the original plan proceeds unchanged through DataFusion's standard execution path. |
| 31 | + |
| 32 | +```rust,ignore |
| 33 | +fn rewrite( |
| 34 | + &self, |
| 35 | + plan: LogicalPlan, |
| 36 | + _config: &dyn OptimizerConfig, |
| 37 | +) -> Result<Transformed<LogicalPlan>> { |
| 38 | + // Attempts to rewrite a logical plan to a uwheel-based plan that either provides |
| 39 | + // plan-time aggregates or skips execution based on min/max pruning. |
| 40 | + if let Some(rewritten) = self.try_rewrite(&plan) { |
| 41 | + Ok(Transformed::yes(rewritten)) |
| 42 | + } else { |
| 43 | + Ok(Transformed::no(plan)) |
| 44 | + } |
| 45 | +} |
| 46 | +``` |
| 47 | + |
| 48 | +```rust,ignore |
| 49 | +// Converts a uwheel aggregate result to a TableScan with a MemTable as source |
| 50 | +fn agg_to_table_scan(result: f64, schema: SchemaRef) -> Result<LogicalPlan> { |
| 51 | + let data = Float64Array::from(vec![result]); |
| 52 | + let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)])?; |
| 53 | + let df_schema = Arc::new(DFSchema::try_from(schema.clone())?); |
| 54 | + let mem_table = MemTable::try_new(schema, vec![vec![record_batch]])?; |
| 55 | + mem_table_as_table_scan(mem_table, df_schema) |
| 56 | +} |
| 57 | +``` |
| 58 | + |
| 59 | +For a deeper dive into how the µWheel project is used, see the [blog post](https://uwheel.rs/post/datafusion_uwheel/) by Max Meldrum. |