Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 253 additions & 2 deletions src/iceberg/parquet/parquet_data_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,266 @@
* under the License.
*/

#include <arrow/array.h>
#include <arrow/array/builder_primitive.h>
#include <arrow/compute/api.h>
#include <arrow/record_batch.h>
#include <arrow/type.h>

#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/parquet/parquet_data_util_internal.h"
#include "iceberg/schema.h"
#include "iceberg/schema_util.h"
#include "iceberg/type.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"

namespace iceberg::parquet {

namespace {

// Forward declaration
Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
const std::shared_ptr<::arrow::Array>& array,
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
const NestedType& nested_type, std::span<const FieldProjection> projections,
::arrow::MemoryPool* pool);

/// \brief Create a null array of the given type and length.
Result<std::shared_ptr<::arrow::Array>> MakeNullArray(
const std::shared_ptr<::arrow::DataType>& type, int64_t length,
::arrow::MemoryPool* pool) {
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto builder, ::arrow::MakeBuilder(type, pool));
ICEBERG_ARROW_RETURN_NOT_OK(builder->AppendNulls(length));
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto array, builder->Finish());
return array;
}

Result<std::shared_ptr<::arrow::Array>> ProjectPrimitiveArray(
const std::shared_ptr<::arrow::Array>& array,
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
::arrow::MemoryPool* pool) {
if (array->type()->Equals(output_arrow_type)) {
return array;
}

// Use Arrow compute cast function for type conversions.
// Note: We don't check the schema evolution rule again because projecting schemas
// has checked this.
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto cast_result,
::arrow::compute::Cast(array, output_arrow_type));
return cast_result.make_array();
}

Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
const std::shared_ptr<::arrow::StructArray>& struct_array,
const std::shared_ptr<::arrow::StructType>& output_struct_type,
const StructType& struct_type, std::span<const FieldProjection> projections,
::arrow::MemoryPool* pool) {
if (struct_type.fields().size() != projections.size()) {
return InvalidSchema(
"Inconsistent number of fields ({}) and number of projections ({})",
struct_type.fields().size(), projections.size());
}
if (struct_type.fields().size() != output_struct_type->num_fields()) {
return InvalidSchema(
"Inconsistent number of fields ({}) and number of output fields ({})",
struct_type.fields().size(), output_struct_type->num_fields());
}

std::vector<std::shared_ptr<::arrow::Array>> projected_arrays;
projected_arrays.reserve(projections.size());

for (size_t i = 0; i < projections.size(); ++i) {
const auto& projected_field = struct_type.fields()[i];
const auto& field_projection = projections[i];
const auto& output_arrow_type = output_struct_type->fields()[i]->type();

std::shared_ptr<::arrow::Array> projected_array;

if (field_projection.kind == FieldProjection::Kind::kProjected) {
auto parquet_field_index =
static_cast<int>(std::get<size_t>(field_projection.from));
if (parquet_field_index >= struct_array->num_fields()) {
return InvalidArgument("Parquet field index {} out of bound {}",
parquet_field_index, struct_array->num_fields());
}
const auto& parquet_array = struct_array->field(parquet_field_index);
if (projected_field.type()->is_nested()) {
const auto& nested_type =
internal::checked_cast<const NestedType&>(*projected_field.type());
ICEBERG_ASSIGN_OR_RAISE(
projected_array,
ProjectNestedArray(parquet_array, output_arrow_type, nested_type,
field_projection.children, pool));
} else {
ICEBERG_ASSIGN_OR_RAISE(
projected_array,
ProjectPrimitiveArray(parquet_array, output_arrow_type, pool));
}
} else if (field_projection.kind == FieldProjection::Kind::kNull) {
ICEBERG_ASSIGN_OR_RAISE(
projected_array,
MakeNullArray(output_arrow_type, struct_array->length(), pool));
} else {
return NotImplemented("Unsupported field projection kind: {}",
ToString(field_projection.kind));
}

projected_arrays.emplace_back(std::move(projected_array));
}

ICEBERG_ARROW_ASSIGN_OR_RETURN(
auto output_array,
::arrow::StructArray::Make(projected_arrays, output_struct_type->fields(),
struct_array->null_bitmap(), struct_array->null_count(),
struct_array->offset()));
return output_array;
}

/// FIXME: Support ::arrow::LargeListArray.
Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
const std::shared_ptr<::arrow::ListArray>& list_array,
const std::shared_ptr<::arrow::ListType>& output_list_type, const ListType& list_type,
std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
if (projections.size() != 1) {
return InvalidArgument("Expected 1 projection for list, got: {}", projections.size());
}

