@@ -155,9 +155,8 @@ class DelayedBufferReader : public ::arrow::io::BufferReader {
155155 std::atomic<int > read_async_count{0 };
156156};
157157
158- using OptionsCustomizer =
159- std::function<void (std::shared_ptr<ScanOptions>&,
160- std::vector<std::shared_ptr<arrow::internal::ThreadPool>>&)>;
158+ using CustomizeScanOptionsWithThreadPool =
159+ std::function<void (ScanOptions&, arrow::internal::ThreadPool*)>;
161160
162161class TestParquetFileFormat : public FileFormatFixtureMixin <ParquetFormatHelper> {
163162 public:
@@ -210,11 +209,7 @@ class TestParquetFileFormat : public FileFormatFixtureMixin<ParquetFormatHelper>
210209 }
211210 }
212211
213- void TestMultithreadedRegression (OptionsCustomizer customizer) {
214- // GH-38438: This test is similar to MultithreadedScan, but it try to use self
215- // designed Executor and DelayedBufferReader to mock async execution to make
216- // the state machine more complex.
217-
212+ void TestMultithreadedRegression (CustomizeScanOptionsWithThreadPool customizer) {
218213 auto reader = MakeGeneratedRecordBatch (schema ({field (" utf8" , utf8 ())}), 10000 , 100 );
219214 ASSERT_OK_AND_ASSIGN (auto buffer, ParquetFormatHelper::Write (reader.get ()));
220215
@@ -231,7 +226,7 @@ class TestParquetFileFormat : public FileFormatFixtureMixin<ParquetFormatHelper>
231226 auto options = std::make_shared<ScanOptions>();
232227 ASSERT_OK_AND_ASSIGN (auto thread_pool, arrow::internal::ThreadPool::Make (1 ));
233228 pools.emplace_back (thread_pool);
234- customizer (options, pools);
229+ customizer (* options, pools. back (). get () );
235230 auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
236231 fragment_scan_options->arrow_reader_properties ->set_pre_buffer (true );
237232
@@ -980,22 +975,22 @@ TEST(TestParquetStatistics, NoNullCount) {
980975}
981976
982977TEST_F (TestParquetFileFormat, MultithreadedScanRegression) {
983- OptionsCustomizer customize_io_context =
984- [](std::shared_ptr<ScanOptions>& options,
985- std::vector<std::shared_ptr<arrow::internal::ThreadPool>>& pools) {
986- options->io_context =
987- ::arrow::io::IOContext (::arrow::default_memory_pool(), pools.back().get());
978+ // GH-38438: This test is similar to MultithreadedScan, but it try to use self
979+ // designed Executor and DelayedBufferReader to mock async execution to make
980+ // the state machine more complex.
981+ CustomizeScanOptionsWithThreadPool customize_io_context =
982+ [](ScanOptions& options, arrow::internal::ThreadPool* pool) {
983+ options.io_context = ::arrow::io::IOContext (::arrow::default_memory_pool (), pool);
988984 };
989985 TestMultithreadedRegression (customize_io_context);
990986}
991987
992988TEST_F (TestParquetFileFormat, MultithreadedComputeRegression) {
993989 // GH-43694: Test similar situation as MultithreadedScanRegression but with
994990 // the customized CPU executor instead
995- OptionsCustomizer customize_cpu_executor =
996- [](std::shared_ptr<ScanOptions>& options,
997- std::vector<std::shared_ptr<arrow::internal::ThreadPool>>& pools) {
998- options->cpu_executor = pools.back ().get ();
991+ CustomizeScanOptionsWithThreadPool customize_cpu_executor =
992+ [](ScanOptions& options, arrow::internal::ThreadPool* pool) {
993+ options.cpu_executor = pool;
999994 };
1000995 TestMultithreadedRegression (customize_cpu_executor);
1001996}
0 commit comments