Skip to content

Commit 6383b6b

Browse files
scott-routledge2IsaacWarren
authored andcommitted
Upgrade arrow 20 (#1)
Changes for arrow 20 upgrade
1 parent bf77aef commit 6383b6b

File tree

2 files changed

+42
-34
lines changed

2 files changed

+42
-34
lines changed

recipe/meta.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ source:
3333
folder: cpp/submodules/parquet-testing
3434

3535
build:
36-
number: 5
36+
number: 6
3737
# for cuda support, building with one version is enough to be compatible with
3838
# all later versions, since arrow is only using libcuda, and not libcudart.
3939
skip: true # [cuda_compiler_version not in ("None", cuda_compiler_version_min)]
@@ -70,7 +70,7 @@ requirements:
7070
- clangdev {{ llvm_version }}
7171
- llvmdev {{ llvm_version }}
7272
- aws-crt-cpp
73-
- aws-sdk-cpp <=1.11.485
73+
- aws-sdk-cpp
7474
# azure filesystem dependencies, currently broken on windows, see
7575
# https://github.com/apache/arrow/issues/41990
7676
- azure-core-cpp # [unix]
@@ -223,7 +223,7 @@ outputs:
223223
- {{ compiler("cuda") }} # [cuda_compiler_version != "None"]
224224
host:
225225
- aws-crt-cpp
226-
- aws-sdk-cpp <=1.11.485
226+
- aws-sdk-cpp
227227
- azure-core-cpp # [unix]
228228
- azure-identity-cpp # [unix]
229229
- azure-storage-blobs-cpp # [unix]

recipe/patches/0004-Bodo-Changes.patch

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
2-
index 1f8b6cc488..6ac6642fdb 100644
2+
index e6ac389..60561d2 100644
33
--- a/cpp/src/arrow/dataset/file_parquet.cc
44
+++ b/cpp/src/arrow/dataset/file_parquet.cc
55
@@ -26,16 +26,23 @@
6-
6+
77
#include "arrow/compute/cast.h"
88
#include "arrow/compute/exec.h"
99
+#include "arrow/dataset/dataset.h"
@@ -26,10 +26,10 @@ index 1f8b6cc488..6ac6642fdb 100644
2626
#include "arrow/util/tracing_internal.h"
2727
#include "parquet/arrow/reader.h"
2828
#include "parquet/arrow/schema.h"
29-
@@ -555,6 +562,68 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
29+
@@ -558,6 +565,68 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
3030
});
3131
}
32-
32+
3333
+struct CastingGenerator {
3434
+ CastingGenerator(RecordBatchGenerator source, std::shared_ptr<Schema> final_schema,
3535
+ const std::unordered_set<std::string>& cols_to_skip,
@@ -95,7 +95,7 @@ index 1f8b6cc488..6ac6642fdb 100644
9595
struct SlicingGenerator {
9696
SlicingGenerator(RecordBatchGenerator source, int64_t batch_size)
9797
: state(std::make_shared<State>(source, batch_size)) {}
98-
@@ -617,6 +686,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
98+
@@ -620,6 +689,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
9999
[this, options, parquet_fragment, pre_filtered,
100100
row_groups](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
101101
-> Result<RecordBatchGenerator> {
@@ -105,7 +105,7 @@ index 1f8b6cc488..6ac6642fdb 100644
105105
// Ensure that parquet_fragment has FileMetaData
106106
RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get()));
107107
if (!pre_filtered) {
108-
@@ -633,12 +705,24 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
108+
@@ -636,12 +708,24 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
109109
kParquetTypeName, options.get(), default_fragment_scan_options));
110110
int batch_readahead = options->batch_readahead;
111111
int64_t rows_to_readahead = batch_readahead * options->batch_size;
@@ -136,10 +136,10 @@ index 1f8b6cc488..6ac6642fdb 100644
136136
return sliced;
137137
}
138138
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
139-
index a856a792a2..5c10dfc6ac 100644
139+
index 8fa45ac..7cd0b73 100644
140140
--- a/cpp/src/arrow/dataset/scanner.cc
141141
+++ b/cpp/src/arrow/dataset/scanner.cc
142-
@@ -355,8 +355,10 @@ class OneShotFragment : public Fragment {
142+
@@ -360,8 +360,10 @@ class OneShotFragment : public Fragment {
143143
ARROW_ASSIGN_OR_RAISE(
144144
auto background_gen,
145145
MakeBackgroundGenerator(std::move(batch_it_), options->io_context.executor()));
@@ -151,60 +151,68 @@ index a856a792a2..5c10dfc6ac 100644
151151
+ return MakeTransferredGenerator(std::move(background_gen), cpu_executor);
152152
}
153153
std::string type_name() const override { return "one-shot"; }
154-
155-
@@ -382,7 +384,7 @@ Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
154+
155+
@@ -387,7 +389,7 @@ Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
156156
[this](::arrow::internal::Executor* executor) {
157157
return ScanBatchesAsync(executor);
158158
},
159159
- scan_options_->use_threads);
160160
+ scan_options_->use_threads, this->async_cpu_executor());
161161
}
162-
162+
163163
Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
164-
@@ -390,7 +392,7 @@ Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
164+
@@ -395,7 +397,7 @@ Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
165165
[this](::arrow::internal::Executor* executor) {
166166
return ScanBatchesUnorderedAsync(executor);
167167
},
168168
- scan_options_->use_threads);
169169
+ scan_options_->use_threads, this->async_cpu_executor());
170170
}
171-
171+
172172
Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
173-
@@ -400,7 +402,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
173+
@@ -405,7 +407,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
174174
}
175-
175+
176176
Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
177177
- return ScanBatchesUnorderedAsync(::arrow::internal::GetCpuThreadPool(),
178178
+ return ScanBatchesUnorderedAsync(this->async_cpu_executor(),
179179
/*sequence_fragments=*/false);
180180
}
181-
182-
@@ -601,7 +603,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::Head(int64_t num_rows) {
181+
182+
@@ -606,7 +608,7 @@ Result<std::shared_ptr<Table>> AsyncScanner::Head(int64_t num_rows) {
183183
}
184-
184+
185185
Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync() {
186186
- return ScanBatchesAsync(::arrow::internal::GetCpuThreadPool());
187187
+ return ScanBatchesAsync(this->async_cpu_executor());
188188
}
189-
189+
190190
Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync(
191-
@@ -778,7 +780,7 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
191+
@@ -783,7 +785,7 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
192192
}
193-
193+
194194
Future<int64_t> AsyncScanner::CountRowsAsync() {
195195
- return CountRowsAsync(::arrow::internal::GetCpuThreadPool());
196196
+ return CountRowsAsync(this->async_cpu_executor());
197197
}
198-
198+
199199
Result<int64_t> AsyncScanner::CountRows() {
200200
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
201-
index d2de267897..1c605c1bf2 100644
201+
index 5031057..c867ece 100644
202202
--- a/cpp/src/arrow/dataset/scanner.h
203203
+++ b/cpp/src/arrow/dataset/scanner.h
204-
@@ -107,6 +107,11 @@ struct ARROW_DS_EXPORT ScanOptions {
204+
@@ -35,6 +35,7 @@
205+
#include "arrow/type_fwd.h"
206+
#include "arrow/util/async_generator_fwd.h"
207+
#include "arrow/util/iterator.h"
208+
+#include "arrow/util/thread_pool.h"
209+
#include "arrow/util/type_fwd.h"
210+
211+
namespace arrow {
212+
@@ -104,6 +105,11 @@ struct ARROW_DS_EXPORT ScanOptions {
205213
/// Note: The IOContext executor will be ignored if use_threads is set to false
206214
io::IOContext io_context;
207-
215+
208216
+ /// ExecContext for any CPU tasks
209217
+ ///
210218
+ /// Note: The ExecContext executor will be ignored if use_threads is set to false
@@ -213,20 +221,20 @@ index d2de267897..1c605c1bf2 100644
213221
/// If true the scanner will scan in parallel
214222
///
215223
/// Note: If true, this will use threads from both the cpu_executor and the
216-
@@ -442,6 +447,11 @@ class ARROW_DS_EXPORT Scanner {
224+
@@ -459,6 +465,11 @@ class ARROW_DS_EXPORT Scanner {
217225
TaggedRecordBatchIterator scan);
218-
226+
219227
const std::shared_ptr<ScanOptions> scan_options_;
220228
+
221229
+ ::arrow::internal::Executor* async_cpu_executor() const {
222230
+ return scan_options_->exec_context.executor() ? scan_options_->exec_context.executor()
223231
+ : ::arrow::internal::GetCpuThreadPool();
224232
+ }
225233
};
226-
234+
227235
/// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
228236
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
229-
index 44b1e227b0..218edc60ca 100644
237+
index cd32781..a8935d7 100644
230238
--- a/cpp/src/arrow/util/thread_pool.h
231239
+++ b/cpp/src/arrow/util/thread_pool.h
232240
@@ -20,6 +20,7 @@
@@ -240,7 +248,7 @@ index 44b1e227b0..218edc60ca 100644
240248
@@ -591,6 +592,21 @@ typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
241249
}
242250
}
243-
251+
244252
+template <typename T>
245253
+Iterator<T> IterateSynchronously(
246254
+ FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads,
@@ -274,5 +282,5 @@ index 44b1e227b0..218edc60ca 100644
274282
- }
275283
+ return IterateSynchronously(std::move(get_gen), use_threads, GetCpuThreadPool());
276284
}
277-
285+
278286
} // namespace internal

0 commit comments

Comments
 (0)