const auto& element_field = list_type.fields().back();
const auto& element_projection = projections[0];
const auto& output_element_type = output_list_type->value_type();

std::shared_ptr<::arrow::Array> projected_values;
if (element_field.type()->is_nested()) {
const auto& nested_type =
internal::checked_cast<const NestedType&>(*element_field.type());
ICEBERG_ASSIGN_OR_RAISE(
projected_values,
ProjectNestedArray(list_array->values(), output_element_type, nested_type,
element_projection.children, pool));
} else {
ICEBERG_ASSIGN_OR_RAISE(
projected_values,
ProjectPrimitiveArray(list_array->values(), output_element_type, pool));
}

return std::make_shared<::arrow::ListArray>(
output_list_type, list_array->length(), list_array->value_offsets(),
std::move(projected_values), list_array->null_bitmap(), list_array->null_count(),
list_array->offset());
}

Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
const std::shared_ptr<::arrow::MapArray>& map_array,
const std::shared_ptr<::arrow::MapType>& output_map_type, const MapType& map_type,
std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
if (projections.size() != 2) {
return InvalidArgument("Expected 2 projections for map, got: {}", projections.size());
}

const auto& key_projection = projections[0];
const auto& value_projection = projections[1];
const auto& key_type = map_type.key().type();
const auto& value_type = map_type.value().type();

// Project keys
std::shared_ptr<::arrow::Array> projected_keys;
if (key_type->is_nested()) {
const auto& nested_type = internal::checked_cast<const NestedType&>(*key_type);
ICEBERG_ASSIGN_OR_RAISE(
projected_keys, ProjectNestedArray(map_array->keys(), output_map_type->key_type(),
nested_type, key_projection.children, pool));
} else {
ICEBERG_ASSIGN_OR_RAISE(
projected_keys,
ProjectPrimitiveArray(map_array->keys(), output_map_type->key_type(), pool));
}

// Project values
std::shared_ptr<::arrow::Array> projected_items;
if (value_type->is_nested()) {
const auto& nested_type = internal::checked_cast<const NestedType&>(*value_type);
ICEBERG_ASSIGN_OR_RAISE(
projected_items,
ProjectNestedArray(map_array->items(), output_map_type->item_type(), nested_type,
value_projection.children, pool));
} else {
ICEBERG_ASSIGN_OR_RAISE(
projected_items,
ProjectPrimitiveArray(map_array->items(), output_map_type->item_type(), pool));
}

return std::make_shared<::arrow::MapArray>(
output_map_type, map_array->length(), map_array->value_offsets(),
std::move(projected_keys), std::move(projected_items), map_array->null_bitmap(),
map_array->null_count(), map_array->offset());
}

Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
const std::shared_ptr<::arrow::Array>& array,
const std::shared_ptr<::arrow::DataType>& output_arrow_type,
const NestedType& nested_type, std::span<const FieldProjection> projections,
::arrow::MemoryPool* pool) {
switch (nested_type.type_id()) {
case TypeId::kStruct: {
if (output_arrow_type->id() != ::arrow::Type::STRUCT) {
return InvalidSchema("Expected struct type, got: {}",
output_arrow_type->ToString());
}
auto struct_array = internal::checked_pointer_cast<::arrow::StructArray>(array);
auto output_struct_type =
internal::checked_pointer_cast<::arrow::StructType>(output_arrow_type);
const auto& struct_type = internal::checked_cast<const StructType&>(nested_type);
return ProjectStructArray(struct_array, output_struct_type, struct_type,
projections, pool);
}
case TypeId::kList: {
if (output_arrow_type->id() != ::arrow::Type::LIST) {
return InvalidSchema("Expected list type, got: {}",
output_arrow_type->ToString());
}

auto list_array = internal::checked_pointer_cast<::arrow::ListArray>(array);
auto output_list_type =
internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
const auto& list_type = internal::checked_cast<const ListType&>(nested_type);
return ProjectListArray(list_array, output_list_type, list_type, projections, pool);
}
case TypeId::kMap: {
if (output_arrow_type->id() != ::arrow::Type::MAP) {
return InvalidSchema("Expected map type, got: {}", output_arrow_type->ToString());
}

auto map_array = internal::checked_pointer_cast<::arrow::MapArray>(array);
auto output_map_type =
internal::checked_pointer_cast<::arrow::MapType>(output_arrow_type);
const auto& map_type = internal::checked_cast<const MapType&>(nested_type);
return ProjectMapArray(map_array, output_map_type, map_type, projections, pool);
}
default:
return InvalidSchema("Cannot project array of unsupported nested type: {}",
nested_type.ToString());
}
}

} // namespace

Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
std::shared_ptr<::arrow::RecordBatch> record_batch,
const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
const Schema& projected_schema, const SchemaProjection& projection) {
return NotImplemented("NYI");
const Schema& projected_schema, const SchemaProjection& projection,
::arrow::MemoryPool* pool) {
auto array = std::make_shared<::arrow::StructArray>(
::arrow::struct_(record_batch->schema()->fields()), record_batch->num_rows(),
record_batch->columns());
ICEBERG_ASSIGN_OR_RAISE(
auto output_array,
ProjectNestedArray(array, ::arrow::struct_(output_arrow_schema->fields()),
projected_schema, projection.fields, pool));
auto struct_array = internal::checked_pointer_cast<::arrow::StructArray>(output_array);
return ::arrow::RecordBatch::Make(output_arrow_schema, record_batch->num_rows(),
struct_array->fields());
}

} // namespace iceberg::parquet
11 changes: 5 additions & 6 deletions src/iceberg/parquet/parquet_data_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@

