Skip to content

Commit f5a4f12

Browse files
rtpsw and raulcd
authored and committed
GH-36482: [C++][CI] Fix sporadic test failures in AsofJoinBasicTest (#36499)
### What changes are included in this PR?

The key hasher is now invalidated before the first invocation of `GetKey` (via `GetLatestKey`) after a new batch arrives. In the pre-PR code, this invalidation happened within `Advance`, which is called from `AdvanceAndMemoize` only after `GetLatestKey` is called, so the first key lookup for a new batch could still see the previous batch's cached state.

The change adds synchronization between the input-receiving and processing threads, because avoiding it would require a more complicated and brittle change, e.g., one that detects in the processing thread when a new batch was added to the queue in order to invalidate the key hasher at that time.

### Are these changes tested?

Yes, by existing tests.

### Are there any user-facing changes?

No.

**This PR contains a "Critical Fix".**

* Closes: #36482

Authored-by: Yaron Gvili <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
1 parent a7a6034 commit f5a4f12

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ class KeyHasher {
524524
size_t index_;
525525
std::vector<col_index_t> indices_;
526526
std::vector<KeyColumnMetadata> metadata_;
527-
const RecordBatch* batch_;
527+
std::atomic<const RecordBatch*> batch_;
528528
std::vector<HashType> hashes_;
529529
LightContext ctx_;
530530
std::vector<KeyColumnArray> column_arrays_;
@@ -819,7 +819,6 @@ class InputState {
819819
have_active_batch &= !queue_.TryPop();
820820
if (have_active_batch) {
821821
DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0); // empty batches disallowed
822-
key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache
823822
memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0)); // time changed
824823
}
825824
}
@@ -897,7 +896,8 @@ class InputState {
897896

898897
Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
899898
if (rb->num_rows() > 0) {
900-
queue_.Push(rb); // only after above updates - push batch for processing
899+
key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache
900+
queue_.Push(rb); // only now push batch for processing
901901
} else {
902902
++batches_processed_; // don't enqueue empty batches, just record as processed
903903
}

0 commit comments

Comments
 (0)