Skip to content

Commit a2453bd

Browse files
apacheGH-41190: [C++] support for single threaded joins (apache#41125)
When I initially added single threading support, I didn't do asof joins and sorted merge joins, because the code for these operations uses threads internally. This is a small check-in to add support for them. Tests run okay in single-threaded, I'm pushing it here to run full tests and check I didn't break the threaded case. I'm pushing this now because making this work saves adding a load of threading checks in python (this currently breaks single-threaded python i.e. emscripten). * GitHub Issue: apache#41190 Lead-authored-by: Joe Marshall <[email protected]> Co-authored-by: Rossi Sun <[email protected]> Signed-off-by: Weston Pace <[email protected]>
1 parent 774ee0f commit a2453bd

File tree

3 files changed

+125
-33
lines changed

3 files changed

+125
-33
lines changed

cpp/src/arrow/acero/CMakeLists.txt

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -173,13 +173,8 @@ add_arrow_acero_test(hash_join_node_test SOURCES hash_join_node_test.cc
173173
bloom_filter_test.cc)
174174
add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc)
175175

176-
# asof_join_node and sorted_merge_node use std::thread internally
177-
# and doesn't use ThreadPool so it will
178-
# be broken if threading is turned off
179-
if(ARROW_ENABLE_THREADING)
180-
add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc)
181-
add_arrow_acero_test(sorted_merge_node_test SOURCES sorted_merge_node_test.cc)
182-
endif()
176+
add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc)
177+
add_arrow_acero_test(sorted_merge_node_test SOURCES sorted_merge_node_test.cc)
183178

184179
add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
185180
add_arrow_acero_test(union_node_test SOURCES union_node_test.cc)
@@ -228,9 +223,7 @@ if(ARROW_BUILD_BENCHMARKS)
228223
add_arrow_acero_benchmark(project_benchmark SOURCES benchmark_util.cc
229224
project_benchmark.cc)
230225

231-
if(ARROW_ENABLE_THREADING)
232-
add_arrow_acero_benchmark(asof_join_benchmark SOURCES asof_join_benchmark.cc)
233-
endif()
226+
add_arrow_acero_benchmark(asof_join_benchmark SOURCES asof_join_benchmark.cc)
234227

235228
add_arrow_acero_benchmark(tpch_benchmark SOURCES tpch_benchmark.cc)
236229

@@ -253,9 +246,7 @@ if(ARROW_BUILD_BENCHMARKS)
253246
target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_static)
254247
target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_static)
255248
target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_static)
256-
if(ARROW_ENABLE_THREADING)
257-
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_static)
258-
endif()
249+
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_static)
259250
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_static)
260251
if(ARROW_BUILD_OPENMP_BENCHMARKS)
261252
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_static)
@@ -264,9 +255,7 @@ if(ARROW_BUILD_BENCHMARKS)
264255
target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_shared)
265256
target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_shared)
266257
target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_shared)
267-
if(ARROW_ENABLE_THREADING)
268-
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_shared)
269-
endif()
258+
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_shared)
270259
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_shared)
271260
if(ARROW_BUILD_OPENMP_BENCHMARKS)
272261
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_shared)

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 77 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,8 @@ class AsofJoinNode : public ExecNode {
10141014
}
10151015
}
10161016

1017+
#ifdef ARROW_ENABLE_THREADING
1018+
10171019
template <typename Callable>
10181020
struct Defer {
10191021
Callable callable;
@@ -1100,6 +1102,7 @@ class AsofJoinNode : public ExecNode {
11001102
}
11011103

11021104
static void ProcessThreadWrapper(AsofJoinNode* node) { node->ProcessThread(); }
1105+
#endif
11031106

11041107
public:
11051108
AsofJoinNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
@@ -1131,8 +1134,10 @@ class AsofJoinNode : public ExecNode {
11311134
}
11321135

11331136
virtual ~AsofJoinNode() {
1134-
process_.Push(false); // poison pill
1137+
#ifdef ARROW_ENABLE_THREADING
1138+
PushProcess(false);
11351139
process_thread_.join();
1140+
#endif
11361141
}
11371142

11381143
const std::vector<col_index_t>& indices_of_on_key() { return indices_of_on_key_; }
@@ -1410,7 +1415,8 @@ class AsofJoinNode : public ExecNode {
14101415
rb->ToString(), DEBUG_MANIP(std::endl));
14111416

14121417
ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb));
1413-
process_.Push(true);
1418+
PushProcess(true);
1419+
14141420
return Status::OK();
14151421
}
14161422

