Skip to content

Commit 7ea5066

Browse files
authored
Sort Merge Join: Reduce batch concatenation, use BatchCoalescer, new benchmarks (TPC-H Q21 SMJ up to ~4000x faster) (#18875)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18487. - Will eventually close apache/datafusion-comet#901. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> DataFusion Comet often uses Sort Merge Joins because DataFusion does not have a larger-than-memory Hash Join operator. Performance on TPC-H Q21 is quite bad when run through native, and instead Comet falls back to Spark by default. If you force Comet to use DataFusion's SMJ operator, performance is: <img width="969" height="618" alt="Screenshot 2025-11-21 at 11 31 18 AM" src="https://github.com/user-attachments/assets/bb04d9c4-4f05-4ab8-8960-47a4705d2c83" /> Profiling showed most of the time spent in `concat_batches` of single-digit rows: <img width="1132" height="348" alt="Screenshot 2025-11-20 at 6 49 20 PM" src="https://github.com/user-attachments/assets/89ef70d2-41e5-46cd-82b2-d97b2af1de56" /> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> Use a `BatchCoalescer` both internally and to buffer final output. There was also some redundant concatenation of batches for filtered joins. One made the biggest difference, but I switched to two to be consistent. Here are Comet results with the changes based on 50.3 (which is where Comet is): <img width="952" height="599" alt="Screenshot 2025-11-21 at 11 43 57 AM" src="https://github.com/user-attachments/assets/0660addd-acd8-41ed-8b1f-d9efd31e2a17" /> TPC-H SF1 benchmark results are below (`PREFER_HASH_JOIN=false ./bench.sh run tpch`). I tried to run SF10 TPC-H but it seemed like it was going to take hours on my machine. It ran successfully on this PR. ``` ./bench.sh compare_detail main smj Comparing main and smj -------------------- Benchmark tpch_sf1.json -------------------- ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┓ ┃ Query ┃ main ┃ smj ┃ Change ┃ ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━┩ │ QQuery 1 │ 44.37 / 48.67 ±4.54 / 55.68 ms │ 41.63 / 55.88 ±19.83 / 95.24 ms │ 1.15x slower │ │ QQuery 2 │ 45.18 / 47.44 ±2.39 / 51.74 ms │ 45.26 / 47.29 ±3.56 / 54.39 ms │ no change │ │ QQuery 3 │ 52.59 / 56.15 ±2.65 / 59.79 ms │ 50.93 / 52.39 ±1.35 / 54.46 ms │ +1.07x faster │ │ QQuery 4 │ 33.06 / 34.46 ±0.97 / 35.88 ms │ 30.06 / 31.04 ±0.74 / 32.14 ms │ +1.11x faster │ │ QQuery 5 │ 84.50 / 87.63 ±2.06 / 90.58 ms │ 78.33 / 80.62 ±2.96 / 86.32 ms │ +1.09x faster │ │ QQuery 6 │ 17.87 / 18.64 ±0.48 / 19.22 ms │ 16.14 / 17.54 ±1.12 / 19.55 ms │ +1.06x faster │ │ QQuery 7 │ 111.11 / 113.59 ±1.79 / 116.70 ms │ 112.43 / 115.85 ±2.55 / 118.96 ms │ no change │ │ QQuery 8 │ 89.84 / 94.59 ±3.34 / 100.15 ms │ 92.26 / 94.64 ±2.28 / 97.50 ms │ no change │ │ QQuery 9 │ 128.36 / 133.12 ±3.46 / 138.00 ms │ 124.58 / 130.47 ±6.30 / 138.85 ms │ no change │ │ QQuery 10 │ 49.89 / 51.91 ±1.41 / 54.19 ms │ 48.55 / 50.43 ±1.82 / 52.92 ms │ no change │ │ QQuery 11 │ 34.19 / 35.30 ±0.59 / 35.84 ms │ 32.42 / 34.59 ±1.52 / 36.47 ms │ no change │ │ QQuery 12 │ 36.26 / 38.67 ±2.44 / 42.77 ms │ 32.92 / 34.28 ±1.18 / 36.38 ms │ +1.13x faster │ │ QQuery 13 │ 31.32 / 34.13 ±2.29 / 38.22 ms │ 28.66 / 29.84 ±1.11 / 31.94 ms │ +1.14x faster │ │ QQuery 14 │ 23.54 / 24.79 ±0.92 / 26.00 ms │ 22.48 / 23.45 ±1.03 / 25.44 ms │ +1.06x faster │ │ QQuery 15 │ 26.66 / 27.47 ±0.86 / 29.05 ms │ 26.23 / 28.64 ±1.72 / 31.48 ms │ no change │ │ QQuery 16 │ 17.63 / 18.94 ±0.97 / 20.20 ms │ 16.82 / 18.11 ±1.33 / 20.60 ms │ no change │ │ QQuery 17 │ 94.36 / 96.41 ±1.62 / 98.44 ms │ 91.47 / 93.47 ±1.70 / 96.54 ms │ no change │ │ QQuery 18 │ 99.91 / 108.58 ±5.85 / 117.27 ms │ 104.25 / 106.40 ±2.42 / 110.47 ms │ no change │ │ QQuery 19 │ 35.23 / 36.68 ±1.46 / 39.23 ms │ 32.98 / 36.03 ±1.88 / 38.57 ms │ no change │ │ QQuery 20 │ 40.66 / 41.84 ±1.20 / 44.05 ms │ 38.12 / 39.20 ±0.92 / 40.45 ms │ +1.07x faster │ │ QQuery 21 │ 151142.04 / 246274.24 ±89682.07 / 358766.84 ms │ 216.09 / 218.73 ±2.03 / 221.31 ms │ +1125.94x faster │ │ QQuery 22 │ 16.69 / 28.53 ±22.72 / 73.97 ms │ 16.72 / 17.39 ±0.78 / 18.86 ms │ +1.64x faster │ └──────────────┴────────────────────────────────────────────────┴───────────────────────────────────┴──────────────────┘ ┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓ ┃ Benchmark Summary ┃ ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │ Total Time (main) │ 247451.79ms │ │ Total Time (smj) │ 1356.29ms │ │ Average Time (main) │ 11247.81ms │ │ Average Time (smj) │ 61.65ms │ │ Queries Faster │ 10 │ │ Queries Slower │ 1 │ │ Queries with No Change │ 11 │ │ Queries with Failure │ 0 │ └────────────────────────┴─────────────┘ ``` ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Existing Sort Merge Join unit tests, added a new benchmark. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> There should not be. <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent ea9f354 commit 7ea5066

File tree

7 files changed

+1033
-253
lines changed

7 files changed

+1033
-253
lines changed

benchmarks/README.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,7 @@ Different queries are included to test nested loop joins under various workloads
790790

791791
## Hash Join
792792

793-
This benchmark focuses on the performance of queries with nested hash joins, minimizing other overheads such as scanning data sources or evaluating predicates.
793+
This benchmark focuses on the performance of queries with hash joins, minimizing other overheads such as scanning data sources or evaluating predicates.
794794

795795
Several queries are included to test hash joins under various workloads.
796796

@@ -802,6 +802,19 @@ Several queries are included to test hash joins under various workloads.
802802
./bench.sh run hj
803803
```
804804

805+
## Sort Merge Join
806+
807+
This benchmark focuses on the performance of queries with sort merge joins joins, minimizing other overheads such as scanning data sources or evaluating predicates.
808+
809+
Several queries are included to test sort merge joins under various workloads.
810+
811+
### Example Run
812+
813+
```bash
814+
# No need to generate data: this benchmark uses table function `range()` as the data source
815+
816+
./bench.sh run smj
817+
```
805818
## Cancellation
806819

807820
Test performance of cancelling queries.

benchmarks/bench.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ imdb: Join Order Benchmark (JOB) using the IMDB dataset conver
132132
cancellation: How long cancelling a query takes
133133
nlj: Benchmark for simple nested loop joins, testing various join scenarios
134134
hj: Benchmark for simple hash joins, testing various join scenarios
135+
smj: Benchmark for simple sort merge joins, testing various join scenarios
135136
compile_profile: Compile and execute TPC-H across selected Cargo profiles, reporting timing and binary size
136137
137138
@@ -324,6 +325,10 @@ main() {
324325
# hj uses range() function, no data generation needed
325326
echo "HJ benchmark does not require data generation"
326327
;;
328+
smj)
329+
# smj uses range() function, no data generation needed
330+
echo "SMJ benchmark does not require data generation"
331+
;;
327332
compile_profile)
328333
data_tpch "1" "parquet"
329334
;;
@@ -401,6 +406,7 @@ main() {
401406
run_nlj
402407
run_hj
403408
run_tpcds
409+
run_smj
404410
;;
405411
tpch)
406412
run_tpch "1" "parquet"
@@ -514,6 +520,9 @@ main() {
514520
hj)
515521
run_hj
516522
;;
523+
smj)
524+
run_smj
525+
;;
517526
compile_profile)
518527
run_compile_profile "${PROFILE_ARGS[@]}"
519528
;;
@@ -1234,6 +1243,14 @@ run_hj() {
12341243
debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
12351244
}
12361245

1246+
# Runs the smj benchmark
1247+
run_smj() {
1248+
RESULTS_FILE="${RESULTS_DIR}/smj.json"
1249+
echo "RESULTS_FILE: ${RESULTS_FILE}"
1250+
echo "Running smj benchmark..."
1251+
debug_run $CARGO_COMMAND --bin dfbench -- smj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
1252+
}
1253+
12371254

12381255
compare_benchmarks() {
12391256
BASE_RESULTS_DIR="${SCRIPT_DIR}/results"

benchmarks/src/bin/dfbench.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
3434
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
3535

3636
use datafusion_benchmarks::{
37-
cancellation, clickbench, h2o, hj, imdb, nlj, sort_tpch, tpcds, tpch,
37+
cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_tpch, tpcds, tpch,
3838
};
3939

4040
#[derive(Debug, StructOpt)]
@@ -46,6 +46,7 @@ enum Options {
4646
HJ(hj::RunOpt),
4747
Imdb(imdb::RunOpt),
4848
Nlj(nlj::RunOpt),
49+
Smj(smj::RunOpt),
4950
SortTpch(sort_tpch::RunOpt),
5051
Tpch(tpch::RunOpt),
5152
Tpcds(tpcds::RunOpt),
@@ -63,6 +64,7 @@ pub async fn main() -> Result<()> {
6364
Options::HJ(opt) => opt.run().await,
6465
Options::Imdb(opt) => Box::pin(opt.run()).await,
6566
Options::Nlj(opt) => opt.run().await,
67+
Options::Smj(opt) => opt.run().await,
6668
Options::SortTpch(opt) => opt.run().await,
6769
Options::Tpch(opt) => Box::pin(opt.run()).await,
6870
Options::Tpcds(opt) => Box::pin(opt.run()).await,

benchmarks/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub mod h2o;
2222
pub mod hj;
2323
pub mod imdb;
2424
pub mod nlj;
25+
pub mod smj;
2526
pub mod sort_tpch;
2627
pub mod tpcds;
2728
pub mod tpch;

0 commit comments

Comments
 (0)