Skip to content

Commit b14a537

Browse files
srilmanscott-routledge2raulcd
authored
GH-43694: [C++] Add Executor * Option to arrow::dataset::ScanOptions (#43698)
### Rationale for this change (See #43694) ### What changes are included in this PR? Added the option `Executor *executor` to `arrow::dataset::ScanOptions` and modified the scanner and sub-functions to either use the internally specified thread pool or the default internal pool when necessary. ### Are these changes tested? Added a Parquet scanner test that uses the new ExecContext using a separate thread pool for each fragment. ### Are there any user-facing changes? Yes, adds a new option. I'm not sure how to update the documentation though * GitHub Issue: #43694 Lead-authored-by: Scott Routledge <[email protected]> Co-authored-by: Srinivas Lade <[email protected]> Co-authored-by: scott-routledge2 <[email protected]> Co-authored-by: Raúl Cumplido <[email protected]> Signed-off-by: Rossi Sun <[email protected]>
1 parent 37faf3e commit b14a537

File tree

5 files changed

+138
-76
lines changed

5 files changed

+138
-76
lines changed

cpp/src/arrow/dataset/file_parquet.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "arrow/util/iterator.h"
3737
#include "arrow/util/logging_internal.h"
3838
#include "arrow/util/range.h"
39+
#include "arrow/util/thread_pool.h"
3940
#include "arrow/util/tracing_internal.h"
4041
#include "parquet/arrow/reader.h"
4142
#include "parquet/arrow/schema.h"
@@ -642,10 +643,12 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
642643
kParquetTypeName, options.get(), default_fragment_scan_options));
643644
int batch_readahead = options->batch_readahead;
644645
int64_t rows_to_readahead = batch_readahead * options->batch_size;
645-
ARROW_ASSIGN_OR_RAISE(auto generator,
646-
reader->GetRecordBatchGenerator(
647-
reader, row_groups, column_projection,
648-
::arrow::internal::GetCpuThreadPool(), rows_to_readahead));
646+
// Use the executor from scan options if provided.
647+
auto cpu_executor = options->cpu_executor ? options->cpu_executor
648+
: ::arrow::internal::GetCpuThreadPool();
649+
ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
650+
reader, row_groups, column_projection,
651+
cpu_executor, rows_to_readahead));
649652
RecordBatchGenerator sliced =
650653
SlicingGenerator(std::move(generator), options->batch_size);
651654
if (batch_readahead == 0) {

cpp/src/arrow/dataset/file_parquet_test.cc

Lines changed: 84 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "arrow/dataset/file_parquet.h"
1919

20+
#include <functional>
2021
#include <memory>
2122
#include <thread>
2223
#include <utility>
@@ -25,6 +26,7 @@
2526
#include "arrow/compute/api_scalar.h"
2627
#include "arrow/dataset/dataset_internal.h"
2728
#include "arrow/dataset/parquet_encryption_config.h"
29+
#include "arrow/dataset/scanner.h"
2830
#include "arrow/dataset/test_util_internal.h"
2931
#include "arrow/io/interfaces.h"
3032
#include "arrow/io/memory.h"
@@ -133,6 +135,29 @@ class ParquetFormatHelper {
133135
}
134136
};
135137

138+
class DelayedBufferReader : public ::arrow::io::BufferReader {
139+
public:
140+
explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
141+
: ::arrow::io::BufferReader(buffer) {}
142+
143+
::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
144+
const ::arrow::io::IOContext& io_context, int64_t position,
145+
int64_t nbytes) override {
146+
read_async_count.fetch_add(1);
147+
auto self = std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
148+
return DeferNotOk(::arrow::io::internal::SubmitIO(
149+
io_context, [self, position, nbytes]() -> Result<std::shared_ptr<Buffer>> {
150+
std::this_thread::sleep_for(std::chrono::seconds(1));
151+
return self->DoReadAt(position, nbytes);
152+
}));
153+
}
154+
155+
std::atomic<int> read_async_count{0};
156+
};
157+
158+
using CustomizeScanOptionsWithThreadPool =
159+
std::function<void(ScanOptions&, arrow::internal::ThreadPool*)>;
160+
136161
class TestParquetFileFormat : public FileFormatFixtureMixin<ParquetFormatHelper> {
137162
public:
138163
RecordBatchIterator Batches(Fragment* fragment) {
@@ -183,6 +208,51 @@ class TestParquetFileFormat : public FileFormatFixtureMixin<ParquetFormatHelper>
183208
EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), expected + 1);
184209
}
185210
}
211+
212+
void TestMultithreadedRegression(CustomizeScanOptionsWithThreadPool customizer) {
213+
auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100);
214+
ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));
215+
216+
std::vector<Future<>> completes;
217+
std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;
218+
219+
for (int idx = 0; idx < 2; ++idx) {
220+
auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
221+
auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
222+
auto fragment = MakeFragment(*source);
223+
std::shared_ptr<Scanner> scanner;
224+
225+
{
226+
auto options = std::make_shared<ScanOptions>();
227+
ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1));
228+
pools.emplace_back(thread_pool);
229+
customizer(*options, pools.back().get());
230+
auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
231+
fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
232+
233+
options->fragment_scan_options = fragment_scan_options;
234+
ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options);
235+
236+
ASSERT_OK(builder.UseThreads(true));
237+
ASSERT_OK(builder.BatchSize(10000));
238+
ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
239+
}
240+
241+
ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
242+
[[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
243+
// Random ReadAsync calls, generate some futures to make the state machine
244+
// more complex.
245+
for (int yy = 0; yy < 16; yy++) {
246+
completes.emplace_back(
247+
buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001));
248+
}
249+
scanner = nullptr;
250+
}
251+
252+
for (auto& f : completes) {
253+
f.Wait();
254+
}
255+
}
186256
};
187257