@@ -1425,31 +1431,88 @@ class AsofJoinNode : public ExecNode {
14251431
// The reason for this is that there are cases at the end of a table where we don't
14261432
// know whether the RHS of the join is up-to-date until we know that the table is
14271433
// finished.
1428-
process_.Push(true);
1434+
PushProcess(true);
1435+
14291436
return Status::OK();
14301437
}
1438+
void PushProcess(bool value) {
1439+
#ifdef ARROW_ENABLE_THREADING
1440+
process_.Push(value);
1441+
#else
1442+
if (value) {
1443+
ProcessNonThreaded();
1444+
} else if (!process_task_.is_finished()) {
1445+
EndFromSingleThread();
1446+
}
1447+
#endif
1448+
}
14311449

1432-
Status StartProducing() override {
14331450
#ifndef ARROW_ENABLE_THREADING
1434-
return Status::NotImplemented("ASOF join requires threading enabled");
1451+
bool ProcessNonThreaded() {
1452+
while (!process_task_.is_finished()) {
1453+
Result<std::shared_ptr<RecordBatch>> result = ProcessInner();
1454+
1455+
if (result.ok()) {
1456+
auto out_rb = *result;
1457+
if (!out_rb) break;
1458+
ExecBatch out_b(*out_rb);
1459+
out_b.index = batches_produced_++;
1460+
DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
1461+
out_rb->ToString(), DEBUG_MANIP(std::endl));
1462+
Status st = output_->InputReceived(this, std::move(out_b));
1463+
if (!st.ok()) {
1464+
// this isn't really from a thread,
1465+
// but we call through to this for consistency
1466+
EndFromSingleThread(std::move(st));
1467+
return false;
1468+
}
1469+
} else {
1470+
// this isn't really from a thread,
1471+
// but we call through to this for consistency
1472+
EndFromSingleThread(result.status());
1473+
return false;
1474+
}
1475+
}
1476+
auto& lhs = *state_.at(0);
1477+
if (lhs.Finished() && !process_task_.is_finished()) {
1478+
EndFromSingleThread(Status::OK());
1479+
}
1480+
return true;
1481+
}
1482+
1483+
void EndFromSingleThread(Status st = Status::OK()) {
1484+
process_task_.MarkFinished(st);
1485+
if (st.ok()) {
1486+
st = output_->InputFinished(this, batches_produced_);
1487+
}
1488+
for (const auto& s : state_) {
1489+
st &= s->ForceShutdown();
1490+
}
1491+
}
1492+
14351493
#endif
14361494

1495+
Status StartProducing() override {
14371496
ARROW_ASSIGN_OR_RAISE(process_task_, plan_->query_context()->BeginExternalTask(
14381497
"AsofJoinNode::ProcessThread"));
14391498
if (!process_task_.is_valid()) {
14401499
// Plan has already aborted. Do not start process thread
14411500
return Status::OK();
14421501
}
1502+
#ifdef ARROW_ENABLE_THREADING
14431503
process_thread_ = std::thread(&AsofJoinNode::ProcessThreadWrapper, this);
1504+
#endif
14441505
return Status::OK();
14451506
}
14461507

14471508
void PauseProducing(ExecNode* output, int32_t counter) override {}
14481509
void ResumeProducing(ExecNode* output, int32_t counter) override {}
14491510

14501511
Status StopProducingImpl() override {
1512+
#ifdef ARROW_ENABLE_THREADING
14511513
process_.Clear();
1452-
process_.Push(false);
1514+
#endif
1515+
PushProcess(false);
14531516
return Status::OK();
14541517
}
14551518

