|
| 1 | +# Custom Execution Plan Example |
| 2 | + |
| 3 | +Demonstrates how to create a distributed custom execution plan with a `numbers(start, end)` table function. |
| 4 | + |
| 5 | +## Components |
| 6 | + |
| 7 | +**NumbersTableFunction** – Table function callable in SQL: `SELECT * FROM numbers(1, 100)` |
| 8 | + |
| 9 | +**NumbersExec** – Execution plan with `ranges_per_task: Vec<Range<i64>>` storing one range per task. |
| 10 | +Uses `DistributedTaskContext` to determine which range to generate. |
| 11 | + |
| 12 | +**NumbersExecCodec** – Protobuf-based serialization implementing `PhysicalExtensionCodec`. |
| 13 | +Must be registered on both coordinator and workers. |
| 14 | + |
| 15 | +**NumbersTaskEstimator** – Controls distributed parallelism: |
| 16 | + |
| 17 | +- `task_estimation()` - Returns how many tasks needed based on range size and config |
| 18 | +- `scale_up_leaf_node()` - Splits single range of numbers into N per-task ranges |
| 19 | + |
| 20 | +**NumbersConfig** – Custom config extension for controlling distributed parallelism (`numbers_per_task: usize`) |
| 21 | + |
| 22 | +## Usage |
| 23 | + |
| 24 | +This example imports the `InMemoryWorkerResolver` and the `InMemoryChannelResolver` used during integration testing |
| 25 | +of this project, so it needs the `--features integration` flag on. |
| 26 | + |
| 27 | +This example demonstrates how the bigger the range of numbers is queried, the more tasks are used in executing |
| 28 | +the query, for example: |
| 29 | + |
| 30 | +```bash |
| 31 | +cargo run \ |
| 32 | + --features integration \ |
| 33 | + --example custom_execution_plan \ |
| 34 | + "SELECT DISTINCT number FROM numbers(0, 10) ORDER BY number" \ |
| 35 | + --show-distributed-plan |
| 36 | +``` |
| 37 | + |
| 38 | +This will print a non-distributed plan, as the range of numbers we are querying (`numbers(0, 10)`) is small. |
| 39 | + |
| 40 | +The config parameter `numbers.numbers_per_task` is the one that controls how many distributed tasks are used in the |
| 41 | +query, and it's default value is `10`, so querying 10 numbers will not distribute the plan. |
| 42 | + |
| 43 | +However, if we try to query 11 numbers: |
| 44 | + |
| 45 | +```bash |
| 46 | +cargo run \ |
| 47 | + --features integration \ |
| 48 | + --example custom_execution_plan \ |
| 49 | + "SELECT DISTINCT number FROM numbers(0, 11) ORDER BY number" \ |
| 50 | + --show-distributed-plan |
| 51 | +``` |
| 52 | + |
| 53 | +The distribution rule kicks in, and the plan gets distributed. |
| 54 | + |
| 55 | +Note that the parallelism in the plan has an upper threshold, so for example, if we query 100 numbers: |
| 56 | + |
| 57 | +```bash |
| 58 | +cargo run \ |
| 59 | + --features integration \ |
| 60 | + --example custom_execution_plan \ |
| 61 | + "SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" \ |
| 62 | + --show-distributed-plan |
| 63 | +``` |
| 64 | + |
| 65 | +We do not get 100/10 = 10 distributed tasks, we just get 4. This is because the example is configured by default to |
| 66 | +simulate a 4-worker cluster. If we increase the worker count, we get a highly distributed plan out with 10 tasks: |
| 67 | + |
| 68 | +```bash |
| 69 | +cargo run \ |
| 70 | + --features integration \ |
| 71 | + --example custom_execution_plan \ |
| 72 | + "SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" \ |
| 73 | + --workers 10 \ |
| 74 | + --show-distributed-plan |
| 75 | +``` |
| 76 | + |
0 commit comments