diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index d90c05421..587b1596a 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -23,6 +23,7 @@ set(ICEBERG_SOURCES expression/expression.cc expression/literal.cc file_reader.cc + file_writer.cc json_internal.cc manifest_entry.cc manifest_list.cc @@ -107,6 +108,7 @@ if(ICEBERG_BUILD_BUNDLE) arrow/arrow_fs_file_io.cc avro/avro_data_util.cc avro/avro_reader.cc + avro/avro_writer.cc avro/avro_schema_util.cc avro/avro_register.cc avro/avro_stream_internal.cc diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc index 19bc69df0..153951c22 100644 --- a/src/iceberg/avro/avro_reader.cc +++ b/src/iceberg/avro/avro_reader.cc @@ -31,6 +31,7 @@ #include #include +#include "iceberg/arrow/arrow_error_transform_internal.h" #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/avro/avro_data_util_internal.h" #include "iceberg/avro/avro_schema_util_internal.h" @@ -51,13 +52,8 @@ Result> CreateInputStream(const ReaderOptions& } auto io = internal::checked_pointer_cast(options.io); - auto result = io->fs()->OpenInputFile(file_info); - if (!result.ok()) { - return IOError("Failed to open file {} for {}", options.path, - result.status().message()); - } - - return std::make_unique(result.MoveValueUnsafe(), buffer_size); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, io->fs()->OpenInputFile(file_info)); + return std::make_unique(file, buffer_size); } } // namespace diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc new file mode 100644 index 000000000..837415e16 --- /dev/null +++ b/src/iceberg/avro/avro_writer.cc @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/avro/avro_writer.h" + +#include + +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_error_transform_internal.h" +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/avro/avro_stream_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::avro { + +namespace { + +Result> CreateOutputStream(const WriterOptions& options, + int64_t buffer_size) { + auto io = internal::checked_pointer_cast(options.io); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path)); + return std::make_unique(output, buffer_size); +} + +} // namespace + +// A stateful context to keep track of the writing progress. +struct WriteContext {}; + +class AvroWriter::Impl { + public: + Status Open(const WriterOptions& options) { + write_schema_ = options.schema; + + ::avro::NodePtr root; + ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root)); + + avro_schema_ = std::make_shared<::avro::ValidSchema>(root); + + // Open the output stream and adapt to the avro interface. + constexpr int64_t kDefaultBufferSize = 1024 * 1024; + ICEBERG_ASSIGN_OR_RAISE(auto output_stream, + CreateOutputStream(options, kDefaultBufferSize)); + + writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>( + std::move(output_stream), *avro_schema_); + return {}; + } + + Status Write(ArrowArray /*data*/) { + // TODO(xiao.dong) convert data and write to avro + // total_bytes_+= written_bytes; + return {}; + } + + Status Close() { + if (writer_ != nullptr) { + writer_->close(); + writer_.reset(); + } + return {}; + } + + bool Closed() const { return writer_ == nullptr; } + + int64_t length() { return total_bytes_; } + + private: + int64_t total_bytes_ = 0; + // The schema to write. + std::shared_ptr<::iceberg::Schema> write_schema_; + // The avro schema to write. + std::shared_ptr<::avro::ValidSchema> avro_schema_; + // The avro writer to write the data into a datum. + std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_; +}; + +AvroWriter::~AvroWriter() = default; + +Status AvroWriter::Write(ArrowArray data) { return impl_->Write(data); } + +Status AvroWriter::Open(const WriterOptions& options) { + impl_ = std::make_unique(); + return impl_->Open(options); +} + +Status AvroWriter::Close() { + if (!impl_->Closed()) { + return impl_->Close(); + } + return {}; +} + +std::optional AvroWriter::metrics() { + if (impl_->Closed()) { + // TODO(xiao.dong) implement metrics + return {}; + } + return std::nullopt; +} + +std::optional AvroWriter::length() { + if (impl_->Closed()) { + return impl_->length(); + } + return std::nullopt; +} + +std::vector AvroWriter::split_offsets() { return {}; } + +void AvroWriter::Register() { + static WriterFactoryRegistry avro_writer_register( + FileFormatType::kAvro, + []() -> Result> { return std::make_unique(); }); +} + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_writer.h b/src/iceberg/avro/avro_writer.h new file mode 100644 index 000000000..e7fb7c30c --- /dev/null +++ b/src/iceberg/avro/avro_writer.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "iceberg/file_writer.h" +#include "iceberg/iceberg_bundle_export.h" + +namespace iceberg::avro { + +/// \brief A writer for serializing ArrowArray to Avro files. +class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer { + public: + AvroWriter() = default; + + ~AvroWriter() override; + + Status Open(const WriterOptions& options) final; + + Status Close() final; + + Status Write(ArrowArray data) final; + + std::optional metrics() final; + + std::optional length() final; + + std::vector split_offsets() final; + + /// \brief Register this Avro writer implementation. + static void Register(); + + private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace iceberg::avro diff --git a/src/iceberg/file_writer.cc b/src/iceberg/file_writer.cc new file mode 100644 index 000000000..e5dbea347 --- /dev/null +++ b/src/iceberg/file_writer.cc @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/file_writer.h" + +#include + +#include "iceberg/result.h" +#include "iceberg/util/formatter.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { + +WriterFactory GetNotImplementedFactory(FileFormatType format_type) { + return [format_type]() -> Result> { + return NotImplemented("Missing writer factory for file format: {}", format_type); + }; +} + +} // namespace + +WriterFactory& WriterFactoryRegistry::GetFactory(FileFormatType format_type) { + static std::unordered_map factories = { + {FileFormatType::kAvro, GetNotImplementedFactory(FileFormatType::kAvro)}, + {FileFormatType::kParquet, GetNotImplementedFactory(FileFormatType::kParquet)}, + {FileFormatType::kOrc, GetNotImplementedFactory(FileFormatType::kOrc)}, + {FileFormatType::kPuffin, GetNotImplementedFactory(FileFormatType::kPuffin)}, + }; + return factories.at(format_type); +} + +WriterFactoryRegistry::WriterFactoryRegistry(FileFormatType format_type, + WriterFactory factory) { + GetFactory(format_type) = std::move(factory); +} + +Result> WriterFactoryRegistry::Open( + FileFormatType format_type, const WriterOptions& options) { + ICEBERG_ASSIGN_OR_RAISE(auto writer, GetFactory(format_type)()); + ICEBERG_RETURN_UNEXPECTED(writer->Open(options)); + return writer; +} + +} // namespace iceberg diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h index 6c8e75e34..135151afb 100644 --- a/src/iceberg/file_writer.h +++ b/src/iceberg/file_writer.h @@ -28,6 +28,7 @@ #include "iceberg/arrow_c_data.h" #include "iceberg/file_format.h" +#include "iceberg/metrics.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" @@ -38,7 +39,7 @@ struct ICEBERG_EXPORT WriterOptions { /// \brief The path to the file to write. std::string path; /// \brief The schema of the data to write. - ArrowSchema schema; + std::shared_ptr schema; /// \brief FileIO instance to open the file. Writer implementations should down cast it /// to the specific FileIO implementation. By default, the `iceberg-bundle` library uses /// `ArrowFileSystemFileIO` as the default implementation. @@ -65,6 +66,20 @@ class ICEBERG_EXPORT Writer { /// /// \return Status of write results. virtual Status Write(ArrowArray data) = 0; + + /// \brief Get the file statistics. + /// Only valid after the file is closed. + virtual std::optional metrics() = 0; + + /// \brief Get the file length. + /// Only valid after the file is closed. + virtual std::optional length() = 0; + + /// \brief Returns a list of recommended split locations, if applicable, empty + /// otherwise. When available, this information is used for planning scan tasks whose + /// boundaries are determined by these offsets. The returned list must be sorted in + /// ascending order. Only valid after the file is closed. + virtual std::vector split_offsets() = 0; }; /// \brief Factory function to create a writer of a specific file format. diff --git a/src/iceberg/metrics.h b/src/iceberg/metrics.h new file mode 100644 index 000000000..1dfeb20c8 --- /dev/null +++ b/src/iceberg/metrics.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metrics.h +/// Iceberg file format metrics + +#include + +#include "iceberg/expression/literal.h" +#include "iceberg/iceberg_export.h" + +namespace iceberg { + +/// \brief Iceberg file format metrics +struct ICEBERG_EXPORT Metrics { + int64_t row_count = 0; + std::unordered_map column_sizes; + std::unordered_map value_counts; + std::unordered_map null_value_counts; + std::unordered_map nan_value_counts; + std::unordered_map lower_bounds; + std::unordered_map upper_bounds; +}; + +} // namespace iceberg