1
1
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
2
- index 1f8b6cc488..6ac6642fdb 100644
2
+ index e6ac389..60561d2 100644
3
3
--- a/cpp/src/arrow/dataset/file_parquet.cc
4
4
+++ b/cpp/src/arrow/dataset/file_parquet.cc
5
5
@@ -26,16 +26,23 @@
6
-
6
+
7
7
#include "arrow/compute/cast.h"
8
8
#include "arrow/compute/exec.h"
9
9
+ #include "arrow/dataset/dataset.h"
@@ -26,10 +26,10 @@ index 1f8b6cc488..6ac6642fdb 100644
26
26
#include "arrow/util/tracing_internal.h"
27
27
#include "parquet/arrow/reader.h"
28
28
#include "parquet/arrow/schema.h"
29
- @@ -555 ,6 +562 ,68 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
29
+ @@ -558 ,6 +565 ,68 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
30
30
});
31
31
}
32
-
32
+
33
33
+ struct CastingGenerator {
34
34
+ CastingGenerator(RecordBatchGenerator source, std::shared_ptr<Schema> final_schema,
35
35
+ const std::unordered_set<std::string>& cols_to_skip,
@@ -95,7 +95,7 @@ index 1f8b6cc488..6ac6642fdb 100644
95
95
struct SlicingGenerator {
96
96
SlicingGenerator(RecordBatchGenerator source, int64_t batch_size)
97
97
: state(std::make_shared<State>(source, batch_size)) {}
98
- @@ -617 ,6 +686 ,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
98
+ @@ -620 ,6 +689 ,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
99
99
[this, options, parquet_fragment, pre_filtered,
100
100
row_groups](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
101
101
-> Result<RecordBatchGenerator> {
@@ -105,7 +105,7 @@ index 1f8b6cc488..6ac6642fdb 100644
105
105
// Ensure that parquet_fragment has FileMetaData
106
106
RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get()));
107
107
if (!pre_filtered) {
108
- @@ -633 ,12 +705 ,24 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
108
+ @@ -636 ,12 +708 ,24 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
109
109
kParquetTypeName, options.get(), default_fragment_scan_options));
110
110
int batch_readahead = options->batch_readahead;
111
111
int64_t rows_to_readahead = batch_readahead * options->batch_size;
@@ -136,10 +136,10 @@ index 1f8b6cc488..6ac6642fdb 100644
136
136
return sliced;
137
137
}
138
138
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
139
- index a856a792a2..5c10dfc6ac 100644
139
+ index 8fa45ac..7cd0b73 100644
140
140
--- a/cpp/src/arrow/dataset/scanner.cc
141
141
+++ b/cpp/src/arrow/dataset/scanner.cc
142
- @@ -355 ,8 +355 ,10 @@ class OneShotFragment : public Fragment {
142
+ @@ -360 ,8 +360 ,10 @@ class OneShotFragment : public Fragment {
143
143
ARROW_ASSIGN_OR_RAISE(
144
144
auto background_gen,
145
145
MakeBackgroundGenerator(std::move(batch_it_), options->io_context.executor()));
@@ -151,60 +151,68 @@ index a856a792a2..5c10dfc6ac 100644
151
151
+ return MakeTransferredGenerator(std::move(background_gen), cpu_executor);
152
152
}
153
153
std::string type_name() const override { return "one-shot"; }
154
-
155
- @@ -382 ,7 +384 ,7 @@ Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
154
+
155
+ @@ -387 ,7 +389 ,7 @@ Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
156
156
[this](::arrow::internal::Executor* executor) {
157
157
return ScanBatchesAsync(executor);
158
158
},
159
159
- scan_options_->use_threads);
160
160
+ scan_options_->use_threads, this->async_cpu_executor());
161
161
}
162
-
162
+
163
163
Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
164
- @@ -390 ,7 +392 ,7 @@ Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
164
+ @@ -395 ,7 +397 ,7 @@ Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
165
165
[this](::arrow::internal::Executor* executor) {
166
166
return ScanBatchesUnorderedAsync(executor);
167
167
},
168
168
- scan_options_->use_threads);
169
169
+ scan_options_->use_threads, this->async_cpu_executor());
170
170
}
171
-
171
+
172
172
Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
173
- @@ -400 ,7 +402 ,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
173
+ @@ -405 ,7 +407 ,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
174
174
}
175
-
175
+
176
176
Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
177
177
- return ScanBatchesUnorderedAsync(::arrow::internal::GetCpuThreadPool(),
178
178
+ return ScanBatchesUnorderedAsync(this->async_cpu_executor(),
179
179
/*sequence_fragments=*/false);
180
180
}
181
-
182
- @@ -601 ,7 +603 ,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::Head(int64_t num_rows) {
181
+
182
+ @@ -606 ,7 +608 ,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::Head(int64_t num_rows) {
183
183
}
184
-
184
+
185
185
Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync() {
186
186
- return ScanBatchesAsync(::arrow::internal::GetCpuThreadPool());
187
187
+ return ScanBatchesAsync(this->async_cpu_executor());
188
188
}
189
-
189
+
190
190
Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync(
191
- @@ -778 ,7 +780 ,7 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
191
+ @@ -783 ,7 +785 ,7 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
192
192
}
193
-
193
+
194
194
Future<int64_t> AsyncScanner::CountRowsAsync() {
195
195
- return CountRowsAsync(::arrow::internal::GetCpuThreadPool());
196
196
+ return CountRowsAsync(this->async_cpu_executor());
197
197
}
198
-
198
+
199
199
Result<int64_t> AsyncScanner::CountRows() {
200
200
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
201
- index d2de267897..1c605c1bf2 100644
201
+ index 5031057..c867ece 100644
202
202
--- a/cpp/src/arrow/dataset/scanner.h
203
203
+++ b/cpp/src/arrow/dataset/scanner.h
204
- @@ -107,6 +107,11 @@ struct ARROW_DS_EXPORT ScanOptions {
204
+ @@ -35,6 +35,7 @@
205
+ #include "arrow/type_fwd.h"
206
+ #include "arrow/util/async_generator_fwd.h"
207
+ #include "arrow/util/iterator.h"
208
+ + #include "arrow/util/thread_pool.h"
209
+ #include "arrow/util/type_fwd.h"
210
+
211
+ namespace arrow {
212
+ @@ -104,6 +105,11 @@ struct ARROW_DS_EXPORT ScanOptions {
205
213
/// Note: The IOContext executor will be ignored if use_threads is set to false
206
214
io::IOContext io_context;
207
-
215
+
208
216
+ /// ExecContext for any CPU tasks
209
217
+ ///
210
218
+ /// Note: The ExecContext executor will be ignored if use_threads is set to false
@@ -213,20 +221,20 @@ index d2de267897..1c605c1bf2 100644
213
221
/// If true the scanner will scan in parallel
214
222
///
215
223
/// Note: If true, this will use threads from both the cpu_executor and the
216
- @@ -442 ,6 +447 ,11 @@ class ARROW_DS_EXPORT Scanner {
224
+ @@ -459 ,6 +465 ,11 @@ class ARROW_DS_EXPORT Scanner {
217
225
TaggedRecordBatchIterator scan);
218
-
226
+
219
227
const std::shared_ptr<ScanOptions> scan_options_;
220
228
+
221
229
+ ::arrow::internal::Executor* async_cpu_executor() const {
222
230
+ return scan_options_->exec_context.executor() ? scan_options_->exec_context.executor()
223
231
+ : ::arrow::internal::GetCpuThreadPool();
224
232
+ }
225
233
};
226
-
234
+
227
235
/// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
228
236
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
229
- index 44b1e227b0..218edc60ca 100644
237
+ index cd32781..a8935d7 100644
230
238
--- a/cpp/src/arrow/util/thread_pool.h
231
239
+++ b/cpp/src/arrow/util/thread_pool.h
232
240
@@ -20,6 +20,7 @@
@@ -240,7 +248,7 @@ index 44b1e227b0..218edc60ca 100644
240
248
@@ -591,6 +592,21 @@ typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
241
249
}
242
250
}
243
-
251
+
244
252
+ template <typename T>
245
253
+ Iterator<T> IterateSynchronously(
246
254
+ FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads,
@@ -274,5 +282,5 @@ index 44b1e227b0..218edc60ca 100644
274
282
- }
275
283
+ return IterateSynchronously(std::move(get_gen), use_threads, GetCpuThreadPool());
276
284
}
277
-
285
+
278
286
} // namespace internal
0 commit comments