Skip to content

Commit 99e4a73

Browse files
committed
feat(parquet): project arrow array for parquet
1 parent 7063f2b commit 99e4a73

File tree

5 files changed

+773
-15
lines changed

5 files changed

+773
-15
lines changed

src/iceberg/parquet/parquet_data_util.cc

Lines changed: 252 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,265 @@
1717
* under the License.
1818
*/
1919

20+
#include <arrow/array.h>
21+
#include <arrow/array/builder_primitive.h>
22+
#include <arrow/compute/api.h>
23+
#include <arrow/record_batch.h>
24+
#include <arrow/type.h>
25+
26+
#include "iceberg/arrow/arrow_error_transform_internal.h"
2027
#include "iceberg/parquet/parquet_data_util_internal.h"
28+
#include "iceberg/schema.h"
29+
#include "iceberg/schema_util.h"
30+
#include "iceberg/type.h"
31+
#include "iceberg/util/checked_cast.h"
32+
#include "iceberg/util/macros.h"
2133

2234
namespace iceberg::parquet {
2335

36+
namespace {
37+
38+
// Forward declaration
39+
Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
40+
const std::shared_ptr<::arrow::Array>& array,
41+
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
42+
const NestedType& nested_type, std::span<const FieldProjection> projections,
43+
::arrow::MemoryPool* pool);
44+
45+
/// \brief Create a null array of the given type and length.
46+
Result<std::shared_ptr<::arrow::Array>> MakeNullArray(
47+
const std::shared_ptr<::arrow::DataType>& type, int64_t length,
48+
::arrow::MemoryPool* pool) {
49+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto builder, ::arrow::MakeBuilder(type, pool));
50+
ICEBERG_ARROW_RETURN_NOT_OK(builder->AppendNulls(length));
51+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto array, builder->Finish());
52+
return array;
53+
}
54+
55+
Result<std::shared_ptr<::arrow::Array>> ProjectPrimitiveArray(
56+
const std::shared_ptr<::arrow::Array>& array,
57+
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
58+
::arrow::MemoryPool* pool) {
59+
if (array->type()->Equals(output_arrow_type)) {
60+
return array;
61+
}
62+
63+
// TODO(gangwu): Check schema evolution rule.
64+
65+
// Use Arrow compute cast function for type conversions.
66+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto cast_result,
67+
::arrow::compute::Cast(array, output_arrow_type));
68+
return cast_result.make_array();
69+
}
70+
71+
Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
72+
const std::shared_ptr<::arrow::StructArray>& struct_array,
73+
const std::shared_ptr<::arrow::StructType>& output_struct_type,
74+
const StructType& struct_type, std::span<const FieldProjection> projections,
75+
::arrow::MemoryPool* pool) {
76+
if (struct_type.fields().size() != projections.size()) {
77+
return InvalidSchema(
78+
"Inconsistent number of fields ({}) and number of projections ({})",
79+
struct_type.fields().size(), projections.size());
80+
}
81+
if (struct_type.fields().size() != output_struct_type->num_fields()) {
82+
return InvalidSchema(
83+
"Inconsistent number of fields ({}) and number of output fields ({})",
84+
struct_type.fields().size(), output_struct_type->num_fields());
85+
}
86+
87+
std::vector<std::shared_ptr<::arrow::Array>> projected_arrays;
88+
projected_arrays.reserve(projections.size());
89+
90+
for (size_t i = 0; i < projections.size(); ++i) {
91+
const auto& projected_field = struct_type.fields()[i];
92+
const auto& field_projection = projections[i];
93+
const auto& output_arrow_type = output_struct_type->fields()[i]->type();
94+
95+
std::shared_ptr<::arrow::Array> projected_array;
96+
97+
if (field_projection.kind == FieldProjection::Kind::kProjected) {
98+
size_t parquet_field_index = std::get<size_t>(field_projection.from);
99+
if (parquet_field_index >= struct_array->num_fields()) {
100+
return InvalidArgument("Parquet field index {} out of bound {}",
101+
parquet_field_index, struct_array->num_fields());
102+
}
103+
const auto& parquet_array = struct_array->field(parquet_field_index);
104+
if (projected_field.type()->is_nested()) {
105+
const auto& nested_type =
106+
internal::checked_cast<const NestedType&>(*projected_field.type());
107+
ICEBERG_ASSIGN_OR_RAISE(
108+
projected_array,
109+
ProjectNestedArray(parquet_array, output_arrow_type, nested_type,
110+
field_projection.children, pool));
111+
} else {
112+
ICEBERG_ASSIGN_OR_RAISE(
113+
projected_array,
114+
ProjectPrimitiveArray(parquet_array, output_arrow_type, pool));
115+
}
116+
} else if (field_projection.kind == FieldProjection::Kind::kNull) {
117+
ICEBERG_ASSIGN_OR_RAISE(
118+
projected_array,
119+
MakeNullArray(output_arrow_type, struct_array->length(), pool));
120+
} else {
121+
return NotImplemented("Unsupported field projection kind: {}",
122+
ToString(field_projection.kind));
123+
}
124+
125+
projected_arrays.emplace_back(std::move(projected_array));
126+
}
127+
128+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
129+
auto output_array,
130+
::arrow::StructArray::Make(projected_arrays, output_struct_type->fields(),
131+
struct_array->null_bitmap(), struct_array->null_count(),
132+
struct_array->offset()));
133+
return output_array;
134+
}
135+
136+
/// FIXME: Support ::arrow::LargeListArray.
137+
Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
138+
const std::shared_ptr<::arrow::ListArray>& list_array,
139+
const std::shared_ptr<::arrow::ListType>& output_list_type, const ListType& list_type,
140+
std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
141+
if (projections.size() != 1) {
142+
return InvalidArgument("Expected 1 projection for list, got: {}", projections.size());
143+
}
144+
145+
const auto& element_field = list_type.fields().back();
146+
const auto& element_projection = projections[0];
147+
const auto& output_element_type = output_list_type->value_type();
148+
149+
std::shared_ptr<::arrow::Array> projected_values;
150+
if (element_field.type()->is_nested()) {
151+
const auto& nested_type =
152+
internal::checked_cast<const NestedType&>(*element_field.type());
153+
ICEBERG_ASSIGN_OR_RAISE(
154+
projected_values,
155+
ProjectNestedArray(list_array->values(), output_element_type, nested_type,
156+
element_projection.children, pool));
157+
} else {
158+
ICEBERG_ASSIGN_OR_RAISE(
159+
projected_values,
160+
ProjectPrimitiveArray(list_array->values(), output_element_type, pool));
161+
}
162+
163+
return std::make_shared<::arrow::ListArray>(
164+
output_list_type, list_array->length(), list_array->value_offsets(),
165+
std::move(projected_values), list_array->null_bitmap(), list_array->null_count(),
166+
list_array->offset());
167+
}
168+
169+
Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
170+
const std::shared_ptr<::arrow::MapArray>& map_array,
171+
const std::shared_ptr<::arrow::MapType>& output_map_type, const MapType& map_type,
172+
std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
173+
if (projections.size() != 2) {
174+
return InvalidArgument("Expected 2 projections for map, got: {}", projections.size());
175+
}
176+
177+
const auto& key_projection = projections[0];
178+
const auto& value_projection = projections[1];
179+
const auto& key_type = map_type.key().type();
180+
const auto& value_type = map_type.value().type();
181+
182+
// Project keys
183+
std::shared_ptr<::arrow::Array> projected_keys;
184+
if (key_type->is_nested()) {
185+
const auto& nested_type = internal::checked_cast<const NestedType&>(*key_type);
186+
ICEBERG_ASSIGN_OR_RAISE(
187+
projected_keys, ProjectNestedArray(map_array->keys(), output_map_type->key_type(),
188+
nested_type, key_projection.children, pool));
189+
} else {
190+
ICEBERG_ASSIGN_OR_RAISE(
191+
projected_keys,
192+
ProjectPrimitiveArray(map_array->keys(), output_map_type->key_type(), pool));
193+
}
194+
195+
// Project values
196+
std::shared_ptr<::arrow::Array> projected_items;
197+
if (value_type->is_nested()) {
198+
const auto& nested_type = internal::checked_cast<const NestedType&>(*value_type);
199+
ICEBERG_ASSIGN_OR_RAISE(
200+
projected_items,
201+
ProjectNestedArray(map_array->items(), output_map_type->item_type(), nested_type,
202+
value_projection.children, pool));
203+
} else {
204+
ICEBERG_ASSIGN_OR_RAISE(
205+
projected_items,
206+
ProjectPrimitiveArray(map_array->items(), output_map_type->item_type(), pool));
207+
}
208+
209+
return std::make_shared<::arrow::MapArray>(
210+
output_map_type, map_array->length(), map_array->value_offsets(),
211+
std::move(projected_keys), std::move(projected_items), map_array->null_bitmap(),
212+
map_array->null_count(), map_array->offset());
213+
}
214+
215+
Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
216+
const std::shared_ptr<::arrow::Array>& array,
217+
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
218+
const NestedType& nested_type, std::span<const FieldProjection> projections,
219+
::arrow::MemoryPool* pool) {
220+
switch (nested_type.type_id()) {
221+
case TypeId::kStruct: {
222+
if (output_arrow_type->id() != ::arrow::Type::STRUCT) {
223+
return InvalidSchema("Expected struct type, got: {}",
224+
output_arrow_type->ToString());
225+
}
226+
auto struct_array = internal::checked_pointer_cast<::arrow::StructArray>(array);
227+
auto output_struct_type =
228+
internal::checked_pointer_cast<::arrow::StructType>(output_arrow_type);
229+
const auto& struct_type = internal::checked_cast<const StructType&>(nested_type);
230+
return ProjectStructArray(struct_array, output_struct_type, struct_type,
231+
projections, pool);
232+
}
233+
case TypeId::kList: {
234+
if (output_arrow_type->id() != ::arrow::Type::LIST) {
235+
return InvalidSchema("Expected list type, got: {}",
236+
output_arrow_type->ToString());
237+
}
238+
239+
auto list_array = internal::checked_pointer_cast<::arrow::ListArray>(array);
240+
auto output_list_type =
241+
internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
242+
const auto& list_type = internal::checked_cast<const ListType&>(nested_type);
243+
return ProjectListArray(list_array, output_list_type, list_type, projections, pool);
244+
}
245+
case TypeId::kMap: {
246+
if (output_arrow_type->id() != ::arrow::Type::MAP) {
247+
return InvalidSchema("Expected map type, got: {}", output_arrow_type->ToString());
248+
}
249+
250+
auto map_array = internal::checked_pointer_cast<::arrow::MapArray>(array);
251+
auto output_map_type =
252+
internal::checked_pointer_cast<::arrow::MapType>(output_arrow_type);
253+
const auto& map_type = internal::checked_cast<const MapType&>(nested_type);
254+
return ProjectMapArray(map_array, output_map_type, map_type, projections, pool);
255+
}
256+
default:
257+
return InvalidSchema("Cannot project array of unsupported nested type: {}",
258+
nested_type.ToString());
259+
}
260+
}
261+
262+
} // namespace
263+
24264
Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
25265
std::shared_ptr<::arrow::RecordBatch> record_batch,
26266
const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
27-
const Schema& projected_schema, const SchemaProjection& projection) {
28-
return NotImplemented("NYI");
267+
const Schema& projected_schema, const SchemaProjection& projection,
268+
::arrow::MemoryPool* pool) {
269+
auto array = std::make_shared<::arrow::StructArray>(
270+
::arrow::struct_(record_batch->schema()->fields()), record_batch->num_rows(),
271+
record_batch->columns());
272+
ICEBERG_ASSIGN_OR_RAISE(
273+
auto output_array,
274+
ProjectNestedArray(array, ::arrow::struct_(output_arrow_schema->fields()),
275+
projected_schema, projection.fields, pool));
276+
auto struct_array = internal::checked_pointer_cast<::arrow::StructArray>(output_array);
277+
return ::arrow::RecordBatch::Make(output_arrow_schema, record_batch->num_rows(),
278+
struct_array->fields());
29279
}
30280

