Skip to content

Commit 687ca03

Browse files
authored
feat(parquet): project arrow array for parquet (#165)
1 parent a54f116 commit 687ca03

File tree

5 files changed

+774
-15
lines changed

5 files changed

+774
-15
lines changed

src/iceberg/parquet/parquet_data_util.cc

Lines changed: 253 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,266 @@
1717
* under the License.
1818
*/
1919

20+
#include <arrow/array.h>
21+
#include <arrow/array/builder_primitive.h>
22+
#include <arrow/compute/api.h>
23+
#include <arrow/record_batch.h>
24+
#include <arrow/type.h>
25+
26+
#include "iceberg/arrow/arrow_error_transform_internal.h"
2027
#include "iceberg/parquet/parquet_data_util_internal.h"
28+
#include "iceberg/schema.h"
29+
#include "iceberg/schema_util.h"
30+
#include "iceberg/type.h"
31+
#include "iceberg/util/checked_cast.h"
32+
#include "iceberg/util/macros.h"
2133

2234
namespace iceberg::parquet {
2335

36+
namespace {
37+
38+
// Forward declaration
39+
Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
40+
const std::shared_ptr<::arrow::Array>& array,
41+
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
42+
const NestedType& nested_type, std::span<const FieldProjection> projections,
43+
::arrow::MemoryPool* pool);
44+
45+
/// \brief Create a null array of the given type and length.
46+
Result<std::shared_ptr<::arrow::Array>> MakeNullArray(
47+
const std::shared_ptr<::arrow::DataType>& type, int64_t length,
48+
::arrow::MemoryPool* pool) {
49+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto builder, ::arrow::MakeBuilder(type, pool));
50+
ICEBERG_ARROW_RETURN_NOT_OK(builder->AppendNulls(length));
51+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto array, builder->Finish());
52+
return array;
53+
}
54+
55+
Result<std::shared_ptr<::arrow::Array>> ProjectPrimitiveArray(
56+
const std::shared_ptr<::arrow::Array>& array,
57+
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
58+
::arrow::MemoryPool* pool) {
59+
if (array->type()->Equals(output_arrow_type)) {
60+
return array;
61+
}
62+
63+
// Use Arrow compute cast function for type conversions.
64+
// Note: We don't check the schema evolution rule again because projecting schemas
65+
// has checked this.
66+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto cast_result,
67+
::arrow::compute::Cast(array, output_arrow_type));
68+
return cast_result.make_array();
69+
}
70+
71+
Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
72+
const std::shared_ptr<::arrow::StructArray>& struct_array,
73+
const std::shared_ptr<::arrow::StructType>& output_struct_type,
74+
const StructType& struct_type, std::span<const FieldProjection> projections,
75+
::arrow::MemoryPool* pool) {
76+
if (struct_type.fields().size() != projections.size()) {
77+
return InvalidSchema(
78+
"Inconsistent number of fields ({}) and number of projections ({})",
79+
struct_type.fields().size(), projections.size());
80+
}
81+
if (struct_type.fields().size() != output_struct_type->num_fields()) {
82+
return InvalidSchema(
83+
"Inconsistent number of fields ({}) and number of output fields ({})",
84+
struct_type.fields().size(), output_struct_type->num_fields());
85+
}
86+
87+
std::vector<std::shared_ptr<::arrow::Array>> projected_arrays;
88+
projected_arrays.reserve(projections.size());
89+
90+
for (size_t i = 0; i < projections.size(); ++i) {
91+
const auto& projected_field = struct_type.fields()[i];
92+
const auto& field_projection = projections[i];
93+
const auto& output_arrow_type = output_struct_type->fields()[i]->type();
94+
95+
std::shared_ptr<::arrow::Array> projected_array;
96+
97+
if (field_projection.kind == FieldProjection::Kind::kProjected) {
98+
auto parquet_field_index =
99+
static_cast<int>(std::get<size_t>(field_projection.from));
100+
if (parquet_field_index >= struct_array->num_fields()) {
101+
return InvalidArgument("Parquet field index {} out of bound {}",
102+
parquet_field_index, struct_array->num_fields());
103+
}
104+
const auto& parquet_array = struct_array->field(parquet_field_index);
105+
if (projected_field.type()->is_nested()) {
106+
const auto& nested_type =
107+
internal::checked_cast<const NestedType&>(*projected_field.type());
108+
ICEBERG_ASSIGN_OR_RAISE(
109+
projected_array,
110+
ProjectNestedArray(parquet_array, output_arrow_type, nested_type,
111+
field_projection.children, pool));
112+
} else {
113+
ICEBERG_ASSIGN_OR_RAISE(
114+
projected_array,
115+
ProjectPrimitiveArray(parquet_array, output_arrow_type, pool));
116+
}
117+
} else if (field_projection.kind == FieldProjection::Kind::kNull) {
118+
ICEBERG_ASSIGN_OR_RAISE(
119+
projected_array,
120+
MakeNullArray(output_arrow_type, struct_array->length(), pool));
121+
} else {
122+
return NotImplemented("Unsupported field projection kind: {}",
123+
ToString(field_projection.kind));
124+
}
125+
126+
projected_arrays.emplace_back(std::move(projected_array));
127+
}
128+
129+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
130+
auto output_array,
131+
::arrow::StructArray::Make(projected_arrays, output_struct_type->fields(),
132+
struct_array->null_bitmap(), struct_array->null_count(),
133+
struct_array->offset()));
134+
return output_array;
135+
}
136+
137+
/// FIXME: Support ::arrow::LargeListArray.
138+
Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
139+
const std::shared_ptr<::arrow::ListArray>& list_array,
140+
const std::shared_ptr<::arrow::ListType>& output_list_type, const ListType& list_type,
141+
std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
142+
if (projections.size() != 1) {
143+
return InvalidArgument("Expected 1 projection for list, got: {}", projections.size());
144+
}
145+
146+
const auto& element_field = list_type.fields().back();
147+
const auto& element_projection = projections[0];
148+
const auto& output_element_type = output_list_type->value_type();
149+
150+
std::shared_ptr<::arrow::Array> projected_values;
151+
if (element_field.type()->is_nested()) {
152+
const auto& nested_type =
153+
internal::checked_cast<const NestedType&>(*element_field.type());
154+
ICEBERG_ASSIGN_OR_RAISE(
155+
projected_values,
156+
ProjectNestedArray(list_array->values(), output_element_type, nested_type,
157+
element_projection.children, pool));
158+
} else {
159+
ICEBERG_ASSIGN_OR_RAISE(
160+
projected_values,
161+
ProjectPrimitiveArray(list_array->values(), output_element_type, pool));
162+
}
163+
164+
return std::make_shared<::arrow::ListArray>(
165+
output_list_type, list_array->length(), list_array->value_offsets(),
166+
std::move(projected_values), list_array->null_bitmap(), list_array->null_count(),
167+
list_array->offset());
168+
}
169+
170+
Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
171+
const std::shared_ptr<::arrow::MapArray>& map_array,
172+
const std::shared_ptr<::arrow::MapType>& output_map_type, const MapType& map_type,
173+
std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
174+
if (projections.size() != 2) {
175+
return InvalidArgument("Expected 2 projections for map, got: {}", projections.size());
176+
}
177+
178+
const auto& key_projection = projections[0];
179+
const auto& value_projection = projections[1];
180+
const auto& key_type = map_type.key().type();
181+
const auto& value_type = map_type.value().type();
182+
183+
// Project keys
184+
std::shared_ptr<::arrow::Array> projected_keys;
185+
if (key_type->is_nested()) {
186+
const auto& nested_type = internal::checked_cast<const NestedType&>(*key_type);
187+
ICEBERG_ASSIGN_OR_RAISE(
188+
projected_keys, ProjectNestedArray(map_array->keys(), output_map_type->key_type(),
189+
nested_type, key_projection.children, pool));
190+
} else {
191+
ICEBERG_ASSIGN_OR_RAISE(
192+
projected_keys,
193+
ProjectPrimitiveArray(map_array->keys(), output_map_type->key_type(), pool));
194+
}
195+
196+
// Project values
197+
std::shared_ptr<::arrow::Array> projected_items;
198+
if (value_type->is_nested()) {
199+
const auto& nested_type = internal::checked_cast<const NestedType&>(*value_type);
200+
ICEBERG_ASSIGN_OR_RAISE(
201+
projected_items,
202+
ProjectNestedArray(map_array->items(), output_map_type->item_type(), nested_type,
203+
value_projection.children, pool));
204+
} else {
205+
ICEBERG_ASSIGN_OR_RAISE(
206+
projected_items,
207+
ProjectPrimitiveArray(map_array->items(), output_map_type->item_type(), pool));
208+
}
209+
210+
return std::make_shared<::arrow::MapArray>(
211+
output_map_type, map_array->length(), map_array->value_offsets(),
212+
std::move(projected_keys), std::move(projected_items), map_array->null_bitmap(),
213+
map_array->null_count(), map_array->offset());
214+
}
215+
216+
Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
217+
const std::shared_ptr<::arrow::Array>& array,
218+
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
219+
const NestedType& nested_type, std::span<const FieldProjection> projections,
220+
::arrow::MemoryPool* pool) {
221+
switch (nested_type.type_id()) {
222+
case TypeId::kStruct: {
223+
if (output_arrow_type->id() != ::arrow::Type::STRUCT) {
224+
return InvalidSchema("Expected struct type, got: {}",
225+
output_arrow_type->ToString());
226+
}
227+
auto struct_array = internal::checked_pointer_cast<::arrow::StructArray>(array);
228+
auto output_struct_type =
229+
internal::checked_pointer_cast<::arrow::StructType>(output_arrow_type);
230+
const auto& struct_type = internal::checked_cast<const StructType&>(nested_type);
231+
return ProjectStructArray(struct_array, output_struct_type, struct_type,
232+
projections, pool);
233+
}
234+
case TypeId::kList: {
235+
if (output_arrow_type->id() != ::arrow::Type::LIST) {
236+
return InvalidSchema("Expected list type, got: {}",
237+
output_arrow_type->ToString());
238+
}
239+
240+
auto list_array = internal::checked_pointer_cast<::arrow::ListArray>(array);
241+
auto output_list_type =
242+
internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
243+
const auto& list_type = internal::checked_cast<const ListType&>(nested_type);
244+
return ProjectListArray(list_array, output_list_type, list_type, projections, pool);
245+
}
246+
case TypeId::kMap: {
247+
if (output_arrow_type->id() != ::arrow::Type::MAP) {
248+
return InvalidSchema("Expected map type, got: {}", output_arrow_type->ToString());
249+
}
250+
251+
auto map_array = internal::checked_pointer_cast<::arrow::MapArray>(array);
252+
auto output_map_type =
253+
internal::checked_pointer_cast<::arrow::MapType>(output_arrow_type);
254+
const auto& map_type = internal::checked_cast<const MapType&>(nested_type);
255+
return ProjectMapArray(map_array, output_map_type, map_type, projections, pool);
256+
}
257+
default:
258+
return InvalidSchema("Cannot project array of unsupported nested type: {}",
259+
nested_type.ToString());
260+
}
261+
}
262+
263+
} // namespace
264+
24265
Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
25266
std::shared_ptr<::arrow::RecordBatch> record_batch,
26267
const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
27-
const Schema& projected_schema, const SchemaProjection& projection) {
28-
return NotImplemented("NYI");
268+
const Schema& projected_schema, const SchemaProjection& projection,
269+
::arrow::MemoryPool* pool) {
270+
auto array = std::make_shared<::arrow::StructArray>(
271+
::arrow::struct_(record_batch->schema()->fields()), record_batch->num_rows(),
272+
record_batch->columns());
273+
ICEBERG_ASSIGN_OR_RAISE(
274+
auto output_array,
275+
ProjectNestedArray(array, ::arrow::struct_(output_arrow_schema->fields()),
276+
projected_schema, projection.fields, pool));
277+
auto* struct_array = internal::checked_cast<::arrow::StructArray*>(output_array.get());
278+
return ::arrow::RecordBatch::Make(output_arrow_schema, record_batch->num_rows(),
279+
struct_array->fields());
29280
}
30281