@@ -1479,11 +1542,13 @@ class AsofJoinNode : public ExecNode {
14791542

14801543
// Backpressure counter common to all inputs
14811544
std::atomic<int32_t> backpressure_counter_;
1545+
#ifdef ARROW_ENABLE_THREADING
14821546
// Queue for triggering processing of a given input
14831547
// (a false value is a poison pill)
14841548
ConcurrentQueue<bool> process_;
14851549
// Worker thread
14861550
std::thread process_thread_;
1551+
#endif
14871552
Future<> process_task_;
14881553

14891554
// In-progress batches produced
@@ -1511,9 +1576,13 @@ AsofJoinNode::AsofJoinNode(ExecPlan* plan, NodeVector inputs,
15111576
debug_os_(join_options.debug_opts ? join_options.debug_opts->os : nullptr),
15121577
debug_mutex_(join_options.debug_opts ? join_options.debug_opts->mutex : nullptr),
15131578
#endif
1514-
backpressure_counter_(1),
1579+
backpressure_counter_(1)
1580+
#ifdef ARROW_ENABLE_THREADING
1581+
,
15151582
process_(),
1516-
process_thread_() {
1583+
process_thread_()
1584+
#endif
1585+
{
15171586
for (auto& key_hasher : key_hashers_) {
15181587
key_hasher->node_ = this;
15191588
}

cpp/src/arrow/acero/sorted_merge_node.cc

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -262,19 +262,22 @@ class SortedMergeNode : public ExecNode {
262262
: ExecNode(plan, inputs, GetInputLabels(inputs), std::move(output_schema)),
263263
ordering_(std::move(new_ordering)),
264264
input_counter(inputs_.size()),
265-
output_counter(inputs_.size()),
266-
process_thread() {
265+
output_counter(inputs_.size())
266+
#ifdef ARROW_ENABLE_THREADING
267+
,
268+
process_thread()
269+
#endif
270+
{
267271
SetLabel("sorted_merge");
268272
}
269273

270274
~SortedMergeNode() override {
271-
process_queue.Push(
272-
kPoisonPill); // poison pill
273-
// We might create a temporary (such as to inspect the output
274-
// schema), in which case there isn't anything to join
275+
PushTask(kPoisonPill);
276+
#ifdef ARROW_ENABLE_THREADING
275277
if (process_thread.joinable()) {
276278
process_thread.join();
277279
}
280+
#endif
278281
}
279282

280283
static arrow::Result<arrow::acero::ExecNode*> Make(
@@ -355,10 +358,25 @@ class SortedMergeNode : public ExecNode {
355358
// InputState's ConcurrentQueue manages locking
356359
input_counter[index] += rb->num_rows();
357360
ARROW_RETURN_NOT_OK(state[index]->Push(rb));
358-
process_queue.Push(kNewTask);
361+
PushTask(kNewTask);
359362
return Status::OK();
360363
}
361364

365+
void PushTask(bool ok) {
366+
#ifdef ARROW_ENABLE_THREADING
367+
process_queue.Push(ok);
368+
#else
369+
if (process_task.is_finished()) {
370+
return;
371+
}
372+
if (ok == kNewTask) {
373+
PollOnce();
374+
} else {
375+
EndFromProcessThread();
376+
}
377+
#endif
378+
}
379+
362380
arrow::Status InputFinished(arrow::acero::ExecNode* input, int total_batches) override {
363381
ARROW_DCHECK(std_has(inputs_, input));
364382
{
@@ -368,7 +386,8 @@ class SortedMergeNode : public ExecNode {
368386
state.at(k)->set_total_batches(total_batches);
369387
}
370388
// Trigger a final process call for stragglers
371-
process_queue.Push(kNewTask);
389+
PushTask(kNewTask);
390+
372391
return Status::OK();
373392
}
374393

@@ -379,13 +398,17 @@ class SortedMergeNode : public ExecNode {
379398
// Plan has already aborted. Do not start process thread
380399
return Status::OK();
381400
}
401+
#ifdef ARROW_ENABLE_THREADING
382402
process_thread = std::thread(&SortedMergeNode::StartPoller, this);
403+
#endif
383404
return Status::OK();
384405
}
385406

386407
arrow::Status StopProducingImpl() override {
408+
#ifdef ARROW_ENABLE_THREADING
387409
process_queue.Clear();
388-
process_queue.Push(kPoisonPill);
410+
#endif
411+
PushTask(kPoisonPill);
389412
return Status::OK();
390413
}
391414

@@ -408,13 +431,20 @@ class SortedMergeNode : public ExecNode {
408431
<< input_counter[i] << " != " << output_counter[i];
409432
}
410433

434+
#ifdef ARROW_ENABLE_THREADING
411435
ARROW_UNUSED(
412436
plan_->query_context()->executor()->Spawn([this, st = std::move(st)]() mutable {
413437
Defer cleanup([this, &st]() { process_task.MarkFinished(st); });
414438
if (st.ok()) {
415439
st = output_->InputFinished(this, batches_produced);
416440
}
417441
}));
442+
#else
443+
process_task.MarkFinished(st);
444+
if (st.ok()) {
445+
st = output_->InputFinished(this, batches_produced);
446+
}
447+
#endif
418448
}
419449

420450
bool CheckEnded() {
@@ -552,6 +582,7 @@ class SortedMergeNode : public ExecNode {
552582
return true;
553583
}
554584

585+
#ifdef ARROW_ENABLE_THREADING
555586
void EmitBatches() {
556587
while (true) {
557588
// Implementation note: If the queue is empty, we will block here
@@ -567,6 +598,7 @@ class SortedMergeNode : public ExecNode {
567598

568599
/// The entry point for processThread
569600
static void StartPoller(SortedMergeNode* node) { node->EmitBatches(); }
601+
#endif
570602

571603
arrow::Ordering ordering_;
572604

@@ -583,11 +615,13 @@ class SortedMergeNode : public ExecNode {
583615

584616
std::atomic<int32_t> batches_produced{0};
585617

618+
#ifdef ARROW_ENABLE_THREADING
586619
// Queue to trigger processing of a given input. False acts as a poison pill
587620
ConcurrentQueue<bool> process_queue;
588621
// Once StartProducing is called, we initialize this thread to poll the
589622
// input states and emit batches
590623
std::thread process_thread;
624+
#endif
591625
arrow::Future<> process_task;
592626

593627
// Map arg index --> completion counter

0 commit comments

Comments
 (0)