31281
} // namespace iceberg::parquet

src/iceberg/parquet/parquet_data_util_internal.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,9 @@
1919

2020
#pragma once
2121

22-
#include "iceberg/schema_util.h"
22+
#include <arrow/type_fwd.h>
2323

24-
namespace arrow {
25-
class RecordBatch;
26-
class Schema;
27-
} // namespace arrow
24+
#include "iceberg/schema_util.h"
2825

2926
namespace iceberg::parquet {
3027

@@ -34,10 +31,12 @@ namespace iceberg::parquet {
3431
/// \param output_arrow_schema The Arrow schema to convert to.
3532
/// \param projected_schema The projected Iceberg schema.
3633
/// \param projection The projection from projected Iceberg schema to the record batch.
34+
/// \param pool The arrow memory pool.
3735
/// \return The converted record batch.
3836
Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
3937
std::shared_ptr<::arrow::RecordBatch> record_batch,
4038
const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
41-
const Schema& projected_schema, const SchemaProjection& projection);
39+
const Schema& projected_schema, const SchemaProjection& projection,
40+
::arrow::MemoryPool* pool);
4241

4342
} // namespace iceberg::parquet

src/iceberg/parquet/parquet_reader.cc

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,8 @@ class ParquetReader::Impl {
117117
split_ = options.split;
118118
read_schema_ = options.projection;
119119

120-
// TODO(gangwu): make memory pool configurable
121-
::arrow::MemoryPool* pool = ::arrow::default_memory_pool();
122-
123120
// Prepare reader properties
124-
::parquet::ReaderProperties reader_properties(pool);
121+
::parquet::ReaderProperties reader_properties(pool_);
125122
::parquet::ArrowReaderProperties arrow_reader_properties;
126123
arrow_reader_properties.set_batch_size(options.batch_size);
127124
arrow_reader_properties.set_arrow_extensions_enabled(true);
@@ -131,7 +128,7 @@ class ParquetReader::Impl {
131128
auto file_reader =
132129
::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties);
133130
ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make(
134-
pool, std::move(file_reader), arrow_reader_properties, &reader_));
131+
pool_, std::move(file_reader), arrow_reader_properties, &reader_));
135132

