diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
- index 1f8b6cc488..322d50e598 100644
+ index 1f8b6cc488..586a5122a5 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -26,16 +26,23 @@
@@ -26,32 +26,40 @@ index 1f8b6cc488..322d50e598 100644
#include "arrow/util/tracing_internal.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
- @@ -555,6 +562,59 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
+ @@ -555,6 +562,68 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
      });
}

+ struct CastingGenerator {
+   CastingGenerator(RecordBatchGenerator source, std::shared_ptr<Schema> final_schema,
- +                    arrow::MemoryPool* pool = arrow::default_memory_pool())
+ +                    const std::unordered_set<std::string>& cols_to_skip,
+ +                    MemoryPool* pool = default_memory_pool())
+       : source_(source),
+         final_schema_(final_schema),
+ +         cols_to_skip_(cols_to_skip),
+         exec_ctx(std::make_shared<compute::ExecContext>(pool)) {}
+
+   Future<std::shared_ptr<RecordBatch>> operator()() {
+     return this->source_().Then([this](const std::shared_ptr<RecordBatch>& next)
+                                     -> Result<std::shared_ptr<RecordBatch>> {
- +       if (IsIterationEnd(next) || this->final_schema_.get() == nullptr) {
+ +       if (IsIterationEnd(next) || this->final_schema_ == nullptr) {
+         return next;
+       }
- +       std::vector<std::shared_ptr<::arrow::Array>> out_cols;
- +       std::vector<std::shared_ptr<arrow::Field>> out_schema_fields;
+ +       std::vector<std::shared_ptr<Array>> out_cols;
+ +       std::vector<std::shared_ptr<Field>> out_schema_fields;
+
+       bool changed = false;
+       for (const auto& field : this->final_schema_->fields()) {
+         FieldRef field_ref = FieldRef(field->name());
+         ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> column,
+                               field_ref.GetOneOrNone(*next));
+         if (column) {
+ +           if (this->cols_to_skip_.count(field->name())) {
+ +             out_cols.emplace_back(std::move(column));
+ +             // Maintain the original input type.
+ +             out_schema_fields.emplace_back(field->WithType(column->type()));
+ +             continue;
+ +           }
+           if (!column->type()->Equals(field->type())) {
+             // Referenced field was present but didn't have the expected type.
+             ARROW_ASSIGN_OR_RAISE(
@@ -80,13 +88,14 @@ index 1f8b6cc488..322d50e598 100644
+
+   RecordBatchGenerator source_;
+   std::shared_ptr<Schema> final_schema_;
+ +   const std::unordered_set<std::string>& cols_to_skip_;
+   std::shared_ptr<compute::ExecContext> exec_ctx;
+ };
+
struct SlicingGenerator {
  SlicingGenerator(RecordBatchGenerator source, int64_t batch_size)
      : state(std::make_shared<State>(source, batch_size)) {}
- @@ -617,6 +677,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
+ @@ -617,6 +686,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
      [this, options, parquet_fragment, pre_filtered,
       row_groups](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
      -> Result<RecordBatchGenerator> {
@@ -96,7 +105,7 @@ index 1f8b6cc488..322d50e598 100644
    // Ensure that parquet_fragment has FileMetaData
    RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get()));
    if (!pre_filtered) {
- @@ -633,12 +696,19 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
+ @@ -633,12 +705,24 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
            kParquetTypeName, options.get(), default_fragment_scan_options));
    int batch_readahead = options->batch_readahead;
    int64_t rows_to_readahead = batch_readahead * options->batch_size;
@@ -113,8 +122,13 @@ index 1f8b6cc488..322d50e598 100644
+     ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
+                                               reader, row_groups, column_projection,
+                                               cpu_executor, rows_to_readahead));
- +     RecordBatchGenerator casted =
- +         CastingGenerator(std::move(generator), options->dataset_schema, options->pool);
+ +     // We need to skip casting the dictionary columns since the dataset_schema doesn't
+ +     // have the dictionary-encoding information. Parquet reader will return them with the
+ +     // dictionary type, which is what we eventually want.
+ +     const std::unordered_set<std::string>& dict_cols =
+ +         parquet_fragment->parquet_format_.reader_options.dict_columns;
+ +     RecordBatchGenerator casted = CastingGenerator(
+ +         std::move(generator), options->dataset_schema, dict_cols, options->pool);
    RecordBatchGenerator sliced =
-         SlicingGenerator(std::move(generator), options->batch_size);
+         SlicingGenerator(std::move(casted), options->batch_size);
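For context: this revision changes the patch so that CastingGenerator, which casts each decoded batch to options->dataset_schema, now takes a set of column names to leave untouched, and the ScanBatchesAsync call site passes reader_options.dict_columns so dictionary-encoded columns keep the type the Parquet reader produced. What follows is a rough, self-contained sketch of that per-batch step using public Arrow C++ compute APIs; it is illustrative only. CastBatchToSchema is a hypothetical helper name, the sketch is synchronous where the real generator is asynchronous, and it looks columns up with GetColumnByName where the patch uses FieldRef::GetOneOrNone.

#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

#include "arrow/api.h"
#include "arrow/compute/api.h"

// Sketch only: align one decoded batch with `final_schema`, casting any
// column whose type differs from the schema's, except the names in `skip`
// (the dict_columns), which keep the type the reader produced.
arrow::Result<std::shared_ptr<arrow::RecordBatch>> CastBatchToSchema(
    const std::shared_ptr<arrow::RecordBatch>& batch,
    const std::shared_ptr<arrow::Schema>& final_schema,
    const std::unordered_set<std::string>& skip,
    arrow::compute::ExecContext* exec_ctx) {
  std::vector<std::shared_ptr<arrow::Array>> out_cols;
  std::vector<std::shared_ptr<arrow::Field>> out_fields;
  for (const auto& field : final_schema->fields()) {
    std::shared_ptr<arrow::Array> column = batch->GetColumnByName(field->name());
    if (column == nullptr) continue;  // field not materialized in this batch
    if (skip.count(field->name())) {
      // Keep the reader's type (e.g. dictionary-encoded) for skipped columns.
      out_fields.push_back(field->WithType(column->type()));
      out_cols.push_back(std::move(column));
      continue;
    }
    if (!column->type()->Equals(field->type())) {
      // Cast the column to the type the dataset schema expects.
      ARROW_ASSIGN_OR_RAISE(
          arrow::Datum casted,
          arrow::compute::Cast(column, field->type(),
                               arrow::compute::CastOptions::Safe(), exec_ctx));
      column = casted.make_array();
    }
    out_fields.push_back(field);
    out_cols.push_back(std::move(column));
  }
  return arrow::RecordBatch::Make(arrow::schema(std::move(out_fields)),
                                  batch->num_rows(), std::move(out_cols));
}

In the patch itself the skip set is wired from parquet_fragment->parquet_format_.reader_options.dict_columns, and the casting generator is composed ahead of SlicingGenerator, so slicing operates on batches that already match the dataset schema.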