31282
} // namespace iceberg::parquet

src/iceberg/parquet/parquet_data_util_internal.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,9 @@
1919

2020
#pragma once
2121

22-
#include "iceberg/schema_util.h"
22+
#include <arrow/type_fwd.h>
2323

24-
namespace arrow {
25-
class RecordBatch;
26-
class Schema;
27-
} // namespace arrow
24+
#include "iceberg/schema_util.h"
2825

2926
namespace iceberg::parquet {
3027

@@ -34,10 +31,12 @@ namespace iceberg::parquet {
3431
/// \param output_arrow_schema The Arrow schema to convert to.
3532
/// \param projected_schema The projected Iceberg schema.
3633
/// \param projection The projection from projected Iceberg schema to the record batch.
34+
/// \param pool The arrow memory pool.
3735
/// \return The converted record batch.
3836
Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
3937
std::shared_ptr<::arrow::RecordBatch> record_batch,
4038
const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
41-
const Schema& projected_schema, const SchemaProjection& projection);
39+
const Schema& projected_schema, const SchemaProjection& projection,
40+
::arrow::MemoryPool* pool);
4241

4342
} // namespace iceberg::parquet

src/iceberg/parquet/parquet_reader.cc

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,8 @@ class ParquetReader::Impl {
117117
split_ = options.split;
118118
read_schema_ = options.projection;
119119

120-
// TODO(gangwu): make memory pool configurable
121-
::arrow::MemoryPool* pool = ::arrow::default_memory_pool();
122-
123120
// Prepare reader properties
124-
::parquet::ReaderProperties reader_properties(pool);
121+
::parquet::ReaderProperties reader_properties(pool_);
125122
::parquet::ArrowReaderProperties arrow_reader_properties;
126123
arrow_reader_properties.set_batch_size(options.batch_size);
127124
arrow_reader_properties.set_arrow_extensions_enabled(true);
@@ -131,7 +128,7 @@ class ParquetReader::Impl {
131128
auto file_reader =
132129
::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties);
133130
ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make(
134-
pool, std::move(file_reader), arrow_reader_properties, &reader_));
131+
pool_, std::move(file_reader), arrow_reader_properties, &reader_));
135132