136133
// Project read schema onto the Parquet file schema
137134
ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(), *read_schema_));
@@ -152,7 +149,7 @@ class ParquetReader::Impl {
152149

153150
ICEBERG_ASSIGN_OR_RAISE(
154151
batch, ProjectRecordBatch(std::move(batch), context_->output_arrow_schema_,
155-
*read_schema_, projection_));
152+
*read_schema_, projection_, pool_));
156153

157154
ArrowArray arrow_array;
158155
ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportRecordBatch(*batch, &arrow_array));
@@ -227,6 +224,8 @@ class ParquetReader::Impl {
227224
}
228225

229226
private:
227+
// TODO(gangwu): make memory pool configurable
228+
::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
230229
// The split to read from the Parquet file.
231230
std::optional<Split> split_;
232231
// Schema to read from the Parquet file.

test/CMakeLists.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,9 @@ if(ICEBERG_BUILD_BUNDLE)
117117
test_common.cc
118118
in_memory_catalog_test.cc)
119119

120-
add_iceberg_test(parquet_test USE_BUNDLE SOURCES parquet_schema_test.cc)
120+
add_iceberg_test(parquet_test
121+
USE_BUNDLE
122+
SOURCES
123+
parquet_data_test.cc
124+
parquet_schema_test.cc)
121125
endif()

0 commit comments

Comments
 (0)