Skip to content

Commit 0aa622c

Browse files
authored
GH-45196: [C++][Acero] Small refinement to hash join (#45197)
### Rationale for this change See #45196 ### What changes are included in this PR? Refine/simplify the code mentioned in the issue. ### Are these changes tested? Existing tests suffice. ### Are there any user-facing changes? None. * GitHub Issue: #45196 Authored-by: Rossi Sun <zanmato1984@gmail.com> Signed-off-by: Rossi Sun <zanmato1984@gmail.com>
1 parent 438cf9b commit 0aa622c

File tree

3 files changed

+9
-20
lines changed

3 files changed

+9
-20
lines changed

cpp/src/arrow/acero/hash_join.cc

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -568,20 +568,16 @@ class HashJoinBasicImpl : public HashJoinImpl {
568568
if (has_payload) {
569569
InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_);
570570
}
571-
hash_table_empty_ = true;
571+
hash_table_empty_ = batches.empty();
572+
RETURN_NOT_OK(dict_build_.Init(*schema_[1], hash_table_empty_ ? nullptr : &batches[0],
573+
ctx_->exec_context()));
572574

573575
for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
574576
if (cancelled_) {
575577
return Status::Cancelled("Hash join cancelled");
576578
}
577579
const ExecBatch& batch = batches[ibatch];
578-
if (batch.length == 0) {
579-
continue;
580-
} else if (hash_table_empty_) {
581-
hash_table_empty_ = false;
582-
583-
RETURN_NOT_OK(dict_build_.Init(*schema_[1], &batch, ctx_->exec_context()));
584-
}
580+
DCHECK_GT(batch.length, 0);
585581
int32_t num_rows_before = hash_table_keys_.num_rows();
586582
RETURN_NOT_OK(dict_build_.EncodeBatch(thread_index, *schema_[1], batch,
587583
&hash_table_keys_, ctx_->exec_context()));
@@ -595,10 +591,6 @@ class HashJoinBasicImpl : public HashJoinImpl {
595591
}
596592
}
597593

598-
if (hash_table_empty_) {
599-
RETURN_NOT_OK(dict_build_.Init(*schema_[1], nullptr, ctx_->exec_context()));
600-
}
601-
602594
return Status::OK();
603595
}
604596

cpp/src/arrow/acero/hash_join_node.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,9 @@ class HashJoinNode : public ExecNode, public TracedNode {
772772
const char* kind_name() const override { return "HashJoinNode"; }
773773

774774
Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) {
775+
if (batch.length == 0) {
776+
return Status::OK();
777+
}
775778
std::lock_guard<std::mutex> guard(build_side_mutex_);
776779
build_accumulator_.InsertBatch(std::move(batch));
777780
return Status::OK();

cpp/src/arrow/acero/swiss_join.cc

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2598,17 +2598,15 @@ class SwissJoin : public HashJoinImpl {
25982598
return Status::OK();
25992599
}
26002600

2601+
DCHECK_GT(build_side_batches_[batch_id].length, 0);
2602+
26012603
const HashJoinProjectionMaps* schema = schema_[1];
26022604
bool no_payload = hash_table_build_.no_payload();
26032605

26042606
ExecBatch input_batch;
26052607
ARROW_ASSIGN_OR_RAISE(
26062608
input_batch, KeyPayloadFromInput(/*side=*/1, &build_side_batches_[batch_id]));
26072609

2608-
if (input_batch.length == 0) {
2609-
return Status::OK();
2610-
}
2611-
26122610
// Split batch into key batch and optional payload batch
26132611
//
26142612
// Input batch is key-payload batch (key columns followed by payload
@@ -2637,10 +2635,6 @@ class SwissJoin : public HashJoinImpl {
26372635
static_cast<int64_t>(thread_id), key_batch, no_payload ? nullptr : &payload_batch,
26382636
temp_stack)));
26392637

2640-
// Release input batch
2641-
//
2642-
input_batch.values.clear();
2643-
26442638
return Status::OK();
26452639
}
26462640

0 commit comments

Comments
 (0)