|
17 | 17 |
|
18 | 18 | #include "arrow/dataset/file_parquet.h" |
19 | 19 |
|
| 20 | +#include <functional> |
20 | 21 | #include <memory> |
21 | 22 | #include <thread> |
22 | 23 | #include <utility> |
|
25 | 26 | #include "arrow/compute/api_scalar.h" |
26 | 27 | #include "arrow/dataset/dataset_internal.h" |
27 | 28 | #include "arrow/dataset/parquet_encryption_config.h" |
| 29 | +#include "arrow/dataset/scanner.h" |
28 | 30 | #include "arrow/dataset/test_util_internal.h" |
29 | 31 | #include "arrow/io/interfaces.h" |
30 | 32 | #include "arrow/io/memory.h" |
@@ -133,6 +135,30 @@ class ParquetFormatHelper { |
133 | 135 | } |
134 | 136 | }; |
135 | 137 |
|
| 138 | +class DelayedBufferReader : public ::arrow::io::BufferReader { /* BufferReader whose ReadAsync is artificially slow, for async tests */ |
| 139 | + public: |
| 140 | + explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer) |
| 141 | + : ::arrow::io::BufferReader(buffer) {} |
| 142 | + /* ReadAsync: increments read_async_count, then submits a task through SubmitIO on io_context; the task sleeps for one second before calling DoReadAt(position, nbytes). The lambda captures a shared_ptr to this reader. */ |
| 143 | + ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync( |
| 144 | + const ::arrow::io::IOContext& io_context, int64_t position, |
| 145 | + int64_t nbytes) override { |
| 146 | + read_async_count.fetch_add(1); |
| 147 | + auto self = std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this()); |
| 148 | + return DeferNotOk(::arrow::io::internal::SubmitIO( |
| 149 | + io_context, [self, position, nbytes]() -> Result<std::shared_ptr<Buffer>> { |
| 150 | + std::this_thread::sleep_for(std::chrono::seconds(1)); |
| 151 | + return self->DoReadAt(position, nbytes); |
| 152 | + })); |
| 153 | + } |
| 154 | + /* Number of ReadAsync calls made on this reader. */ |
| 155 | + std::atomic<int> read_async_count{0}; |
| 156 | +}; |
| 157 | + /* Callback that lets a test adjust the ScanOptions of one scanner; it also receives the thread pools created so far, with the newest one last. */ |
| 158 | +using OptionsCustomizer = |
| 159 | + std::function<void(std::shared_ptr<ScanOptions>&, |
| 160 | + std::vector<std::shared_ptr<arrow::internal::ThreadPool>>&)>; |
| 161 | + |
136 | 162 | class TestParquetFileFormat : public FileFormatFixtureMixin<ParquetFormatHelper> { |
137 | 163 | public: |
138 | 164 | RecordBatchIterator Batches(Fragment* fragment) { |
@@ -183,6 +209,55 @@ class TestParquetFileFormat : public FileFormatFixtureMixin<ParquetFormatHelper> |
183 | 209 | EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), expected + 1); |
184 | 210 | } |
185 | 211 | } |
| 212 | + |
| 213 | + void TestMultithreadedRegression(OptionsCustomizer customizer) { |
| 214 | + // GH-38438: This test is similar to MultithreadedScan, but it uses a custom |
| 215 | + // thread pool (which `customizer` is expected to wire into the ScanOptions) |
| 216 | + // and a DelayedBufferReader to mock async execution and make the state machine more complex. |
| 217 | + |
| 218 | + auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100); |
| 219 | + ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get())); |
| 220 | + |
| 221 | + std::vector<Future<>> completes; |
| 222 | + std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools; |
| 223 | + /* Two rounds, each with its own reader, single-thread pool and scanner. */ |
| 224 | + for (int idx = 0; idx < 2; ++idx) { |
| 225 | + auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer); |
| 226 | + auto source = std::make_shared<FileSource>(buffer_reader, buffer->size()); |
| 227 | + auto fragment = MakeFragment(*source); |
| 228 | + std::shared_ptr<Scanner> scanner; |
| 229 | + /* Build a threaded scanner over the fragment with pre-buffering enabled. */ |
| 230 | + { |
| 231 | + auto options = std::make_shared<ScanOptions>(); |
| 232 | + ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1)); |
| 233 | + pools.emplace_back(thread_pool); |
| 234 | + customizer(options, pools); |
| 235 | + auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>(); |
| 236 | + fragment_scan_options->arrow_reader_properties->set_pre_buffer(true); |
| 237 | + |
| 238 | + options->fragment_scan_options = fragment_scan_options; |
| 239 | + ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options); |
| 240 | + |
| 241 | + ASSERT_OK(builder.UseThreads(true)); |
| 242 | + ASSERT_OK(builder.BatchSize(10000)); |
| 243 | + ASSERT_OK_AND_ASSIGN(scanner, builder.Finish()); |
| 244 | + } |
| 245 | + |
| 246 | + ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000)); |
| 247 | + [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync(); |
| 248 | + // Issue extra ReadAsync calls to generate more pending futures and make the |
| 249 | + // state machine more complex. |
| 250 | + for (int yy = 0; yy < 16; yy++) { |
| 251 | + completes.emplace_back( |
| 252 | + buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001)); |
| 253 | + } |
| 254 | + scanner = nullptr; |
| 255 | + } |
| 256 | + /* Block until every extra ReadAsync future has finished. */ |
| 257 | + for (auto& f : completes) { |
| 258 | + f.Wait(); |
| 259 | + } |
| 260 | + } |
186 | 261 | }; |
187 | 262 |
|
188 | 263 | TEST_F(TestParquetFileFormat, InspectFailureWithRelevantError) { |
@@ -904,120 +979,25 @@ TEST(TestParquetStatistics, NoNullCount) { |
904 | 979 | } |
905 | 980 | } |
906 | 981 |
|
907 | | -class DelayedBufferReader : public ::arrow::io::BufferReader { |
908 | | - public: |
909 | | - explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer) |
910 | | - : ::arrow::io::BufferReader(buffer) {} |
911 | | - |
912 | | - ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync( |
913 | | - const ::arrow::io::IOContext& io_context, int64_t position, |
914 | | - int64_t nbytes) override { |
915 | | - read_async_count.fetch_add(1); |
916 | | - auto self = std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this()); |
917 | | - return DeferNotOk(::arrow::io::internal::SubmitIO( |
918 | | - io_context, [self, position, nbytes]() -> Result<std::shared_ptr<Buffer>> { |
919 | | - std::this_thread::sleep_for(std::chrono::seconds(1)); |
920 | | - return self->DoReadAt(position, nbytes); |
921 | | - })); |
922 | | - } |
923 | | - |
924 | | - std::atomic<int> read_async_count{0}; |
925 | | -}; |
926 | | - |
927 | 982 | TEST_F(TestParquetFileFormat, MultithreadedScanRegression) { |
928 | | - // GH-38438: This test is similar to MultithreadedScan, but it try to use self |
929 | | - // designed Executor and DelayedBufferReader to mock async execution to make |
930 | | - // the state machine more complex. |
931 | | - auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100); |
932 | | - |
933 | | - ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get())); |
934 | | - |
935 | | - std::vector<Future<>> completes; |
936 | | - std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools; |
937 | | - |
938 | | - for (int idx = 0; idx < 2; ++idx) { |
939 | | - auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer); |
940 | | - auto source = std::make_shared<FileSource>(buffer_reader, buffer->size()); |
941 | | - auto fragment = MakeFragment(*source); |
942 | | - std::shared_ptr<Scanner> scanner; |
943 | | - |
944 | | - { |
945 | | - auto options = std::make_shared<ScanOptions>(); |
946 | | - ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1)); |
947 | | - pools.emplace_back(thread_pool); |
948 | | - options->io_context = |
949 | | - ::arrow::io::IOContext(::arrow::default_memory_pool(), pools.back().get()); |
950 | | - auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>(); |
951 | | - fragment_scan_options->arrow_reader_properties->set_pre_buffer(true); |
952 | | - |
953 | | - options->fragment_scan_options = fragment_scan_options; |
954 | | - ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options); |
955 | | - |
956 | | - ASSERT_OK(builder.UseThreads(true)); |
957 | | - ASSERT_OK(builder.BatchSize(10000)); |
958 | | - ASSERT_OK_AND_ASSIGN(scanner, builder.Finish()); |
959 | | - } |
960 | | - |
961 | | - ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000)); |
962 | | - [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync(); |
963 | | - // Random ReadAsync calls, generate some futures to make the state machine |
964 | | - // more complex. |
965 | | - for (int yy = 0; yy < 16; yy++) { |
966 | | - completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001)); |
967 | | - } |
968 | | - scanner = nullptr; |
969 | | - } |
970 | | - |
971 | | - for (auto& f : completes) { |
972 | | - f.Wait(); |
973 | | - } |
| 983 | + OptionsCustomizer customize_io_context = |
| 984 | + [](std::shared_ptr<ScanOptions>& options, |
| 985 | + std::vector<std::shared_ptr<arrow::internal::ThreadPool>>& pools) { |
| 986 | + options->io_context = |
| 987 | + ::arrow::io::IOContext(::arrow::default_memory_pool(), pools.back().get()); |
| 988 | + }; |
| 989 | + TestMultithreadedRegression(customize_io_context); /* GH-38438: each scanner's IOContext is given the newest thread pool as its executor */ |
974 | 990 | } |
975 | 991 |
|
976 | 992 | TEST_F(TestParquetFileFormat, MultithreadedComputeRegression) { |
977 | 993 | // GH-43694: Test similar situation as MultithreadedScanRegression but with |
978 | | - // the exec context instead |
979 | | - |
980 | | - auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100); |
981 | | - ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get())); |
982 | | - |
983 | | - std::vector<Future<>> completes; |
984 | | - std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools; |
985 | | - |
986 | | - for (int idx = 0; idx < 2; ++idx) { |
987 | | - auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer); |
988 | | - auto source = std::make_shared<FileSource>(buffer_reader, buffer->size()); |
989 | | - auto fragment = MakeFragment(*source); |
990 | | - std::shared_ptr<Scanner> scanner; |
991 | | - |
992 | | - { |
993 | | - auto options = std::make_shared<ScanOptions>(); |
994 | | - ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1)); |
995 | | - pools.emplace_back(thread_pool); |
996 | | - options->cpu_executor = pools.back().get(); |
997 | | - auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>(); |
998 | | - fragment_scan_options->arrow_reader_properties->set_pre_buffer(true); |
999 | | - |
1000 | | - options->fragment_scan_options = fragment_scan_options; |
1001 | | - ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options); |
1002 | | - |
1003 | | - ASSERT_OK(builder.UseThreads(true)); |
1004 | | - ASSERT_OK(builder.BatchSize(10000)); |
1005 | | - ASSERT_OK_AND_ASSIGN(scanner, builder.Finish()); |
1006 | | - } |
1007 | | - |
1008 | | - ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000)); |
1009 | | - [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync(); |
1010 | | - // Random ReadAsync calls, generate some futures to make the state machine |
1011 | | - // more complex. |
1012 | | - for (int yy = 0; yy < 16; yy++) { |
1013 | | - completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001)); |
1014 | | - } |
1015 | | - scanner = nullptr; |
1016 | | - } |
1017 | | - |
1018 | | - for (auto& f : completes) { |
1019 | | - f.Wait(); |
1020 | | - } |
| 994 | + // a custom CPU executor (ScanOptions::cpu_executor) instead of a custom IOContext. |
| 995 | + OptionsCustomizer customize_cpu_executor = |
| 996 | + [](std::shared_ptr<ScanOptions>& options, |
| 997 | + std::vector<std::shared_ptr<arrow::internal::ThreadPool>>& pools) { |
| 998 | + options->cpu_executor = pools.back().get(); |
| 999 | + }; |
| 1000 | + TestMultithreadedRegression(customize_cpu_executor); |
1021 | 1001 | } |
1022 | 1002 |
|
1023 | 1003 | } // namespace dataset |
|
0 commit comments