136133
// Project read schema onto the Parquet file schema
137134
ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(), *read_schema_));
@@ -152,7 +149,7 @@ class ParquetReader::Impl {
152149

153150
ICEBERG_ASSIGN_OR_RAISE(
154151
batch, ProjectRecordBatch(std::move(batch), context_->output_arrow_schema_,
155-
*read_schema_, projection_));
152+
*read_schema_, projection_, pool_));
156153

157154
ArrowArray arrow_array;
158155
ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportRecordBatch(*batch, &arrow_array));
@@ -227,6 +224,8 @@ class ParquetReader::Impl {
227224
}
228225

229226
private:
227+
// TODO(gangwu): make memory pool configurable
228+
::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
230229
// The split to read from the Parquet file.
231230
std::optional<Split> split_;
232231
// Schema to read from the Parquet file.

test/CMakeLists.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,9 @@ if(ICEBERG_BUILD_BUNDLE)
117117
test_common.cc
118118
in_memory_catalog_test.cc)
119119

120-
add_iceberg_test(parquet_test USE_BUNDLE SOURCES parquet_schema_test.cc)
120+
add_iceberg_test(parquet_test
121+
USE_BUNDLE
122+
SOURCES
123+
parquet_data_test.cc
124+
parquet_schema_test.cc)
121125
endif()

0 commit comments

Comments
 (0)