From 1b7249c44a9ed18717059c0f94e435d6101d1cca Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 28 May 2025 16:08:58 +0800 Subject: [PATCH 1/2] feat: implement avro file reader --- src/iceberg/CMakeLists.txt | 2 + src/iceberg/arrow/arrow_fs_file_io.h | 3 + src/iceberg/avro/avro_data_util.cc | 32 +++ src/iceberg/avro/avro_data_util_internal.h | 35 ++++ src/iceberg/avro/avro_reader.cc | 222 +++++++++++++++++++++ src/iceberg/avro/avro_reader.h | 47 +++++ src/iceberg/avro/demo_avro.cc | 11 +- src/iceberg/avro/demo_avro.h | 2 + src/iceberg/file_reader.cc | 9 +- src/iceberg/file_reader.h | 36 +++- src/iceberg/result.h | 2 + test/avro_test.cc | 2 +- 12 files changed, 390 insertions(+), 13 deletions(-) create mode 100644 src/iceberg/avro/avro_data_util.cc create mode 100644 src/iceberg/avro/avro_data_util_internal.h create mode 100644 src/iceberg/avro/avro_reader.cc create mode 100644 src/iceberg/avro/avro_reader.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 09b419e7b..4c660d7d4 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -95,6 +95,8 @@ if(ICEBERG_BUILD_BUNDLE) arrow/demo_arrow.cc arrow/arrow_fs_file_io.cc avro/demo_avro.cc + avro/avro_data_util.cc + avro/avro_reader.cc avro/avro_schema_util.cc avro/avro_stream_internal.cc) diff --git a/src/iceberg/arrow/arrow_fs_file_io.h b/src/iceberg/arrow/arrow_fs_file_io.h index b55af5d6a..e187c89d2 100644 --- a/src/iceberg/arrow/arrow_fs_file_io.h +++ b/src/iceberg/arrow/arrow_fs_file_io.h @@ -46,6 +46,9 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO { /// \brief Delete a file at the given location. Status DeleteFile(const std::string& file_location) override; + /// \brief Get the Arrow file system. + const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; } + private: std::shared_ptr<::arrow::fs::FileSystem> arrow_fs_; }; diff --git a/src/iceberg/avro/avro_data_util.cc b/src/iceberg/avro/avro_data_util.cc new file mode 100644 index 000000000..48ac7e677 --- /dev/null +++ b/src/iceberg/avro/avro_data_util.cc @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/avro/avro_data_util_internal.h" + +namespace iceberg::avro { + +Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const SchemaProjection& projection, + const Schema& arrow_schema, + ::arrow::ArrayBuilder* array_builder) { + return NotImplemented("AppendDatumToBuilder is not yet implemented"); +} + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_data_util_internal.h b/src/iceberg/avro/avro_data_util_internal.h new file mode 100644 index 000000000..4b96483e7 --- /dev/null +++ b/src/iceberg/avro/avro_data_util_internal.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/schema_util.h" + +namespace iceberg::avro { + +Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const SchemaProjection& projection, + const Schema& arrow_schema, + ::arrow::ArrayBuilder* array_builder); + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc new file mode 100644 index 000000000..5b6632996 --- /dev/null +++ b/src/iceberg/avro/avro_reader.cc @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/avro/avro_reader.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io.h" +#include "iceberg/avro/avro_data_util_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/avro/avro_stream_internal.h" +#include "iceberg/schema_internal.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::avro { + +namespace { + +Result> CreateInputStream(const ReaderOptions& options, + int64_t buffer_size) { + ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File); + if (options.length) { + file_info.set_size(options.length.value()); + } + + auto io = internal::checked_pointer_cast(options.io); + auto result = io->fs()->OpenInputFile(file_info); + if (!result.ok()) { + return IOError("Failed to open file {} for {}", options.path, + result.status().message()); + } + + return std::make_unique(result.MoveValueUnsafe(), buffer_size); +} + +} // namespace + +// A stateful context to keep track of the reading progress. +struct ReadContext { + // The datum to reuse for reading the data. + std::unique_ptr<::avro::GenericDatum> datum_; + // The arrow schema to build the record batch. + std::shared_ptr<::arrow::Schema> arrow_schema_; + // The builder to build the record batch. + std::shared_ptr<::arrow::ArrayBuilder> builder_; +}; + +// TODO(gang.wu): there are a lot to do to make this reader work. +// 1. prune the reader schema based on the projection +// 2. read key-value metadata from the avro file +// 3. collect basic reader metrics +class AvroBatchReader::Impl { + public: + Status Open(const ReaderOptions& options) { + batch_size_ = options.batch_size; + read_schema_ = options.projection; + + // Open the input stream and adapt to the avro interface. + // TODO(gangwu): make this configurable + constexpr int64_t kDefaultBufferSize = 1024 * 1024; + ICEBERG_ASSIGN_OR_RAISE(auto input_stream, + CreateInputStream(options, kDefaultBufferSize)); + + // Create a base reader without setting reader schema to enable projection. + auto base_reader = + std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream)); + const ::avro::ValidSchema& file_schema = base_reader->dataSchema(); + + // Validate field ids in the file schema. + HasIdVisitor has_id_visitor; + ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema)); + if (has_id_visitor.HasNoIds()) { + // TODO(gangwu): support applying field-ids based on name mapping + return NotImplemented("Avro file schema has no field IDs"); + } + if (!has_id_visitor.AllHaveIds()) { + return InvalidSchema("Not all fields in the Avro file schema have field IDs"); + } + + // Project the read schema on top of the file schema. + // TODO(gangwu): support pruning source fields + ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*options.projection, file_schema.root(), + /*prune_source=*/false)); + base_reader->init(file_schema); + reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>( + std::move(base_reader)); + + if (options.split) { + reader_->sync(options.split->offset); + split_end_ = options.split->offset + options.split->length; + } + return {}; + } + + Result Next() { + if (!context_) { + ICEBERG_RETURN_UNEXPECTED(InitReadContext()); + } + + while (context_->builder_->length() < batch_size_) { + if (split_end_ && reader_->pastSync(split_end_.value())) { + break; + } + if (!reader_->read(*context_->datum_)) { + break; + } + ICEBERG_RETURN_UNEXPECTED( + AppendDatumToBuilder(reader_->readerSchema().root(), *context_->datum_, + projection_, *read_schema_, context_->builder_.get())); + } + + return ConvertBuilderToArrowArray(); + } + + Status Close() { + if (reader_ != nullptr) { + reader_->close(); + reader_.reset(); + } + context_.reset(); + return {}; + } + + private: + Status InitReadContext() { + context_ = std::make_unique(); + context_->datum_ = std::make_unique<::avro::GenericDatum>(reader_->readerSchema()); + + ArrowSchema arrow_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema)); + auto import_result = ::arrow::ImportSchema(&arrow_schema); + if (!import_result.ok()) { + return InvalidSchema("Failed to import the arrow schema: {}", + import_result.status().message()); + } + context_->arrow_schema_ = import_result.MoveValueUnsafe(); + + auto arrow_struct_type = + std::make_shared<::arrow::StructType>(context_->arrow_schema_->fields()); + auto builder_result = ::arrow::MakeBuilder(arrow_struct_type); + if (!builder_result.ok()) { + return InvalidSchema("Failed to make the arrow builder: {}", + builder_result.status().message()); + } + context_->builder_ = builder_result.MoveValueUnsafe(); + + return {}; + } + + Result ConvertBuilderToArrowArray() { + if (context_->builder_->length() == 0) { + return {}; + } + + auto builder_result = context_->builder_->Finish(); + if (!builder_result.ok()) { + return InvalidArrowData("Failed to finish the arrow array builder: {}", + builder_result.status().message()); + } + + auto array = builder_result.MoveValueUnsafe(); + ArrowArray arrow_array; + auto export_result = ::arrow::ExportArray(*array, &arrow_array); + if (!export_result.ok()) { + return InvalidArrowData("Failed to export the arrow array: {}", + export_result.message()); + } + return arrow_array; + } + + private: + // Max number of rows in the record batch to read. + int64_t batch_size_{}; + // The end of the split to read and used to terminate the reading. + std::optional split_end_; + // The schema to read. + std::shared_ptr read_schema_; + // The projection result to apply to the read schema. + SchemaProjection projection_; + // The avro reader to read the data into a datum. + std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_; + // The context to keep track of the reading progress. + std::unique_ptr context_; +}; + +Result AvroBatchReader::Next() { return impl_->Next(); } + +Status AvroBatchReader::Open(const ReaderOptions& options) { + impl_ = std::make_unique(); + return impl_->Open(options); +} + +Status AvroBatchReader::Close() { return impl_->Close(); } + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_reader.h b/src/iceberg/avro/avro_reader.h new file mode 100644 index 000000000..dc4cf4582 --- /dev/null +++ b/src/iceberg/avro/avro_reader.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include "iceberg/file_reader.h" +#include "iceberg/iceberg_bundle_export.h" + +namespace iceberg::avro { + +/// \brief A reader that reads ArrowArray from Avro files. +class ICEBERG_BUNDLE_EXPORT AvroBatchReader : public Reader { + public: + AvroBatchReader() = default; + + ~AvroBatchReader() override = default; + + Status Open(const ReaderOptions& options) final; + + Status Close() final; + + Result Next() final; + + DataLayout data_layout() const final { return DataLayout::kArrowArray; } + + private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/demo_avro.cc b/src/iceberg/avro/demo_avro.cc index 1b7f7a2e4..679b49664 100644 --- a/src/iceberg/avro/demo_avro.cc +++ b/src/iceberg/avro/demo_avro.cc @@ -55,9 +55,12 @@ Reader::DataLayout DemoAvroReader::data_layout() const { return Reader::DataLayout::kStructLike; } -ICEBERG_REGISTER_READER_FACTORY( - Avro, [](const ReaderOptions& options) -> Result> { - return std::make_unique(); - }); +Status DemoAvroReader::Open(const ReaderOptions& options) { return {}; } + +Status DemoAvroReader::Close() { return {}; } + +ICEBERG_REGISTER_READER_FACTORY(Avro, []() -> Result> { + return std::make_unique(); +}); } // namespace iceberg::avro diff --git a/src/iceberg/avro/demo_avro.h b/src/iceberg/avro/demo_avro.h index ca7dbdf2f..14cb2fa8c 100644 --- a/src/iceberg/avro/demo_avro.h +++ b/src/iceberg/avro/demo_avro.h @@ -38,6 +38,8 @@ class ICEBERG_BUNDLE_EXPORT DemoAvroReader : public Reader { public: DemoAvroReader() = default; ~DemoAvroReader() override = default; + Status Open(const ReaderOptions& options) override; + Status Close() override; Result Next() override; DataLayout data_layout() const override; }; diff --git a/src/iceberg/file_reader.cc b/src/iceberg/file_reader.cc index bf8035a91..a56c30ccc 100644 --- a/src/iceberg/file_reader.cc +++ b/src/iceberg/file_reader.cc @@ -23,13 +23,14 @@ #include "iceberg/expected.h" #include "iceberg/util/formatter.h" +#include "iceberg/util/macros.h" namespace iceberg { namespace { ReaderFactory GetNotImplementedFactory(FileFormatType format_type) { - return [format_type](const ReaderOptions& options) -> Result> { + return [format_type]() -> Result> { return NotImplemented("Missing reader factory for file format: {}", format_type); }; } @@ -51,9 +52,11 @@ ReaderFactoryRegistry::ReaderFactoryRegistry(FileFormatType format_type, GetFactory(format_type) = std::move(factory); } -Result> ReaderFactoryRegistry::Create( +Result> ReaderFactoryRegistry::Open( FileFormatType format_type, const ReaderOptions& options) { - return GetFactory(format_type)(options); + ICEBERG_ASSIGN_OR_RAISE(auto reader, GetFactory(format_type)()); + ICEBERG_RETURN_UNEXPECTED(reader->Open(options)); + return reader; } StructLikeReader::StructLikeReader(std::unique_ptr reader) diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h index c3faab5b8..ca46cdccd 100644 --- a/src/iceberg/file_reader.h +++ b/src/iceberg/file_reader.h @@ -38,6 +38,15 @@ namespace iceberg { class ICEBERG_EXPORT Reader { public: virtual ~Reader() = default; + Reader() = default; + Reader(const Reader&) = delete; + Reader& operator=(const Reader&) = delete; + + /// \brief Open the reader. + virtual Status Open(const struct ReaderOptions& options) = 0; + + /// \brief Close the reader. + virtual Status Close() = 0; /// \brief Read next data from file. /// @@ -61,11 +70,19 @@ class ICEBERG_EXPORT StructLikeReader : public Reader { public: explicit StructLikeReader(std::unique_ptr reader); + ~StructLikeReader() override = default; + /// \brief Always read data into `StructLike` or monostate if no more data. Result Next() final; DataLayout data_layout() const final { return DataLayout::kStructLike; } + Status Open(const struct ReaderOptions& options) final { + return reader_->Open(options); + } + + Status Close() final { return reader_->Close(); } + private: std::unique_ptr reader_; }; @@ -78,11 +95,19 @@ class ICEBERG_EXPORT BatchReader : public Reader { public: explicit BatchReader(std::unique_ptr reader); + ~BatchReader() override = default; + /// \brief Always read data into `ArrowArray` or monostate if no more data. Result Next() final; DataLayout data_layout() const final { return DataLayout::kArrowArray; } + Status Open(const struct ReaderOptions& options) final { + return reader_->Open(options); + } + + Status Close() final { return reader_->Close(); } + private: std::unique_ptr reader_; }; @@ -115,11 +140,12 @@ struct ICEBERG_EXPORT ReaderOptions { /// \brief The filter to apply to the data. Reader implementations may ignore this if /// the file format does not support filtering. std::shared_ptr filter; + /// \brief Format-specific or implementation-specific properties. + std::unordered_map properties; }; /// \brief Factory function to create a reader of a specific file format. -using ReaderFactory = - std::function>(const ReaderOptions&)>; +using ReaderFactory = std::function>()>; /// \brief Registry of reader factories for different file formats. struct ICEBERG_EXPORT ReaderFactoryRegistry { @@ -129,9 +155,9 @@ struct ICEBERG_EXPORT ReaderFactoryRegistry { /// \brief Get the factory function for a specific file format. static ReaderFactory& GetFactory(FileFormatType format_type); - /// \brief Create a reader for a specific file format. - static Result> Create(FileFormatType format_type, - const ReaderOptions& options); + /// \brief Open a reader for a specific file format. + static Result> Open(FileFormatType format_type, + const ReaderOptions& options); }; /// \brief Macro to register a reader factory for a specific file format. diff --git a/src/iceberg/result.h b/src/iceberg/result.h index ecfcec44c..e9f1dfbfd 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -33,6 +33,7 @@ enum class ErrorKind { kCommitStateUnknown, kDecompressError, kInvalidArgument, + kInvalidArrowData, kInvalidExpression, kInvalidSchema, kIOError, @@ -77,6 +78,7 @@ DEFINE_ERROR_FUNCTION(AlreadyExists) DEFINE_ERROR_FUNCTION(CommitStateUnknown) DEFINE_ERROR_FUNCTION(DecompressError) DEFINE_ERROR_FUNCTION(InvalidArgument) +DEFINE_ERROR_FUNCTION(InvalidArrowData) DEFINE_ERROR_FUNCTION(InvalidExpression) DEFINE_ERROR_FUNCTION(InvalidSchema) DEFINE_ERROR_FUNCTION(IOError) diff --git a/test/avro_test.cc b/test/avro_test.cc index 5ffcbc01a..01ac82113 100644 --- a/test/avro_test.cc +++ b/test/avro_test.cc @@ -45,7 +45,7 @@ TEST(AVROTest, TestDemoAvro) { } TEST(AVROTest, TestDemoAvroReader) { - auto result = ReaderFactoryRegistry::Create(FileFormatType::kAvro, {}); + auto result = ReaderFactoryRegistry::Open(FileFormatType::kAvro, {}); ASSERT_THAT(result, IsOk()); auto reader = std::move(result.value()); From c1a6474b5ee27a8f82aec8830393cc3d5b2e9494 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 30 May 2025 16:35:45 +0800 Subject: [PATCH 2/2] fix windows build --- src/iceberg/expected.h | 19 +++++++++---------- src/iceberg/schema_internal.h | 3 ++- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/iceberg/expected.h b/src/iceberg/expected.h index 9d28d3815..723b28259 100644 --- a/src/iceberg/expected.h +++ b/src/iceberg/expected.h @@ -122,7 +122,7 @@ inline constexpr bool is_error_type_valid_v = is_error_type_valid::value; } // namespace expected_detail template -class ICEBERG_EXPORT [[nodiscard]] unexpected { +class [[nodiscard]] unexpected { public: static_assert(expected_detail::is_error_type_valid_v); @@ -192,7 +192,7 @@ class ICEBERG_EXPORT [[nodiscard]] unexpected { template unexpected(E) -> unexpected; -struct ICEBERG_EXPORT unexpect_t { +struct unexpect_t { explicit unexpect_t() = default; }; inline constexpr unexpect_t unexpect{}; @@ -201,7 +201,7 @@ template class bad_expected_access; template <> -class ICEBERG_EXPORT bad_expected_access : public std::exception { +class bad_expected_access : public std::exception { public: const char* what() const noexcept override { return "Bad expected access"; } @@ -214,7 +214,7 @@ class ICEBERG_EXPORT bad_expected_access : public std::exception { }; template -class ICEBERG_EXPORT bad_expected_access : public bad_expected_access { +class bad_expected_access : public bad_expected_access { public: explicit bad_expected_access(E e) noexcept(std::is_nothrow_move_constructible_v) : m_val(std::move(e)) {} @@ -1006,9 +1006,8 @@ struct default_ctor_base { /// tracked by the expected object. template -class ICEBERG_EXPORT [[nodiscard]] expected - : private expected_detail::move_assign_base, - private expected_detail::default_ctor_base { +class [[nodiscard]] expected : private expected_detail::move_assign_base, + private expected_detail::default_ctor_base { static_assert(expected_detail::is_value_type_valid_v); static_assert(expected_detail::is_error_type_valid_v); @@ -1798,7 +1797,7 @@ constexpr void swap(expected& lhs, } template -class ICEBERG_EXPORT [[nodiscard]] expected +class [[nodiscard]] expected : private expected_detail::move_assign_base, private expected_detail::default_ctor_base { static_assert(expected_detail::is_error_type_valid_v); @@ -2349,8 +2348,8 @@ class ICEBERG_EXPORT [[nodiscard]] expected // standalone swap for void value type template && std::is_swappable_v>* = nullptr> -ICEBERG_EXPORT constexpr void swap( - expected& lhs, expected& rhs) noexcept(noexcept(lhs.swap(rhs))) { +constexpr void swap(expected& lhs, + expected& rhs) noexcept(noexcept(lhs.swap(rhs))) { lhs.swap(rhs); } diff --git a/src/iceberg/schema_internal.h b/src/iceberg/schema_internal.h index 26be1c3f1..1a31e68f2 100644 --- a/src/iceberg/schema_internal.h +++ b/src/iceberg/schema_internal.h @@ -24,6 +24,7 @@ #include +#include "iceberg/iceberg_export.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" @@ -39,7 +40,7 @@ constexpr std::string_view kFieldIdKey = "ICEBERG:field_id"; /// \param[in] schema The Iceberg schema to convert. /// \param[out] out The Arrow schema to convert to. /// \return An error if the conversion fails. -Status ToArrowSchema(const Schema& schema, ArrowSchema* out); +ICEBERG_EXPORT Status ToArrowSchema(const Schema& schema, ArrowSchema* out); /// \brief Convert an Arrow schema to an Iceberg schema. ///