188258
TEST_F(TestParquetFileFormat, InspectFailureWithRelevantError) {
@@ -904,73 +974,25 @@ TEST(TestParquetStatistics, NoNullCount) {
904974
}
905975
}
906976

907-
class DelayedBufferReader : public ::arrow::io::BufferReader {
908-
public:
909-
explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
910-
: ::arrow::io::BufferReader(buffer) {}
911-
912-
::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
913-
const ::arrow::io::IOContext& io_context, int64_t position,
914-
int64_t nbytes) override {
915-
read_async_count.fetch_add(1);
916-
auto self = std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
917-
return DeferNotOk(::arrow::io::internal::SubmitIO(
918-
io_context, [self, position, nbytes]() -> Result<std::shared_ptr<Buffer>> {
919-
std::this_thread::sleep_for(std::chrono::seconds(1));
920-
return self->DoReadAt(position, nbytes);
921-
}));
922-
}
923-
924-
std::atomic<int> read_async_count{0};
925-
};
926-
927977
TEST_F(TestParquetFileFormat, MultithreadedScanRegression) {
928978
// GH-38438: This test is similar to MultithreadedScan, but it try to use self
929979
// designed Executor and DelayedBufferReader to mock async execution to make
930980
// the state machine more complex.
931-
auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100);
932-
933-
ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));
934-
935-
std::vector<Future<>> completes;
936-
std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;
937-
938-
for (int idx = 0; idx < 2; ++idx) {
939-
auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
940-
auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
941-
auto fragment = MakeFragment(*source);
942-
std::shared_ptr<Scanner> scanner;
943-
944-
{
945-
auto options = std::make_shared<ScanOptions>();
946-
ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1));
947-
pools.emplace_back(thread_pool);
948-
options->io_context =
949-
::arrow::io::IOContext(::arrow::default_memory_pool(), pools.back().get());
950-
auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
951-
fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
952-
953-
options->fragment_scan_options = fragment_scan_options;
954-
ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options);
955-
956-
ASSERT_OK(builder.UseThreads(true));
957-
ASSERT_OK(builder.BatchSize(10000));
958-
ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
959-
}
960-
961-
ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
962-
[[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
963-
// Random ReadAsync calls, generate some futures to make the state machine
964-
// more complex.
965-
for (int yy = 0; yy < 16; yy++) {
966-
completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001));
967-
}
968-
scanner = nullptr;
969-
}
981+
CustomizeScanOptionsWithThreadPool customize_io_context =
982+
[](ScanOptions& options, arrow::internal::ThreadPool* pool) {
983+
options.io_context = ::arrow::io::IOContext(::arrow::default_memory_pool(), pool);
984+
};
985+
TestMultithreadedRegression(customize_io_context);
986+
}
970987

971-
for (auto& f : completes) {
972-
f.Wait();
973-
}
988+
TEST_F(TestParquetFileFormat, MultithreadedComputeRegression) {
989+
// GH-43694: Test similar situation as MultithreadedScanRegression but with
990+
// the customized CPU executor instead
991+
CustomizeScanOptionsWithThreadPool customize_cpu_executor =
992+
[](ScanOptions& options, arrow::internal::ThreadPool* pool) {
993+
options.cpu_executor = pool;
994+
};
995+
TestMultithreadedRegression(customize_cpu_executor);
974996
}
975997

976998
} // namespace dataset

cpp/src/arrow/dataset/scanner.cc

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,9 @@ class OneShotFragment : public Fragment {
360360
ARROW_ASSIGN_OR_RAISE(
361361
auto background_gen,
362362
MakeBackgroundGenerator(std::move(batch_it_), options->io_context.executor()));
363-
return MakeTransferredGenerator(std::move(background_gen),
364-
::arrow::internal::GetCpuThreadPool());
363+
auto cpu_executor = options->cpu_executor ? options->cpu_executor
364+
: ::arrow::internal::GetCpuThreadPool();
365+
return MakeTransferredGenerator(std::move(background_gen), cpu_executor);
365366
}
366367
std::string type_name() const override { return "one-shot"; }
367368

@@ -387,15 +388,15 @@ Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
387388
[this](::arrow::internal::Executor* executor) {
388389
return ScanBatchesAsync(executor);
389390
},
390-
scan_options_->use_threads);
391+
scan_options_->use_threads, scan_options_->cpu_executor);
391392
}
392393

