From 4e1bf26232679c00f63e4833aa1478e462bd2ae0 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 25 Jul 2025 17:51:41 +0800 Subject: [PATCH 1/2] feat: add scaffolding work for parquet reader --- .../IcebergThirdpartyToolchain.cmake | 3 +- src/iceberg/CMakeLists.txt | 6 +- src/iceberg/parquet/CMakeLists.txt | 18 ++ src/iceberg/parquet/parquet_data_util.cc | 31 ++ .../parquet/parquet_data_util_internal.h | 43 +++ src/iceberg/parquet/parquet_reader.cc | 300 ++++++++++++++++++ src/iceberg/parquet/parquet_reader.h | 49 +++ src/iceberg/parquet/parquet_schema_util.cc | 37 +++ .../parquet/parquet_schema_util_internal.h | 54 ++++ src/iceberg/result.h | 2 + 10 files changed, 541 insertions(+), 2 deletions(-) create mode 100644 src/iceberg/parquet/CMakeLists.txt create mode 100644 src/iceberg/parquet/parquet_data_util.cc create mode 100644 src/iceberg/parquet/parquet_data_util_internal.h create mode 100644 src/iceberg/parquet/parquet_reader.cc create mode 100644 src/iceberg/parquet/parquet_reader.h create mode 100644 src/iceberg/parquet/parquet_schema_util.cc create mode 100644 src/iceberg/parquet/parquet_schema_util_internal.h diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake index ed3d1d72e..a74d09adb 100644 --- a/cmake_modules/IcebergThirdpartyToolchain.cmake +++ b/cmake_modules/IcebergThirdpartyToolchain.cmake @@ -65,8 +65,9 @@ function(resolve_arrow_dependency) set(ARROW_BUILD_STATIC ON CACHE BOOL "" FORCE) + # Workaround undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*) set(ARROW_IPC - OFF + ON CACHE BOOL "" FORCE) set(ARROW_FILESYSTEM ON diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 4751e9fd5..d90c05421 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -109,7 +109,10 @@ if(ICEBERG_BUILD_BUNDLE) avro/avro_reader.cc avro/avro_schema_util.cc avro/avro_register.cc - avro/avro_stream_internal.cc) + 
avro/avro_stream_internal.cc + parquet/parquet_data_util.cc + parquet/parquet_reader.cc + parquet/parquet_schema_util.cc) # Libraries to link with exported libiceberg_bundle.{so,a}. set(ICEBERG_BUNDLE_STATIC_BUILD_INTERFACE_LIBS) @@ -161,6 +164,7 @@ if(ICEBERG_BUILD_BUNDLE) add_subdirectory(arrow) add_subdirectory(avro) + add_subdirectory(parquet) install(FILES ${CMAKE_CURRENT_BINARY_DIR}/iceberg_bundle_export.h DESTINATION ${ICEBERG_INSTALL_INCLUDEDIR}/iceberg) diff --git a/src/iceberg/parquet/CMakeLists.txt b/src/iceberg/parquet/CMakeLists.txt new file mode 100644 index 000000000..49a389d72 --- /dev/null +++ b/src/iceberg/parquet/CMakeLists.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +iceberg_install_all_headers(iceberg/parquet) diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc new file mode 100644 index 000000000..26edcb5dd --- /dev/null +++ b/src/iceberg/parquet/parquet_data_util.cc @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/parquet/parquet_data_util_internal.h" + +namespace iceberg::parquet { + +Result> ConvertRecordBatch( + std::shared_ptr<::arrow::RecordBatch> record_batch, + const std::shared_ptr<::arrow::Schema>& output_arrow_schema, + const Schema& projected_schema, const SchemaProjection& projection) { + return NotImplemented("NYI"); +} + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_data_util_internal.h b/src/iceberg/parquet/parquet_data_util_internal.h new file mode 100644 index 000000000..bd423034d --- /dev/null +++ b/src/iceberg/parquet/parquet_data_util_internal.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "iceberg/schema_util.h" + +namespace arrow { +class RecordBatch; +class Schema; +} // namespace arrow + +namespace iceberg::parquet { + +/// \brief Convert record batch read from a Parquet file to projected Iceberg Schema. +/// +/// \param record_batch The record batch to convert. +/// \param output_arrow_schema The Arrow schema to convert to. +/// \param projected_schema The projected Iceberg schema. +/// \param projection The projection from projected Iceberg schema to the record batch. +/// \return The converted record batch. +Result> ConvertRecordBatch( + std::shared_ptr<::arrow::RecordBatch> record_batch, + const std::shared_ptr<::arrow::Schema>& output_arrow_schema, + const Schema& projected_schema, const SchemaProjection& projection); + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc new file mode 100644 index 000000000..09bf59069 --- /dev/null +++ b/src/iceberg/parquet/parquet_reader.cc @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/parquet/parquet_reader.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io.h" +#include "iceberg/parquet/parquet_data_util_internal.h" +#include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/schema_internal.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::parquet { + +namespace { + +Result> OpenInputStream( + const ReaderOptions& options) { + ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File); + if (options.length) { + file_info.set_size(options.length.value()); + } + + auto io = internal::checked_pointer_cast(options.io); + auto result = io->fs()->OpenInputFile(file_info); + if (!result.ok()) { + return IOError("Failed to open file {} for reading: {}", options.path, + result.status().message()); + } + + return result.MoveValueUnsafe(); +} + +Result BuildProjection(::parquet::arrow::FileReader* reader, + const Schema& read_schema) { + auto metadata = reader->parquet_reader()->metadata(); + + ICEBERG_ASSIGN_OR_RAISE(auto has_field_ids, + HasFieldIds(metadata->schema()->schema_root())); + if (!has_field_ids) { + // TODO(gangwu): apply name mapping to Parquet schema + return NotImplemented("Applying name mapping to Parquet schema is not implemented"); + } + + ::parquet::arrow::SchemaManifest schema_manifest; + auto schema_manifest_result = ::parquet::arrow::SchemaManifest::Make( + metadata->schema(), metadata->key_value_metadata(), reader->properties(), + &schema_manifest); + if (!schema_manifest_result.ok()) { + return ParquetError("Failed to make schema manifest: {}", + schema_manifest_result.message()); + } + + // Leverage SchemaManifest to project the schema + ICEBERG_ASSIGN_OR_RAISE(auto projection, Project(read_schema, schema_manifest)); + + return projection; +} + +class EmptyRecordBatchReader : public
::arrow::RecordBatchReader { + public: + EmptyRecordBatchReader() = default; + ~EmptyRecordBatchReader() override = default; + + std::shared_ptr<::arrow::Schema> schema() const override { return nullptr; } + + ::arrow::Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* batch) override { + *batch = nullptr; + return ::arrow::Status::OK(); + } +}; + +} // namespace + +// A stateful context to keep track of the reading progress. +struct ReadContext { + // The arrow schema to output record batches. It may be different from + // the schema of record batches returned by `record_batch_reader_` + // when there is any schema evolution. + std::shared_ptr<::arrow::Schema> output_arrow_schema_; + // The reader to read record batches from the Parquet file. + std::unique_ptr<::arrow::RecordBatchReader> record_batch_reader_; +}; + +// TODO(gangwu): list of work items +// 1. Make the memory pool configurable +// 2. Catch ParquetException and convert to Status/Result +// 3. Add utility to convert Arrow Status/Result to Iceberg Status/Result +// 4.
Check field ids and apply name mapping if needed +class ParquetReader::Impl { + public: + // Open the Parquet reader with the given options + Status Open(const ReaderOptions& options) { + if (options.projection == nullptr) { + return InvalidArgument("Projected schema is required by Parquet reader"); + } + + split_ = options.split; + read_schema_ = options.projection; + + // TODO(gangwu): make memory pool configurable + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(); + + // Prepare reader properties + ::parquet::ReaderProperties reader_properties(pool); + ::parquet::ArrowReaderProperties arrow_reader_properties; + arrow_reader_properties.set_batch_size(options.batch_size); + arrow_reader_properties.set_arrow_extensions_enabled(true); + + // Open the Parquet file reader + ICEBERG_ASSIGN_OR_RAISE(auto input_stream, OpenInputStream(options)); + auto file_reader = + ::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties); + auto make_reader_result = ::parquet::arrow::FileReader::Make( + pool, std::move(file_reader), arrow_reader_properties, &reader_); + if (!make_reader_result.ok()) { + return ParquetError("Failed to make file reader: {}", make_reader_result.message()); + } + + // Project read schema onto the Parquet file schema + ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(), *read_schema_)); + + return {}; + } + + // Read the next batch of data + Result> Next() { + if (!context_) { + ICEBERG_RETURN_UNEXPECTED(InitReadContext()); + } + + auto next_result = context_->record_batch_reader_->Next(); + if (!next_result.ok()) { + return ParquetError("Failed to read next batch: {}", + next_result.status().message()); + } + + auto batch = next_result.MoveValueUnsafe(); + if (!batch) { + return std::nullopt; + } + + ICEBERG_ASSIGN_OR_RAISE( + batch, ConvertRecordBatch(std::move(batch), context_->output_arrow_schema_, + *read_schema_, projection_)); + + ArrowArray arrow_array; + auto export_result = 
::arrow::ExportRecordBatch(*batch, &arrow_array); + if (!export_result.ok()) { + return ParquetError("Failed to export the Arrow record batch: {}", + export_result.message()); + } + return arrow_array; + } + + // Close the reader and release resources + Status Close() { + if (reader_ == nullptr) { + return {}; // Already closed + } + + if (context_ != nullptr) { + auto close_result = context_->record_batch_reader_->Close(); + if (!close_result.ok()) { + return ParquetError("Failed to close record batch reader: {}", + close_result.message()); + } + context_.reset(); + } + + reader_.reset(); + return {}; + } + + // Get the schema of the data + Result Schema() { + if (!context_) { + ICEBERG_RETURN_UNEXPECTED(InitReadContext()); + } + + ArrowSchema arrow_schema; + auto export_result = + ::arrow::ExportSchema(*context_->output_arrow_schema_, &arrow_schema); + if (!export_result.ok()) { + return ParquetError("Failed to export Arrow schema: {}", export_result.message()); + } + return arrow_schema; + } + + private: + Status InitReadContext() { + context_ = std::make_unique(); + + // Build the output Arrow schema + ArrowSchema arrow_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema)); + auto import_result = ::arrow::ImportSchema(&arrow_schema); + if (!import_result.ok()) { + return ParquetError("Failed to import Arrow schema: {}", + import_result.status().message()); + } + context_->output_arrow_schema_ = import_result.MoveValueUnsafe(); + + // Row group pruning based on the split + // TODO(gangwu): add row group filtering based on zone map, bloom filter, etc. 
+ std::vector row_group_indices; + if (split_.has_value()) { + auto metadata = reader_->parquet_reader()->metadata(); + for (int i = 0; i < metadata->num_row_groups(); ++i) { + auto row_group_offset = metadata->RowGroup(i)->file_offset(); + if (row_group_offset >= split_->offset && + row_group_offset < split_->offset + split_->length) { + row_group_indices.push_back(i); + } else if (row_group_offset >= split_->offset + split_->length) { + break; + } + } + if (row_group_indices.empty()) { + // None of the row groups are selected, return an empty record batch reader + context_->record_batch_reader_ = std::make_unique(); + return {}; + } + } + + // Create the record batch reader + ICEBERG_ASSIGN_OR_RAISE(auto column_indices, SelectedColumnIndices(projection_)); + auto reader_result = reader_->GetRecordBatchReader(row_group_indices, column_indices); + if (!reader_result.ok()) { + return ParquetError("Failed to get record batch reader: {}", + reader_result.status().message()); + } + context_->record_batch_reader_ = std::move(reader_result).MoveValueUnsafe(); + + return {}; + } + + private: + // The split to read from the Parquet file. + std::optional split_; + // Schema to read from the Parquet file. + std::shared_ptr<::iceberg::Schema> read_schema_; + // The projection result to apply to the read schema. + SchemaProjection projection_; + // Parquet file reader to create RecordBatchReader. + std::unique_ptr<::parquet::arrow::FileReader> reader_; + // The context to keep track of the reading progress. 
+ std::unique_ptr context_; +}; + +ParquetReader::~ParquetReader() = default; + +Result> ParquetReader::Next() { return impl_->Next(); } + +Result ParquetReader::Schema() { return impl_->Schema(); } + +Status ParquetReader::Open(const ReaderOptions& options) { + impl_ = std::make_unique(); + return impl_->Open(options); +} + +Status ParquetReader::Close() { return impl_->Close(); } + +void ParquetReader::Register() { + static ReaderFactoryRegistry parquet_reader_register( + FileFormatType::kParquet, []() -> Result> { + return std::make_unique(); + }); +} + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_reader.h b/src/iceberg/parquet/parquet_reader.h new file mode 100644 index 000000000..d29dacabd --- /dev/null +++ b/src/iceberg/parquet/parquet_reader.h @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "iceberg/file_reader.h" +#include "iceberg/iceberg_bundle_export.h" + +namespace iceberg::parquet { + +/// \brief A reader that reads ArrowArray from Parquet files. 
+class ICEBERG_BUNDLE_EXPORT ParquetReader : public Reader { + public: + ParquetReader() = default; + + ~ParquetReader() override; + + Status Open(const ReaderOptions& options) final; + + Status Close() final; + + Result> Next() final; + + Result Schema() final; + + static void Register(); + + private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc new file mode 100644 index 000000000..8648fa931 --- /dev/null +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/parquet/parquet_schema_util_internal.h" + +namespace iceberg::parquet { + +Result Project(const Schema& expected_schema, + const ::parquet::arrow::SchemaManifest& parquet_schema) { + return NotImplemented("NYI"); +} + +Result> SelectedColumnIndices(const SchemaProjection& projection) { + return NotImplemented("NYI"); +} + +Result HasFieldIds(const ::parquet::schema::NodePtr& root_node) { + return NotImplemented("NYI"); +} + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_schema_util_internal.h b/src/iceberg/parquet/parquet_schema_util_internal.h new file mode 100644 index 000000000..f3b0f3706 --- /dev/null +++ b/src/iceberg/parquet/parquet_schema_util_internal.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "iceberg/schema.h" +#include "iceberg/schema_util.h" + +namespace iceberg::parquet { + +/// \brief Project an Iceberg Schema onto a Parquet Schema. +/// +/// This function creates a projection from an Iceberg Schema to a Parquet schema. +/// The projection determines how to read data from the Parquet schema into the expected +/// Iceberg Schema. 
+/// +/// \param expected_schema The Iceberg Schema that defines the expected structure. +/// \param parquet_schema The Parquet schema to read data from. +/// \return The schema projection result with column indices of projected Parquet columns. +Result Project(const Schema& expected_schema, + const ::parquet::arrow::SchemaManifest& parquet_schema); + +/// \brief Get the selected column indices by walking through the projection result. +/// +/// \param projection The schema projection result. +/// \return The selected column indices. +Result> SelectedColumnIndices(const SchemaProjection& projection); + +/// \brief Check whether the Parquet schema has field IDs. +/// +/// \param root_node The root node of the Parquet schema. +/// \return True if the Parquet schema has field IDs, false otherwise. Return error if +/// the Parquet schema has partial field IDs. +Result HasFieldIds(const ::parquet::schema::NodePtr& root_node); + +} // namespace iceberg::parquet diff --git a/src/iceberg/result.h b/src/iceberg/result.h index d1aa4cedd..9bd012c2f 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -46,6 +46,7 @@ enum class ErrorKind { kNotFound, kNotImplemented, kNotSupported, + kParquetError, kUnknownError, }; @@ -93,6 +94,7 @@ DEFINE_ERROR_FUNCTION(NotAllowed) DEFINE_ERROR_FUNCTION(NotFound) DEFINE_ERROR_FUNCTION(NotImplemented) DEFINE_ERROR_FUNCTION(NotSupported) +DEFINE_ERROR_FUNCTION(ParquetError) DEFINE_ERROR_FUNCTION(UnknownError) #undef DEFINE_ERROR_FUNCTION From 58dbd3cba758eb54705f1a180656f5cd0365ea9f Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 29 Jul 2025 23:53:48 +0800 Subject: [PATCH 2/2] simplify error handling --- .../IcebergThirdpartyToolchain.cmake | 2 +- .../arrow/arrow_error_transform_internal.h | 23 +++--- src/iceberg/parquet/parquet_data_util.cc | 2 +- .../parquet/parquet_data_util_internal.h | 2 +- src/iceberg/parquet/parquet_reader.cc | 78 +++++-------------- src/iceberg/result.h | 2 - 6 files changed, 36 insertions(+), 73 
deletions(-) diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake index a74d09adb..826d5d4fc 100644 --- a/cmake_modules/IcebergThirdpartyToolchain.cmake +++ b/cmake_modules/IcebergThirdpartyToolchain.cmake @@ -65,7 +65,7 @@ function(resolve_arrow_dependency) set(ARROW_BUILD_STATIC ON CACHE BOOL "" FORCE) - # Workaround undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*) + # Work around undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*) set(ARROW_IPC ON CACHE BOOL "" FORCE) diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h b/src/iceberg/arrow/arrow_error_transform_internal.h index e33df4181..cf64892f5 100644 --- a/src/iceberg/arrow/arrow_error_transform_internal.h +++ b/src/iceberg/arrow/arrow_error_transform_internal.h @@ -43,17 +43,18 @@ inline ErrorKind ToErrorKind(const ::arrow::Status& status) { } \ lhs = std::move(result_name).ValueOrDie(); -#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \ - ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \ - ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, ToErrorKind) - -#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \ - do { \ - auto&& _status = (expr); \ - if (!_status.ok()) { \ - return std::unexpected{ \ - {.kind = ToErrorKind(_status), .message = _status.ToString()}}; \ - } \ +#define ICEBERG_ARROW_ASSIGN_OR_RETURN(lhs, rexpr) \ + ICEBERG_ARROW_ASSIGN_OR_RETURN_IMPL( \ + ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), lhs, rexpr, \ + ::iceberg::arrow::ToErrorKind) + +#define ICEBERG_ARROW_RETURN_NOT_OK(expr) \ + do { \ + auto&& _status = (expr); \ + if (!_status.ok()) { \ + return std::unexpected{{.kind = ::iceberg::arrow::ToErrorKind(_status), \ + .message = _status.ToString()}}; \ + } \ } while (0) } // namespace iceberg::arrow diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc index 
26edcb5dd..6237b00be 100644 --- a/src/iceberg/parquet/parquet_data_util.cc +++ b/src/iceberg/parquet/parquet_data_util.cc @@ -21,7 +21,7 @@ namespace iceberg::parquet { -Result> ConvertRecordBatch( +Result> ProjectRecordBatch( std::shared_ptr<::arrow::RecordBatch> record_batch, const std::shared_ptr<::arrow::Schema>& output_arrow_schema, const Schema& projected_schema, const SchemaProjection& projection) { diff --git a/src/iceberg/parquet/parquet_data_util_internal.h b/src/iceberg/parquet/parquet_data_util_internal.h index bd423034d..c222d7497 100644 --- a/src/iceberg/parquet/parquet_data_util_internal.h +++ b/src/iceberg/parquet/parquet_data_util_internal.h @@ -35,7 +35,7 @@ namespace iceberg::parquet { /// \param projected_schema The projected Iceberg schema. /// \param projection The projection from projected Iceberg schema to the record batch. /// \return The converted record batch. -Result> ConvertRecordBatch( +Result> ProjectRecordBatch( std::shared_ptr<::arrow::RecordBatch> record_batch, const std::shared_ptr<::arrow::Schema>& output_arrow_schema, const Schema& projected_schema, const SchemaProjection& projection); diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc index 09bf59069..3ee260a44 100644 --- a/src/iceberg/parquet/parquet_reader.cc +++ b/src/iceberg/parquet/parquet_reader.cc @@ -26,17 +26,18 @@ #include #include #include -#include -#include #include #include #include #include -#include "iceberg/arrow/arrow_fs_file_io.h" +#include "iceberg/arrow/arrow_error_transform_internal.h" +#include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/parquet/parquet_data_util_internal.h" #include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/result.h" #include "iceberg/schema_internal.h" +#include "iceberg/schema_util.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" @@ -52,13 +53,8 @@ Result> OpenInputStream( } auto io = internal::checked_pointer_cast(options.io); 
- auto result = io->fs()->OpenInputFile(file_info); - if (!result.ok()) { - return IOError("Failed to open file {} for reading: {}", options.path, - result.status().message()); - } - - return result.MoveValueUnsafe(); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, io->fs()->OpenInputFile(file_info)); + return input; } Result BuildProjection(::parquet::arrow::FileReader* reader, @@ -73,17 +69,12 @@ Result BuildProjection(::parquet::arrow::FileReader* reader, } ::parquet::arrow::SchemaManifest schema_manifest; - auto schema_manifest_result = ::parquet::arrow::SchemaManifest::Make( + ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::SchemaManifest::Make( metadata->schema(), metadata->key_value_metadata(), reader->properties(), - &schema_manifest); - if (!schema_manifest_result.ok()) { - return ParquetError("Failed to make schema manifest: {}", - schema_manifest_result.message()); - } + &schema_manifest)); // Leverage SchemaManifest to project the schema ICEBERG_ASSIGN_OR_RAISE(auto projection, Project(read_schema, schema_manifest)); - return projection; } @@ -141,11 +132,8 @@ class ParquetReader::Impl { ICEBERG_ASSIGN_OR_RAISE(auto input_stream, OpenInputStream(options)); auto file_reader = ::parquet::ParquetFileReader::Open(std::move(input_stream), reader_properties); - auto make_reader_result = ::parquet::arrow::FileReader::Make( - pool, std::move(file_reader), arrow_reader_properties, &reader_); - if (!make_reader_result.ok()) { - return ParquetError("Failed to make file reader: {}", make_reader_result.message()); - } + ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make( + pool, std::move(file_reader), arrow_reader_properties, &reader_)); // Project read schema onto the Parquet file schema ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(), *read_schema_)); @@ -159,27 +147,17 @@ class ParquetReader::Impl { ICEBERG_RETURN_UNEXPECTED(InitReadContext()); } - auto next_result = context_->record_batch_reader_->Next(); - if (!next_result.ok()) { - 
return ParquetError("Failed to read next batch: {}", - next_result.status().message()); - } - - auto batch = next_result.MoveValueUnsafe(); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch, context_->record_batch_reader_->Next()); if (!batch) { return std::nullopt; } ICEBERG_ASSIGN_OR_RAISE( - batch, ConvertRecordBatch(std::move(batch), context_->output_arrow_schema_, + batch, ProjectRecordBatch(std::move(batch), context_->output_arrow_schema_, *read_schema_, projection_)); ArrowArray arrow_array; - auto export_result = ::arrow::ExportRecordBatch(*batch, &arrow_array); - if (!export_result.ok()) { - return ParquetError("Failed to export the Arrow record batch: {}", - export_result.message()); - } + ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportRecordBatch(*batch, &arrow_array)); return arrow_array; } @@ -190,11 +168,7 @@ class ParquetReader::Impl { } if (context_ != nullptr) { - auto close_result = context_->record_batch_reader_->Close(); - if (!close_result.ok()) { - return ParquetError("Failed to close record batch reader: {}", - close_result.message()); - } + ICEBERG_ARROW_RETURN_NOT_OK(context_->record_batch_reader_->Close()); context_.reset(); } @@ -209,11 +183,8 @@ class ParquetReader::Impl { } ArrowSchema arrow_schema; - auto export_result = - ::arrow::ExportSchema(*context_->output_arrow_schema_, &arrow_schema); - if (!export_result.ok()) { - return ParquetError("Failed to export Arrow schema: {}", export_result.message()); - } + ICEBERG_ARROW_RETURN_NOT_OK( + ::arrow::ExportSchema(*context_->output_arrow_schema_, &arrow_schema)); return arrow_schema; } @@ -224,12 +195,8 @@ class ParquetReader::Impl { // Build the output Arrow schema ArrowSchema arrow_schema; ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema)); - auto import_result = ::arrow::ImportSchema(&arrow_schema); - if (!import_result.ok()) { - return ParquetError("Failed to import Arrow schema: {}", - import_result.status().message()); - } - context_->output_arrow_schema_ = 
import_result.MoveValueUnsafe(); + ICEBERG_ARROW_ASSIGN_OR_RETURN(context_->output_arrow_schema_, + ::arrow::ImportSchema(&arrow_schema)); // Row group pruning based on the split // TODO(gangwu): add row group filtering based on zone map, bloom filter, etc. @@ -254,12 +221,9 @@ class ParquetReader::Impl { // Create the record batch reader ICEBERG_ASSIGN_OR_RAISE(auto column_indices, SelectedColumnIndices(projection_)); - auto reader_result = reader_->GetRecordBatchReader(row_group_indices, column_indices); - if (!reader_result.ok()) { - return ParquetError("Failed to get record batch reader: {}", - reader_result.status().message()); - } - context_->record_batch_reader_ = std::move(reader_result).MoveValueUnsafe(); + ICEBERG_ARROW_ASSIGN_OR_RETURN( + context_->record_batch_reader_, + reader_->GetRecordBatchReader(row_group_indices, column_indices)); return {}; } diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 9bd012c2f..d1aa4cedd 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -46,7 +46,6 @@ enum class ErrorKind { kNotFound, kNotImplemented, kNotSupported, - kParquetError, kUnknownError, }; @@ -94,7 +93,6 @@ DEFINE_ERROR_FUNCTION(NotAllowed) DEFINE_ERROR_FUNCTION(NotFound) DEFINE_ERROR_FUNCTION(NotImplemented) DEFINE_ERROR_FUNCTION(NotSupported) -DEFINE_ERROR_FUNCTION(ParquetError) DEFINE_ERROR_FUNCTION(UnknownError) #undef DEFINE_ERROR_FUNCTION