Skip to content

Commit c98fa56

Browse files
authored
perfect hash join (#19411)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #17635. ## Rationale for this change This PR introduces a Perfect Hash Join optimization by using an array-based direct mapping(`ArrayMap`) instead of a HashMap. The array-based approach outperforms the standard Hash Join when the build-side keys are **_dense_** (i.e., the ratio of `count / (max - min+1)` is high) or when the key range `(max - min)` is sufficiently **small**. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> The following results from the hj.rs benchmark suite. The benchmark was executed with the optimization enabled by setting `DATAFUSION_EXECUTION_PERFECT_HASH_JOIN_MIN_KEY_DENSITY=0.1` ``` ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓ ┃ Query ┃ base_hj ┃ density=0.1 ┃ Change ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩ │ QQuery 1_density=1_prob_hit=1_25*1.5M │ 5.50 ms │ 4.54 ms │ +1.21x faster │ │ QQuery 2_density=0.026_prob_hit=1_25*1.5M │ 6.13 ms │ 5.43 ms │ +1.13x faster │ │ QQuery 3_density=1_prob_hit=1_100K*60M │ 132.59 ms │ 97.42 ms │ +1.36x faster │ │ QQuery 4_density=1_prob_hit=0.1_100K*60M │ 146.66 ms │ 97.75 ms │ +1.50x faster │ │ QQuery 5_density=0.75_prob_hit=1_100K*60M │ 139.85 ms │ 103.82 ms │ +1.35x faster │ │ QQuery 6_density=0.75_prob_hit=0.1_100K*60M │ 256.62 ms │ 192.15 ms │ +1.34x faster │ │ QQuery 7_density=0.5_prob_hit=1_100K*60M │ 136.27 ms │ 91.64 ms │ +1.49x faster │ │ QQuery 8_density=0.5_prob_hit=0.1_100K*60M │ 234.89 ms │ 185.35 ms │ +1.27x faster │ │ QQuery 9_density=0.2_prob_hit=1_100K*60M │ 132.76 ms │ 98.44 ms │ +1.35x faster │ │ QQuery 10_density=0.2_prob_hit=0.1_100K*60M │ 240.04 ms │ 184.93 ms │ +1.30x faster │ │ QQuery 11_density=0.1_prob_hit=1_100K*60M │ 133.02 ms │ 108.11 ms │ +1.23x faster │ │ QQuery 12_density=0.1_prob_hit=0.1_100K*60M │ 235.44 ms │ 209.10 ms │ +1.13x faster │ │ QQuery 13_density=0.01_prob_hit=1_100K*60M │ 135.64 ms │ 132.52 ms │ no change │ │ QQuery 14_density=0.01_prob_hit=0.1_100K*60M │ 235.88 ms │ 234.62 ms │ no change │ │ QQuery 15_density=0.2_prob_hit=0.1_100K_(20%_dups)*60M │ 178.49 ms │ 147.55 ms │ +1.21x faster │ └────────────────────────────────────────────────────────┴───────────┴─────────────┴───────────────┘ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓ ┃ Benchmark Summary ┃ ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩ │ Total Time (base_hj) │ 2349.79ms │ │ Total Time (density=0.1) │ 1893.37ms │ │ Average Time (base_hj) │ 156.65ms │ │ Average Time (density=0.1) │ 126.22ms │ │ Queries Faster │ 13 │ │ Queries Slower │ 0 │ │ Queries with No Change │ 2 │ │ Queries with Failure │ 0 │ └────────────────────────────┴───────────┘ ``` The following results from tpch-sf10 ``` ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓ ┃ Query ┃ base ┃ perfect_hj ┃ Change ┃ ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩ │ QQuery 1 │ 739.66 ms │ 743.84 ms │ no change │ │ QQuery 2 │ 315.94 ms │ 317.53 ms │ no change │ │ QQuery 3 │ 655.79 ms │ 669.24 ms │ no change │ │ QQuery 4 │ 215.48 ms │ 218.79 ms │ no change │ │ QQuery 5 │ 1131.42 ms │ 1146.03 ms │ no change │ │ QQuery 6 │ 202.32 ms │ 190.83 ms │ +1.06x faster │ │ QQuery 7 │ 1734.06 ms │ 1710.50 ms │ no change │ │ QQuery 8 │ 1185.05 ms │ 1173.90 ms │ no change │ │ QQuery 9 │ 2036.76 ms │ 1994.30 ms │ no change │ │ QQuery 10 │ 907.32 ms │ 893.20 ms │ no change │ │ QQuery 11 │ 306.63 ms │ 275.46 ms │ +1.11x faster │ │ QQuery 12 │ 404.00 ms │ 381.95 ms │ +1.06x faster │ │ QQuery 13 │ 531.67 ms │ 498.58 ms │ +1.07x faster │ │ QQuery 14 │ 317.63 ms │ 303.04 ms │ no change │ │ QQuery 15 │ 602.24 ms │ 572.18 ms │ no change │ │ QQuery 16 │ 200.00 ms │ 201.68 ms │ no change │ │ QQuery 17 │ 1848.67 ms │ 1790.60 ms │ no change │ │ QQuery 18 │ 2130.63 ms │ 2179.84 ms │ no change │ │ QQuery 19 │ 501.81 ms │ 529.85 ms │ 1.06x slower │ │ QQuery 20 │ 637.91 ms │ 661.72 ms │ no change │ │ QQuery 21 │ 1882.43 ms │ 1917.10 ms │ no change │ │ QQuery 22 │ 130.68 ms │ 141.76 ms │ 1.08x slower │ └──────────────┴────────────┴────────────┴───────────────┘ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓ ┃ Benchmark Summary ┃ ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩ │ Total Time (base) │ 18618.10ms │ │ Total Time (perfect_hj) │ 18511.93ms │ │ Average Time (base) │ 846.28ms │ │ Average Time (perfect_hj) │ 841.45ms │ │ Queries Faster │ 4 │ │ Queries Slower │ 2 │ │ Queries with No Change │ 16 │ │ Queries with Failure │ 0 │ └───────────────────────────┴────────────┘ ``` ## What changes are included in this PR? - During the `collect_left_input` (build) phase, we now conditionally use an `ArrayMap` instead of a standard `JoinHashMapType`. This optimization is triggered only when the following conditions are met: - There is exactly one join key. - The join key can be any integer type convertible to u64 (excluding i128 and u128). - The key distribution is sufficiently dense or the key range (max - min) is small enough to justify an array-based allocation. - build_side.num_rows() < `u32::MAX` - The `ArrayMap` works by storing the minimum key as an offset and using a Vec to directly map a key `k` to its build-side index via `data[k- offset]`. - Rewrite Hash Join micro-benchmarks in benchmarks/src/hj.rs to evaluate ArrayMap and HashMap performance across varying key densities and probe hit rates <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? Yes, this PR introduces two new session configuration parameters to control the behavior of the Perfect Hash Join optimization: - `perfect_hash_join_small_build_threshold`: This parameter defines the maximum key range (max_key - min_key) for the build side to be considered "small." If the key range is below this threshold, the array-based join will be triggered regardless of key density. - `perfect_hash_join_min_key_density`: This parameter sets the minimum density (row_count / key_range) required to enable the perfect hash join optimization for larger key ranges <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 62658cd commit c98fa56

File tree

18 files changed

+1891
-462
lines changed

18 files changed

+1891
-462
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/bench.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,7 @@ main() {
322322
echo "NLJ benchmark does not require data generation"
323323
;;
324324
hj)
325-
# hj uses range() function, no data generation needed
326-
echo "HJ benchmark does not require data generation"
325+
data_tpch "10" "parquet"
327326
;;
328327
smj)
329328
# smj uses range() function, no data generation needed
@@ -1228,10 +1227,11 @@ run_nlj() {
12281227

12291228
# Runs the hj benchmark
12301229
run_hj() {
1230+
TPCH_DIR="${DATA_DIR}/tpch_sf10"
12311231
RESULTS_FILE="${RESULTS_DIR}/hj.json"
12321232
echo "RESULTS_FILE: ${RESULTS_FILE}"
12331233
echo "Running hj benchmark..."
1234-
debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
1234+
debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
12351235
}
12361236

12371237
# Runs the smj benchmark

benchmarks/compare.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -154,17 +154,17 @@ def compare(
154154
baseline = BenchmarkRun.load_from_file(baseline_path)
155155
comparison = BenchmarkRun.load_from_file(comparison_path)
156156

157-
console = Console()
157+
console = Console(width=200)
158158

159159
# use basename as the column names
160-
baseline_header = baseline_path.parent.stem
161-
comparison_header = comparison_path.parent.stem
160+
baseline_header = baseline_path.parent.name
161+
comparison_header = comparison_path.parent.name
162162

163163
table = Table(show_header=True, header_style="bold magenta")
164-
table.add_column("Query", style="dim", width=12)
165-
table.add_column(baseline_header, justify="right", style="dim")
166-
table.add_column(comparison_header, justify="right", style="dim")
167-
table.add_column("Change", justify="right", style="dim")
164+
table.add_column("Query", style="dim", no_wrap=True)
165+
table.add_column(baseline_header, justify="right", style="dim", no_wrap=True)
166+
table.add_column(comparison_header, justify="right", style="dim", no_wrap=True)
167+
table.add_column("Change", justify="right", style="dim", no_wrap=True)
168168

169169
faster_count = 0
170170
slower_count = 0
@@ -175,12 +175,12 @@ def compare(
175175

176176
for baseline_result, comparison_result in zip(baseline.queries, comparison.queries):
177177
assert baseline_result.query == comparison_result.query
178-
178+
179179
base_failed = not baseline_result.success
180-
comp_failed = not comparison_result.success
180+
comp_failed = not comparison_result.success
181181
# If a query fails, its execution time is excluded from the performance comparison
182182
if base_failed or comp_failed:
183-
change_text = "incomparable"
183+
change_text = "incomparable"
184184
failure_count += 1
185185
table.add_row(
186186
f"Q{baseline_result.query}",

0 commit comments

Comments
 (0)