From 6a6011bcec5a59f06b49576ad5e0e18a1cc7ad6e Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 13 Aug 2025 14:50:08 +0800 Subject: [PATCH 1/3] feat: support avro writer 1 add avro writer and factory(without write func since converter pull/166 not finished yet) 2 add file metrics definition --- src/iceberg/CMakeLists.txt | 2 + src/iceberg/avro/avro_writer.cc | 146 ++++++++++++++++++++++++++++++++ src/iceberg/avro/avro_writer.h | 54 ++++++++++++ src/iceberg/file_writer.cc | 62 ++++++++++++++ src/iceberg/file_writer.h | 18 +++- src/iceberg/metrics.h | 43 ++++++++++ 6 files changed, 324 insertions(+), 1 deletion(-) create mode 100644 src/iceberg/avro/avro_writer.cc create mode 100644 src/iceberg/avro/avro_writer.h create mode 100644 src/iceberg/file_writer.cc create mode 100644 src/iceberg/metrics.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index d90c05421..587b1596a 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -23,6 +23,7 @@ set(ICEBERG_SOURCES expression/expression.cc expression/literal.cc file_reader.cc + file_writer.cc json_internal.cc manifest_entry.cc manifest_list.cc @@ -107,6 +108,7 @@ if(ICEBERG_BUILD_BUNDLE) arrow/arrow_fs_file_io.cc avro/avro_data_util.cc avro/avro_reader.cc + avro/avro_writer.cc avro/avro_schema_util.cc avro/avro_register.cc avro/avro_stream_internal.cc diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc new file mode 100644 index 000000000..114cc2d94 --- /dev/null +++ b/src/iceberg/avro/avro_writer.cc @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/avro/avro_writer.h" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/avro/avro_stream_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::avro { + +namespace { + +Result> CreateOutputStream(const WriterOptions& options, + int64_t buffer_size) { + auto io = internal::checked_pointer_cast(options.io); + auto result = io->fs()->OpenOutputStream(options.path); + if (!result.ok()) { + return IOError("Failed to open file {} for {}", options.path, + result.status().message()); + } + return std::make_unique(result.MoveValueUnsafe(), buffer_size); +} + +} // namespace + +// A stateful context to keep track of the writing progress. +struct WriteContext {}; + +class AvroWriter::Impl { + public: + Status Open(const WriterOptions& options) { + write_schema_ = options.schema; + + ::avro::NodePtr root; + ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root)); + + avro_schema_ = std::make_shared<::avro::ValidSchema>(root); + + // Open the output stream and adapt to the avro interface. + constexpr int64_t kDefaultBufferSize = 1024 * 1024; + ICEBERG_ASSIGN_OR_RAISE(auto output_stream, + CreateOutputStream(options, kDefaultBufferSize)); + + writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>( + std::move(output_stream), *avro_schema_); + return {}; + } + + Status Write(ArrowArray /*data*/) { + // TODO(xiao.dong) convert data and write to avro + // total_bytes_+= written_bytes; + return {}; + } + + Status Close() { + if (writer_ != nullptr) { + writer_->close(); + writer_.reset(); + } + return {}; + } + + bool Closed() const { return writer_ == nullptr; } + + int64_t length() { return total_bytes_; } + + private: + int64_t total_bytes_ = 0; + // The schema to write. + std::shared_ptr<::iceberg::Schema> write_schema_; + // The avro schema to write. + std::shared_ptr<::avro::ValidSchema> avro_schema_; + // The avro writer to write the data into a datum. + std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_; +}; + +AvroWriter::~AvroWriter() = default; + +Status AvroWriter::Write(ArrowArray data) { return impl_->Write(data); } + +Status AvroWriter::Open(const WriterOptions& options) { + impl_ = std::make_unique(); + return impl_->Open(options); +} + +Status AvroWriter::Close() { + if (!impl_->Closed()) { + return impl_->Close(); + } + return {}; +} + +Metrics AvroWriter::metrics() { + if (impl_->Closed()) { + // TODO(xiao.dong) implement metrics + return {}; + } + return {}; +} + +int64_t AvroWriter::length() { + if (impl_->Closed()) { + return impl_->length(); + } + return 0; +} + +std::vector AvroWriter::split_offsets() { return {}; } + +void AvroWriter::Register() { + static WriterFactoryRegistry avro_writer_register( + FileFormatType::kAvro, + []() -> Result> { return std::make_unique(); }); +} + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_writer.h b/src/iceberg/avro/avro_writer.h new file mode 100644 index 000000000..11701bc7e --- /dev/null +++ b/src/iceberg/avro/avro_writer.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "iceberg/file_writer.h" +#include "iceberg/iceberg_bundle_export.h" + +namespace iceberg::avro { + +/// \brief A writer ArrowArray to Avro files. +class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer { + public: + AvroWriter() = default; + + ~AvroWriter() override; + + Status Open(const WriterOptions& options) final; + + Status Close() final; + + Status Write(ArrowArray data) final; + + Metrics metrics() final; + + int64_t length() final; + + std::vector split_offsets() final; + + /// \brief Register this Avro writer implementation. + static void Register(); + + private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace iceberg::avro diff --git a/src/iceberg/file_writer.cc b/src/iceberg/file_writer.cc new file mode 100644 index 000000000..e5dbea347 --- /dev/null +++ b/src/iceberg/file_writer.cc @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/file_writer.h" + +#include + +#include "iceberg/result.h" +#include "iceberg/util/formatter.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +WriterFactory GetNotImplementedFactory(FileFormatType format_type) { + return [format_type]() -> Result> { + return NotImplemented("Missing writer factory for file format: {}", format_type); + }; +} + +} // namespace + +WriterFactory& WriterFactoryRegistry::GetFactory(FileFormatType format_type) { + static std::unordered_map factories = { + {FileFormatType::kAvro, GetNotImplementedFactory(FileFormatType::kAvro)}, + {FileFormatType::kParquet, GetNotImplementedFactory(FileFormatType::kParquet)}, + {FileFormatType::kOrc, GetNotImplementedFactory(FileFormatType::kOrc)}, + {FileFormatType::kPuffin, GetNotImplementedFactory(FileFormatType::kPuffin)}, + }; + return factories.at(format_type); +} + +WriterFactoryRegistry::WriterFactoryRegistry(FileFormatType format_type, + WriterFactory factory) { + GetFactory(format_type) = std::move(factory); +} + +Result> WriterFactoryRegistry::Open( + FileFormatType format_type, const WriterOptions& options) { + ICEBERG_ASSIGN_OR_RAISE(auto writer, GetFactory(format_type)()); + ICEBERG_RETURN_UNEXPECTED(writer->Open(options)); + return writer; +} + +} // namespace iceberg diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h index 6c8e75e34..64d6e4a12 100644 --- a/src/iceberg/file_writer.h +++ b/src/iceberg/file_writer.h @@ -28,7 +28,9 @@ #include "iceberg/arrow_c_data.h" #include "iceberg/file_format.h" +#include "iceberg/metrics.h" #include "iceberg/result.h" +#include "iceberg/schema.h" #include "iceberg/type_fwd.h" namespace iceberg { @@ -38,7 +40,7 @@ struct ICEBERG_EXPORT WriterOptions { /// \brief The path to the file to write. std::string path; /// \brief The schema of the data to write. - ArrowSchema schema; + std::shared_ptr schema; /// \brief FileIO instance to open the file. Writer implementations should down cast it /// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses /// `ArrowFileSystemFileIO` as the default implementation. @@ -65,6 +67,20 @@ class ICEBERG_EXPORT Writer { /// /// \return Status of write results. virtual Status Write(ArrowArray data) = 0; + + /// \brief Get the file statistics. + /// Only valid after the file is closed. + virtual Metrics metrics() = 0; + + /// \brief Get the file length. + /// Only valid after the file is closed. + virtual int64_t length() = 0; + + /// \brief Returns a list of recommended split locations, if applicable, empty + /// otherwise. When available, this information is used for planning scan tasks whose + /// boundaries are determined by these offsets. The returned list must be sorted in + /// ascending order. Only valid after the file is closed. + virtual std::vector split_offsets() = 0; }; /// \brief Factory function to create a writer of a specific file format. diff --git a/src/iceberg/metrics.h b/src/iceberg/metrics.h new file mode 100644 index 000000000..10c2b9c86 --- /dev/null +++ b/src/iceberg/metrics.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metrics.h +/// Iceberg file format metrics + +#include + +#include "iceberg/expression/literal.h" +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +/// \brief Iceberg file format metrics +struct ICEBERG_EXPORT Metrics { + int64_t row_count_ = 0; + std::unordered_map column_sizes; + std::unordered_map value_counts; + std::unordered_map null_value_counts; + std::unordered_map nan_value_counts; + std::unordered_map lower_bounds; + std::unordered_map upper_bounds; +}; + +} // namespace iceberg From 74f61aee96a3aff1dff0289c75c86719df4e5071 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Wed, 13 Aug 2025 19:11:54 +0800 Subject: [PATCH 2/3] fix comment --- src/iceberg/avro/avro_reader.cc | 10 +++------- src/iceberg/avro/avro_writer.cc | 18 +++++++----------- src/iceberg/avro/avro_writer.h | 4 ++-- src/iceberg/file_writer.h | 5 ++--- src/iceberg/metrics.h | 2 +- 5 files changed, 15 insertions(+), 24 deletions(-) diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc index 19bc69df0..153951c22 100644 --- a/src/iceberg/avro/avro_reader.cc +++ b/src/iceberg/avro/avro_reader.cc @@ -31,6 +31,7 @@ #include #include +#include "iceberg/arrow/arrow_error_transform_internal.h" #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/avro/avro_data_util_internal.h" #include "iceberg/avro/avro_schema_util_internal.h" @@ -51,13 +52,8 @@ Result> CreateInputStream(const ReaderOptions& } auto io = internal::checked_pointer_cast(options.io); - auto result = io->fs()->OpenInputFile(file_info); - if (!result.ok()) { - return IOError("Failed to open file {} for {}", options.path, - result.status().message()); - } - - return std::make_unique(result.MoveValueUnsafe(), buffer_size); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, io->fs()->OpenInputFile(file_info)); + return std::make_unique(file, buffer_size); } } // namespace diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index 114cc2d94..837415e16 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -27,8 +27,8 @@ #include #include #include -#include +#include "iceberg/arrow/arrow_error_transform_internal.h" #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/avro/avro_stream_internal.h" @@ -43,12 +43,8 @@ namespace { Result> CreateOutputStream(const WriterOptions& options, int64_t buffer_size) { auto io = internal::checked_pointer_cast(options.io); - auto result = io->fs()->OpenOutputStream(options.path); - if (!result.ok()) { - return IOError("Failed to open file {} for {}", options.path, - result.status().message()); - } - return std::make_unique(result.MoveValueUnsafe(), buffer_size); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path)); + return std::make_unique(output, buffer_size); } } // namespace @@ -120,19 +116,19 @@ Status AvroWriter::Close() { return {}; } -Metrics AvroWriter::metrics() { +std::optional AvroWriter::metrics() { if (impl_->Closed()) { // TODO(xiao.dong) implement metrics return {}; } - return {}; + return std::nullopt; } -int64_t AvroWriter::length() { +std::optional AvroWriter::length() { if (impl_->Closed()) { return impl_->length(); } - return 0; + return std::nullopt; } std::vector AvroWriter::split_offsets() { return {}; } diff --git a/src/iceberg/avro/avro_writer.h b/src/iceberg/avro/avro_writer.h index 11701bc7e..dff53c20c 100644 --- a/src/iceberg/avro/avro_writer.h +++ b/src/iceberg/avro/avro_writer.h @@ -37,9 +37,9 @@ class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer { Status Write(ArrowArray data) final; - Metrics metrics() final; + std::optional metrics() final; - int64_t length() final; + std::optional length() final; std::vector split_offsets() final; diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h index 64d6e4a12..135151afb 100644 --- a/src/iceberg/file_writer.h +++ b/src/iceberg/file_writer.h @@ -30,7 +30,6 @@ #include "iceberg/file_format.h" #include "iceberg/metrics.h" #include "iceberg/result.h" -#include "iceberg/schema.h" #include "iceberg/type_fwd.h" namespace iceberg { @@ -70,11 +69,11 @@ class ICEBERG_EXPORT Writer { /// \brief Get the file statistics. /// Only valid after the file is closed. - virtual Metrics metrics() = 0; + virtual std::optional metrics() = 0; /// \brief Get the file length. /// Only valid after the file is closed. - virtual int64_t length() = 0; + virtual std::optional length() = 0; /// \brief Returns a list of recommended split locations, if applicable, empty /// otherwise. When available, this information is used for planning scan tasks whose diff --git a/src/iceberg/metrics.h b/src/iceberg/metrics.h index 10c2b9c86..1dfeb20c8 100644 --- a/src/iceberg/metrics.h +++ b/src/iceberg/metrics.h @@ -31,7 +31,7 @@ namespace iceberg { /// \brief Iceberg file format metrics struct ICEBERG_EXPORT Metrics { - int64_t row_count_ = 0; + int64_t row_count = 0; std::unordered_map column_sizes; std::unordered_map value_counts; std::unordered_map null_value_counts; From bcce7ff4d4f15c67c3539724527f4c58e2996084 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 14 Aug 2025 09:50:04 +0200 Subject: [PATCH 3/3] wording Co-authored-by: Junwang Zhao --- src/iceberg/avro/avro_writer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iceberg/avro/avro_writer.h b/src/iceberg/avro/avro_writer.h index dff53c20c..e7fb7c30c 100644 --- a/src/iceberg/avro/avro_writer.h +++ b/src/iceberg/avro/avro_writer.h @@ -24,7 +24,7 @@ namespace iceberg::avro { -/// \brief A writer ArrowArray to Avro files. +/// \brief A writer for serializing ArrowArray to Avro files. class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer { public: AvroWriter() = default;