393394
Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
394395
return ::arrow::internal::IterateSynchronously<EnumeratedRecordBatch>(
395396
[this](::arrow::internal::Executor* executor) {
396397
return ScanBatchesUnorderedAsync(executor);
397398
},
398-
scan_options_->use_threads);
399+
scan_options_->use_threads, scan_options_->cpu_executor);
399400
}
400401

401402
Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
@@ -405,7 +406,9 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
405406
}
406407

407408
Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
408-
return ScanBatchesUnorderedAsync(::arrow::internal::GetCpuThreadPool(),
409+
return ScanBatchesUnorderedAsync(scan_options_->cpu_executor
410+
? scan_options_->cpu_executor
411+
: ::arrow::internal::GetCpuThreadPool(),
409412
/*sequence_fragments=*/false);
410413
}
411414

@@ -606,7 +609,9 @@ Result<std::shared_ptr<Table>> AsyncScanner::Head(int64_t num_rows) {
606609
}
607610

608611
Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync() {
609-
return ScanBatchesAsync(::arrow::internal::GetCpuThreadPool());
612+
return ScanBatchesAsync(scan_options_->cpu_executor
613+
? scan_options_->cpu_executor
614+
: ::arrow::internal::GetCpuThreadPool());
610615
}
611616

612617
Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync(
@@ -783,7 +788,9 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
783788
}
784789

785790
Future<int64_t> AsyncScanner::CountRowsAsync() {
786-
return CountRowsAsync(::arrow::internal::GetCpuThreadPool());
791+
return CountRowsAsync(scan_options_->cpu_executor
792+
? scan_options_->cpu_executor
793+
: ::arrow::internal::GetCpuThreadPool());
787794
}
788795

789796
Result<int64_t> AsyncScanner::CountRows() {

cpp/src/arrow/dataset/scanner.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "arrow/type_fwd.h"
3636
#include "arrow/util/async_generator_fwd.h"
3737
#include "arrow/util/iterator.h"
38+
#include "arrow/util/thread_pool.h"
3839
#include "arrow/util/type_fwd.h"
3940

4041
namespace arrow {
@@ -104,6 +105,13 @@ struct ARROW_DS_EXPORT ScanOptions {
104105
/// Note: The IOContext executor will be ignored if use_threads is set to false
105106
io::IOContext io_context;
106107

108+
/// Executor for any CPU tasks
109+
///
110+
/// If null, the global CPU executor will be used
111+
///
112+
/// Note: The Executor will be ignored if use_threads is set to false
113+
arrow::internal::Executor* cpu_executor = NULLPTR;
114+
107115
/// If true the scanner will scan in parallel
108116
///
109117
/// Note: If true, this will use threads from both the cpu_executor and the

cpp/src/arrow/util/thread_pool.h

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -593,9 +593,11 @@ typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
593593
}
594594

595595
/// \brief Potentially iterate an async generator serially (if use_threads is false)
596+
/// using a potentially custom Executor
596597
/// \see IterateGenerator
597598
///
598-
/// If `use_threads` is true, the global CPU executor will be used. Each call to
599+
/// If `use_threads` is true, the custom executor or, if null,
600+
/// the global CPU executor will be used. Each call to
599601
/// the iterator will simply wait until the next item is available. Tasks may run in
600602
/// the background between calls.
601603
///
@@ -605,9 +607,11 @@ typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
605607
/// calls.
606608
template <typename T>
607609
Iterator<T> IterateSynchronously(
608-
FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads) {
610+
FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads,
611+
Executor* executor) {
609612
if (use_threads) {
610-
auto maybe_gen = std::move(get_gen)(GetCpuThreadPool());
613+
auto used_executor = executor != NULLPTR ? executor : GetCpuThreadPool();
614+
auto maybe_gen = std::move(get_gen)(used_executor);
611615
if (!maybe_gen.ok()) {
612616
return MakeErrorIterator<T>(maybe_gen.status());
613617
}
@@ -617,5 +621,23 @@ Iterator<T> IterateSynchronously(
617621
}
618622
}
619623

624+
/// \brief Potentially iterate an async generator serially (if use_threads is false)
625+
/// using the default CPU thread pool
626+
/// \see IterateGenerator
627+
///
628+
/// If `use_threads` is true, the global CPU executor will be used. Each call to
629+
/// the iterator will simply wait until the next item is available. Tasks may run in
630+
/// the background between calls.
631+
///
632+
/// If `use_threads` is false, the calling thread only will be used. Each call to
633+
/// the iterator will use the calling thread to do enough work to generate one item.
634+
/// Tasks will be left in a queue until the next call and no work will be done between
635+
/// calls.
636+
template <typename T>
637+
Iterator<T> IterateSynchronously(
638+
FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads) {
639+
return IterateSynchronously(std::move(get_gen), use_threads, NULLPTR);
640+
}
641+
620642
} // namespace internal
621643
} // namespace arrow

0 commit comments

Comments
 (0)