#pragma once

#include "iceberg/schema_util.h"
#include <arrow/type_fwd.h>

namespace arrow {
class RecordBatch;
class Schema;
} // namespace arrow
#include "iceberg/schema_util.h"

namespace iceberg::parquet {

Expand All @@ -34,10 +31,12 @@ namespace iceberg::parquet {
/// \param output_arrow_schema The Arrow schema to convert to.
/// \param projected_schema The projected Iceberg schema.
/// \param projection The projection from projected Iceberg schema to the record batch.
/// \param pool The arrow memory pool.
/// \return The converted record batch.
Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
std::shared_ptr<::arrow::RecordBatch> record_batch,
const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
const Schema& projected_schema, const SchemaProjection& projection);
const Schema& projected_schema, const SchemaProjection& projection,
::arrow::MemoryPool* pool);

} // namespace iceberg::parquet
11 changes: 5 additions & 6 deletions src/iceberg/parquet/parquet_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,8 @@ class ParquetReader::Impl {
split_ = options.split;
read_schema_ = options.projection;

// TODO(gangwu): make memory pool configurable
::arrow::MemoryPool* pool = ::arrow::default_memory_pool();

// Prepare reader properties
::parquet::ReaderProperties reader_properties(pool);
::parquet::ReaderProperties reader_properties(pool_);
::parquet::ArrowReaderProperties arrow_reader_properties;
arrow_reader_properties.set_batch_size(options.batch_size);
arrow_reader_properties.set_arrow_extensions_enabled(true);
Expand All @@ -131,7 +128,7 @@ class ParquetReader::Impl {
auto file_reader =
::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties);
ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make(
pool, std::move(file_reader), arrow_reader_properties, &reader_));
pool_, std::move(file_reader), arrow_reader_properties, &reader_));

// Project read schema onto the Parquet file schema
ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(), *read_schema_));
Expand All @@ -152,7 +149,7 @@ class ParquetReader::Impl {

ICEBERG_ASSIGN_OR_RAISE(
batch, ProjectRecordBatch(std::move(batch), context_->output_arrow_schema_,
*read_schema_, projection_));
*read_schema_, projection_, pool_));

ArrowArray arrow_array;
ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportRecordBatch(*batch, &arrow_array));
Expand Down Expand Up @@ -227,6 +224,8 @@ class ParquetReader::Impl {
}

private:
// TODO(gangwu): make memory pool configurable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still a todo?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have well-thought opinion here. ReaderOptions basically cannot expose any Arrow-related concept. Maybe we cannot avoid introducing a MemoryPool interface which is similar to FileIO whose default implementation is backed by Arrow.

::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
// The split to read from the Parquet file.
std::optional<Split> split_;
// Schema to read from the Parquet file.
Expand Down
6 changes: 5 additions & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,9 @@ if(ICEBERG_BUILD_BUNDLE)
test_common.cc
in_memory_catalog_test.cc)

add_iceberg_test(parquet_test USE_BUNDLE SOURCES parquet_schema_test.cc)
add_iceberg_test(parquet_test
USE_BUNDLE
SOURCES
parquet_data_test.cc
parquet_schema_test.cc)
endif()
Loading
Loading