Skip to content

Commit 370f1f4

Browse files
committed
feat(parquet): project arrow array for parquet
1 parent 8ecee31 commit 370f1f4

File tree

2 files changed

+257
-3
lines changed

2 files changed

+257
-3
lines changed

src/iceberg/parquet/parquet_data_util.cc

Lines changed: 253 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,266 @@
1717
* under the License.
1818
*/
1919

20+
#include <arrow/array.h>
21+
#include <arrow/array/builder_primitive.h>
22+
#include <arrow/compute/api.h>
23+
#include <arrow/record_batch.h>
24+
#include <arrow/type.h>
25+
26+
#include "iceberg/arrow/arrow_error_transform_internal.h"
2027
#include "iceberg/parquet/parquet_data_util_internal.h"
28+
#include "iceberg/schema.h"
29+
#include "iceberg/schema_util.h"
30+
#include "iceberg/type.h"
31+
#include "iceberg/util/checked_cast.h"
32+
#include "iceberg/util/macros.h"
2133

2234
namespace iceberg::parquet {
2335

36+
namespace {
37+
38+
// Forward declaration
39+
Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
40+
const std::shared_ptr<::arrow::Array>& array,
41+
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
42+
const NestedType& nested_type, std::span<const FieldProjection> projections,
43+
::arrow::MemoryPool* pool);
44+
45+
/// \brief Create a null array of the given type and length.
46+
Result<std::shared_ptr<::arrow::Array>> MakeNullArray(
47+
const std::shared_ptr<::arrow::DataType>& type, int64_t length,
48+
::arrow::MemoryPool* pool) {
49+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto builder, ::arrow::MakeBuilder(type, pool));
50+
ICEBERG_ARROW_RETURN_NOT_OK(builder->AppendNulls(length));
51+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto array, builder->Finish());
52+
return array;
53+
}
54+
55+
Result<std::shared_ptr<::arrow::Array>> ProjectPrimitiveArray(
56+
const std::shared_ptr<::arrow::Array>& array,
57+
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
58+
::arrow::MemoryPool* pool) {
59+
if (array->type()->Equals(output_arrow_type)) {
60+
return array;
61+
}
62+
63+
// TODO(gangwu): Check schema evolution rule.
64+
65+
// Use Arrow compute cast function for type conversions.
66+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto cast_result,
67+
::arrow::compute::Cast(array, output_arrow_type));
68+
return cast_result.make_array();
69+
}
70+
71+
Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
72+
const std::shared_ptr<::arrow::StructArray>& struct_array,
73+
const std::shared_ptr<::arrow::StructType>& output_struct_type,
74+
const StructType& struct_type, std::span<const FieldProjection> projections,
75+
::arrow::MemoryPool* pool) {
76+
if (struct_type.fields().size() != projections.size()) {
77+
return InvalidSchema(
78+
"Inconsistent number of fields ({}) and number of projections ({})",
79+
struct_type.fields().size(), projections.size());
80+
}
81+
if (struct_type.fields().size() != output_struct_type->num_fields()) {
82+
return InvalidSchema(
83+
"Inconsistent number of fields ({}) and number of output fields ({})",
84+
struct_type.fields().size(), output_struct_type->num_fields());
85+
}
86+
87+
std::vector<std::shared_ptr<::arrow::Array>> projected_arrays;
88+
projected_arrays.reserve(projections.size());
89+
90+
for (size_t i = 0; i < projections.size(); ++i) {
91+
const auto& projected_field = struct_type.fields()[i];
92+
const auto& field_projection = projections[i];
93+
const auto& output_arrow_type = output_struct_type->fields()[i]->type();
94+
95+
std::shared_ptr<::arrow::Array> projected_array;
96+
97+
if (field_projection.kind == FieldProjection::Kind::kProjected) {
98+
size_t parquet_field_index = std::get<size_t>(field_projection.from);
99+
if (parquet_field_index >= struct_array->num_fields()) {
100+
return InvalidArgument("Parquet field index {} out of bound {}",
101+
parquet_field_index, struct_array->num_fields());
102+
}
103+
const auto& parquet_array = struct_array->field(parquet_field_index);
104+
if (projected_field.type()->is_nested()) {
105+
const auto& nested_type =
106+
internal::checked_cast<const NestedType&>(*projected_field.type());
107+
ICEBERG_ASSIGN_OR_RAISE(
108+
projected_array,
109+
ProjectNestedArray(parquet_array, output_arrow_type, nested_type,
110+
field_projection.children, pool));
111+
} else {
112+
ICEBERG_ASSIGN_OR_RAISE(
113+
projected_array,
114+
ProjectPrimitiveArray(parquet_array, output_arrow_type, pool));
115+
}
116+
} else if (field_projection.kind == FieldProjection::Kind::kNull) {
117+
ICEBERG_ASSIGN_OR_RAISE(
118+
projected_array,
119+
MakeNullArray(output_arrow_type, struct_array->length(), pool));
120+
} else {
121+
return NotImplemented("Unsupported field projection kind: {}",
122+
ToString(field_projection.kind));
123+
}
124+
125+
projected_arrays.emplace_back(std::move(projected_array));
126+
}
127+
128+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
129+
auto output_array,
130+
::arrow::StructArray::Make(projected_arrays, output_struct_type->fields(),
131+
struct_array->null_bitmap(), struct_array->null_count(),
132+
struct_array->offset()));
133+
return output_array;
134+
}
135+
136+
/// FIXME: Support ::arrow::LargeListType.
137+
Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
138+
const std::shared_ptr<::arrow::ListArray>& list_array,
139+
const std::shared_ptr<::arrow::ListType>& output_list_type, const ListType& list_type,
140+
std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
141+
if (projections.size() != 1) {
142+
return InvalidArgument("Expected 1 projection for list, got: {}", projections.size());
143+
}
144+
145+
const auto& element_field = list_type.fields().back();
146+
const auto& element_projection = projections[0];
147+
const auto& output_element_type = output_list_type->value_type();
148+
149+
std::shared_ptr<::arrow::Array> projected_values;
150+
if (element_field.type()->is_nested()) {
151+
const auto& nested_type =
152+
internal::checked_cast<const NestedType&>(*element_field.type());
153+
ICEBERG_ASSIGN_OR_RAISE(
154+
projected_values,
155+
ProjectNestedArray(list_array->values(), output_element_type, nested_type,
156+
element_projection.children, pool));
157+
} else {
158+
ICEBERG_ASSIGN_OR_RAISE(
159+
projected_values,
160+
ProjectPrimitiveArray(list_array->values(), output_element_type, pool));
161+
}
162+
163+
return std::make_shared<::arrow::ListArray>(
164+
output_list_type, list_array->length(), list_array->value_offsets(),
165+
std::move(projected_values), list_array->null_bitmap(), list_array->null_count(),
166+
list_array->offset());
167+
}
168+
169+
/// FIXME: Support ::arrow::LargeMapType.
170+
Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
171+
const std::shared_ptr<::arrow::MapArray>& map_array,
172+
const std::shared_ptr<::arrow::MapType>& output_map_type, const MapType& map_type,
173+
std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
174+
if (projections.size() != 2) {
175+
return InvalidArgument("Expected 2 projections for map, got: {}", projections.size());
176+
}
177+
178+
const auto& key_projection = projections[0];
179+
const auto& value_projection = projections[1];
180+
const auto& key_type = map_type.key().type();
181+
const auto& value_type = map_type.value().type();
182+
183+
// Project keys
184+
std::shared_ptr<::arrow::Array> projected_keys;
185+
if (key_type->is_nested()) {
186+
const auto& nested_type = internal::checked_cast<const NestedType&>(*key_type);
187+
ICEBERG_ASSIGN_OR_RAISE(
188+
projected_keys, ProjectNestedArray(map_array->keys(), output_map_type->key_type(),
189+
nested_type, key_projection.children, pool));
190+
} else {
191+
ICEBERG_ASSIGN_OR_RAISE(
192+
projected_keys,
193+
ProjectPrimitiveArray(map_array->keys(), output_map_type->key_type(), pool));
194+
}
195+
196+
// Project values
197+
std::shared_ptr<::arrow::Array> projected_items;
198+
if (value_type->is_nested()) {
199+
const auto& nested_type = internal::checked_cast<const NestedType&>(*value_type);
200+
ICEBERG_ASSIGN_OR_RAISE(
201+
projected_items,
202+
ProjectNestedArray(map_array->items(), output_map_type->item_type(), nested_type,
203+
value_projection.children, pool));
204+
} else {
205+
ICEBERG_ASSIGN_OR_RAISE(
206+
projected_items,
207+
ProjectPrimitiveArray(map_array->items(), output_map_type->item_type(), pool));
208+
}
209+
210+
return std::make_shared<::arrow::MapArray>(
211+
output_map_type, map_array->length(), map_array->value_offsets(),
212+
std::move(projected_keys), std::move(projected_items), map_array->null_bitmap(),
213+
map_array->null_count(), map_array->offset());
214+
}
215+
216+
Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
217+
const std::shared_ptr<::arrow::Array>& array,
218+
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
219+
const NestedType& nested_type, std::span<const FieldProjection> projections,
220+
::arrow::MemoryPool* pool) {
221+
switch (nested_type.type_id()) {
222+
case TypeId::kStruct: {
223+
if (output_arrow_type->id() != ::arrow::Type::STRUCT) {
224+
return InvalidSchema("Expected struct type, got: {}",
225+
output_arrow_type->ToString());
226+
}
227+
auto struct_array = internal::checked_pointer_cast<::arrow::StructArray>(array);
228+
auto output_struct_type =
229+
internal::checked_pointer_cast<::arrow::StructType>(output_arrow_type);
230+
const auto& struct_type = internal::checked_cast<const StructType&>(nested_type);
231+
return ProjectStructArray(struct_array, output_struct_type, struct_type,
232+
projections, pool);
233+
}
234+
case TypeId::kList: {
235+
if (output_arrow_type->id() != ::arrow::Type::LIST) {
236+
return InvalidSchema("Expected list type, got: {}",
237+
output_arrow_type->ToString());
238+
}
239+
240+
auto list_array = internal::checked_pointer_cast<::arrow::ListArray>(array);
241+
auto output_list_type =
242+
internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
243+
const auto& list_type = internal::checked_cast<const ListType&>(nested_type);
244+
return ProjectListArray(list_array, output_list_type, list_type, projections, pool);
245+
}
246+
case TypeId::kMap: {
247+
if (output_arrow_type->id() != ::arrow::Type::MAP) {
248+
return InvalidSchema("Expected map type, got: {}", output_arrow_type->ToString());
249+
}
250+
251+
auto map_array = internal::checked_pointer_cast<::arrow::MapArray>(array);
252+
auto output_map_type =
253+
internal::checked_pointer_cast<::arrow::MapType>(output_arrow_type);
254+
const auto& map_type = internal::checked_cast<const MapType&>(nested_type);
255+
return ProjectMapArray(map_array, output_map_type, map_type, projections, pool);
256+
}
257+
default:
258+
return InvalidSchema("Cannot project array of unsupported nested type: {}",
259+
nested_type.ToString());
260+
}
261+
}
262+
263+
} // namespace
264+
24265
Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
25266
std::shared_ptr<::arrow::RecordBatch> record_batch,
26267
const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
27-
const Schema& projected_schema, const SchemaProjection& projection) {
28-
return NotImplemented("NYI");
268+
const Schema& projected_schema, const SchemaProjection& projection,
269+
::arrow::MemoryPool* pool) {
270+
auto array = std::make_shared<::arrow::StructArray>(
271+
::arrow::struct_(record_batch->schema()->fields()), record_batch->num_rows(),
272+
record_batch->columns());
273+
ICEBERG_ASSIGN_OR_RAISE(
274+
auto output_array,
275+
ProjectNestedArray(array, ::arrow::struct_(output_arrow_schema->fields()),
276+
projected_schema, projection.fields, pool));
277+
auto struct_array = internal::checked_pointer_cast<::arrow::StructArray>(output_array);
278+
return ::arrow::RecordBatch::Make(output_arrow_schema, record_batch->num_rows(),
279+
struct_array->fields());
29280
}
30281

31282
} // namespace iceberg::parquet

src/iceberg/parquet/parquet_data_util_internal.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "iceberg/schema_util.h"
2323

2424
namespace arrow {
25+
class MemoryPool;
2526
class RecordBatch;
2627
class Schema;
2728
} // namespace arrow
@@ -34,10 +35,12 @@ namespace iceberg::parquet {
3435
/// \param output_arrow_schema The Arrow schema to convert to.
3536
/// \param projected_schema The projected Iceberg schema.
3637
/// \param projection The projection from projected Iceberg schema to the record batch.
38+
/// \param pool The arrow memory pool.
3739
/// \return The converted record batch.
3840
Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
3941
std::shared_ptr<::arrow::RecordBatch> record_batch,
4042
const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
41-
const Schema& projected_schema, const SchemaProjection& projection);
43+
const Schema& projected_schema, const SchemaProjection& projection,
44+
::arrow::MemoryPool* pool = nullptr);
4245

4346
} // namespace iceberg::parquet

0 commit comments

Comments
 (0)