From 189fb56e924d239b5139e0758867eab64285f69a Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Thu, 14 Aug 2025 18:00:17 +0800 Subject: [PATCH 1/6] feat: add manifest&manifest list writer 1 add v1v2v3 writer definition 2 --- src/iceberg/CMakeLists.txt | 2 + src/iceberg/manifest_writer.cc | 90 ++++++++++++++++ src/iceberg/manifest_writer.h | 32 +++++- src/iceberg/manifest_writer_internal.cc | 69 ++++++++++++ src/iceberg/manifest_writer_internal.h | 136 ++++++++++++++++++++++++ 5 files changed, 324 insertions(+), 5 deletions(-) create mode 100644 src/iceberg/manifest_writer.cc create mode 100644 src/iceberg/manifest_writer_internal.cc create mode 100644 src/iceberg/manifest_writer_internal.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 587b1596a..82f60729a 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -47,6 +47,8 @@ set(ICEBERG_SOURCES type.cc manifest_reader.cc manifest_reader_internal.cc + manifest_writer.cc + manifest_writer_internal.cc arrow_c_data_guard_internal.cc util/murmurhash3_internal.cc util/timepoint.cc diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc new file mode 100644 index 000000000..13b9ed5ef --- /dev/null +++ b/src/iceberg/manifest_writer.cc @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest_writer.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/manifest_writer_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> ManifestWriter::MakeWriter( + int32_t format_version, int64_t first_row_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema) { + auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); + auto fields_span = manifest_entry_schema->fields(); + std::vector fields(fields_span.begin(), fields_span.end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE( + auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro, + {.path = std::string(manifest_location), + .schema = schema, + .io = std::move(file_io)})); + switch (format_version) { + case 1: + return std::make_unique(first_row_id, std::move(writer), + std::move(schema)); + case 2: + return std::make_unique(first_row_id, std::move(writer), + std::move(schema)); + case 3: + return std::make_unique(first_row_id, std::move(writer), + std::move(schema)); + + default: + return InvalidArgument("Unsupported manifest format version: {}", format_version); + } +} + +Result> ManifestListWriter::MakeWriter( + int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, + std::string_view manifest_list_location, std::shared_ptr file_io) { + std::vector fields(ManifestFile::Type().fields().begin(), + ManifestFile::Type().fields().end()); + auto schema = std::make_shared(fields); + ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = std::string(manifest_list_location), + .schema = schema, + .io = std::move(file_io)})); + switch (format_version) { + case 1: + return std::make_unique(snapshot_id, parent_snapshot_id, + sequence_number, first_row_id, + std::move(writer), std::move(schema)); + case 2: + return std::make_unique(snapshot_id, parent_snapshot_id, + sequence_number, first_row_id, + std::move(writer), std::move(schema)); + case 3: + return std::make_unique(snapshot_id, parent_snapshot_id, + sequence_number, first_row_id, + std::move(writer), std::move(schema)); + + default: + return InvalidArgument("Unsupported manifest list format version: {}", + format_version); + } +} + +} // namespace iceberg diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 3c5091b3d..517a7dacf 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -35,29 +35,51 @@ namespace iceberg { class ICEBERG_EXPORT ManifestWriter { public: virtual ~ManifestWriter() = default; - virtual Status WriteManifestEntries( - const std::vector& entries) const = 0; + + /// \brief Write manifest entry to file + /// \param entry Manifest entry to write. + /// \return Status::OK() if all entry was written successfully + virtual Status WriteManifestEntry(const ManifestEntry& entry) const = 0; + + /// \brief Close writer and flush to storage. + virtual Status Close() = 0; /// \brief Creates a writer for a manifest file. + /// \param format_version Format version of the manifest. + /// \param first_row_id First row ID of the snapshot. /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. static Result> MakeWriter( - std::string_view manifest_location, std::shared_ptr file_io, - std::shared_ptr partition_schema); + int32_t format_version, int64_t first_row_id, std::string_view manifest_location, + std::shared_ptr file_io, std::shared_ptr partition_schema); }; /// \brief Write manifest files to a manifest list file. class ICEBERG_EXPORT ManifestListWriter { public: virtual ~ManifestListWriter() = default; - virtual Status WriteManifestFiles(const std::vector& files) const = 0; + + /// \brief Write manifest file list to manifest list file. + /// \param file Manifest file to write. + /// \return Status::OK() if all file was written successfully + virtual Status WriteManifestFile(const ManifestFile& file) const = 0; + + /// \brief Close writer and flush to storage. + virtual Status Close() = 0; /// \brief Creates a writer for the manifest list. + /// \param format_version Format version of the manifest list. + /// \param snapshot_id ID of the snapshot. + /// \param parent_snapshot_id ID of the parent snapshot. + /// \param sequence_number Sequence number of the snapshot. + /// \param first_row_id First row ID of the snapshot. /// \param manifest_list_location Path to the manifest list file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. static Result> MakeWriter( + int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io); }; diff --git a/src/iceberg/manifest_writer_internal.cc b/src/iceberg/manifest_writer_internal.cc new file mode 100644 index 000000000..a6faa35fa --- /dev/null +++ b/src/iceberg/manifest_writer_internal.cc @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "manifest_writer_internal.h" + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" +#include "iceberg/schema.h" + +namespace iceberg { + +Status ManifestWriterV1::WriteManifestEntry(const ManifestEntry& entry) const { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV1::Close() { return {}; } + +Status ManifestWriterV2::WriteManifestEntry(const ManifestEntry& entry) const { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV2::Close() { return {}; } + +Status ManifestWriterV3::WriteManifestEntry(const ManifestEntry& entry) const { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV3::Close() { return {}; } + +Status ManifestListWriterV1::WriteManifestFile(const ManifestFile& file) const { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV1::Close() { return {}; } + +Status ManifestListWriterV2::WriteManifestFile(const ManifestFile& file) const { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV2::Close() { return {}; } + +Status ManifestListWriterV3::WriteManifestFile(const ManifestFile& file) const { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV3::Close() { return {}; } +} // namespace iceberg diff --git a/src/iceberg/manifest_writer_internal.h b/src/iceberg/manifest_writer_internal.h new file mode 100644 index 000000000..9efca8c31 --- /dev/null +++ b/src/iceberg/manifest_writer_internal.h @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/internal/manifest_writer_internal.h +/// Writer implementation for manifest list files and manifest files. + +#include "iceberg/manifest_writer.h" + +namespace iceberg { + +/// \brief Write manifest entries to a manifest file. +class ManifestWriterImpl : public ManifestWriter { + public: + explicit ManifestWriterImpl(int64_t first_row_id, std::unique_ptr writer, + std::shared_ptr schema) + : schema_(std::move(schema)), writer_(std::move(writer)) {} + + private: + std::shared_ptr schema_; + std::unique_ptr writer_; +}; + +/// \brief Write v1 manifest entries to a manifest file. +class ManifestWriterV1 : public ManifestWriterImpl { + public: + explicit ManifestWriterV1(int64_t first_row_id, std::unique_ptr writer, + std::shared_ptr schema) + : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestEntry(const ManifestEntry& entry) const override; + + Status Close() override; +}; + +/// \brief Write v2 manifest entries to a manifest file. +class ManifestWriterV2 : public ManifestWriterImpl { + public: + explicit ManifestWriterV2(int64_t first_row_id, std::unique_ptr writer, + std::shared_ptr schema) + : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestEntry(const ManifestEntry& entry) const override; + + Status Close() override; +}; + +/// \brief Write v3 manifest entries to a manifest file. +class ManifestWriterV3 : public ManifestWriterImpl { + public: + explicit ManifestWriterV3(int64_t first_row_id, std::unique_ptr writer, + std::shared_ptr schema) + : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestEntry(const ManifestEntry& entry) const override; + + Status Close() override; +}; + +/// \brief Write manifest files to a manifest list file. +class ManifestListWriterImpl : public ManifestListWriter { + public: + explicit ManifestListWriterImpl(int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, + std::unique_ptr writer, + std::shared_ptr schema) + : schema_(std::move(schema)), writer_(std::move(writer)) {} + + private: + std::shared_ptr schema_; + std::unique_ptr writer_; +}; + +/// \brief Write v1 manifest files to a manifest list file. +class ManifestListWriterV1 : public ManifestListWriterImpl { + public: + explicit ManifestListWriterV1(int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, + std::unique_ptr writer, + std::shared_ptr schema) + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, + first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestFile(const ManifestFile& file) const override; + + Status Close() override; +}; + +/// \brief Write v2 manifest files to a manifest list file. +class ManifestListWriterV2 : public ManifestListWriterImpl { + public: + explicit ManifestListWriterV2(int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, + std::unique_ptr writer, + std::shared_ptr schema) + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, + first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestFile(const ManifestFile& file) const override; + + Status Close() override; +}; + +/// \brief Write v3 manifest files to a manifest list file. +class ManifestListWriterV3 : public ManifestListWriterImpl { + public: + explicit ManifestListWriterV3(int64_t snapshot_id, int64_t parent_snapshot_id, + int64_t sequence_number, int64_t first_row_id, + std::unique_ptr writer, + std::shared_ptr schema) + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, + first_row_id, std::move(writer), std::move(schema)) {} + + Status WriteManifestFile(const ManifestFile& file) const override; + + Status Close() override; +}; + +} // namespace iceberg From 156425859c55ef20a58b4e747fcfc1b5c4fa8a78 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 15 Aug 2025 10:09:18 +0800 Subject: [PATCH 2/6] add v1v2v3 metadata wrapper --- src/iceberg/manifest_writer.cc | 19 +++++---- src/iceberg/manifest_writer.h | 6 ++- src/iceberg/manifest_writer_internal.h | 58 ++++++++++++++++++-------- src/iceberg/v1_metadata.h | 53 +++++++++++++++++++++++ src/iceberg/v2_metadata.h | 58 ++++++++++++++++++++++++++ src/iceberg/v3_metadata.h | 58 ++++++++++++++++++++++++++ 6 files changed, 224 insertions(+), 28 deletions(-) create mode 100644 src/iceberg/v1_metadata.h create mode 100644 src/iceberg/v2_metadata.h create mode 100644 src/iceberg/v3_metadata.h diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc index 13b9ed5ef..866db4a3e 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -28,8 +28,9 @@ namespace iceberg { Result> ManifestWriter::MakeWriter( - int32_t format_version, int64_t first_row_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_schema) { + int32_t format_version, int64_t snapshot_id, int64_t first_row_id, + std::string_view manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema) { auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); auto fields_span = manifest_entry_schema->fields(); std::vector fields(fields_span.begin(), fields_span.end()); @@ -41,14 +42,14 @@ Result> ManifestWriter::MakeWriter( .io = std::move(file_io)})); switch (format_version) { case 1: - return std::make_unique(first_row_id, std::move(writer), + return std::make_unique(snapshot_id, std::move(writer), std::move(schema)); case 2: - return std::make_unique(first_row_id, std::move(writer), + return std::make_unique(snapshot_id, std::move(writer), std::move(schema)); case 3: - return std::make_unique(first_row_id, std::move(writer), - std::move(schema)); + return std::make_unique(snapshot_id, first_row_id, + std::move(writer), std::move(schema)); default: return InvalidArgument("Unsupported manifest format version: {}", format_version); @@ -70,12 +71,12 @@ Result> ManifestListWriter::MakeWriter( switch (format_version) { case 1: return std::make_unique(snapshot_id, parent_snapshot_id, - sequence_number, first_row_id, + std::move(writer), std::move(schema)); case 2: return std::make_unique(snapshot_id, parent_snapshot_id, - sequence_number, first_row_id, - std::move(writer), std::move(schema)); + sequence_number, std::move(writer), + std::move(schema)); case 3: return std::make_unique(snapshot_id, parent_snapshot_id, sequence_number, first_row_id, diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 517a7dacf..5930243db 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -46,13 +46,15 @@ class ICEBERG_EXPORT ManifestWriter { /// \brief Creates a writer for a manifest file. /// \param format_version Format version of the manifest. + /// \param snapshot_id ID of the snapshot. /// \param first_row_id First row ID of the snapshot. /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. static Result> MakeWriter( - int32_t format_version, int64_t first_row_id, std::string_view manifest_location, - std::shared_ptr file_io, std::shared_ptr partition_schema); + int32_t format_version, int64_t snapshot_id, int64_t first_row_id, + std::string_view manifest_location, std::shared_ptr file_io, + std::shared_ptr partition_schema); }; /// \brief Write manifest files to a manifest list file. diff --git a/src/iceberg/manifest_writer_internal.h b/src/iceberg/manifest_writer_internal.h index 9efca8c31..6acbe61e2 100644 --- a/src/iceberg/manifest_writer_internal.h +++ b/src/iceberg/manifest_writer_internal.h @@ -23,13 +23,16 @@ /// Writer implementation for manifest list files and manifest files. #include "iceberg/manifest_writer.h" +#include "iceberg/v1_metadata.h" +#include "iceberg/v2_metadata.h" +#include "iceberg/v3_metadata.h" namespace iceberg { /// \brief Write manifest entries to a manifest file. class ManifestWriterImpl : public ManifestWriter { public: - explicit ManifestWriterImpl(int64_t first_row_id, std::unique_ptr writer, + explicit ManifestWriterImpl(int64_t snapshot_id, std::unique_ptr writer, std::shared_ptr schema) : schema_(std::move(schema)), writer_(std::move(writer)) {} @@ -41,44 +44,55 @@ class ManifestWriterImpl : public ManifestWriter { /// \brief Write v1 manifest entries to a manifest file. class ManifestWriterV1 : public ManifestWriterImpl { public: - explicit ManifestWriterV1(int64_t first_row_id, std::unique_ptr writer, + explicit ManifestWriterV1(int64_t snapshot_id, std::unique_ptr writer, std::shared_ptr schema) - : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)) {} Status WriteManifestEntry(const ManifestEntry& entry) const override; Status Close() override; + + private: + V1MetaData::ManifestEntryWrapper wrapper_; }; /// \brief Write v2 manifest entries to a manifest file. class ManifestWriterV2 : public ManifestWriterImpl { public: - explicit ManifestWriterV2(int64_t first_row_id, std::unique_ptr writer, + explicit ManifestWriterV2(int64_t snapshot_id, std::unique_ptr writer, std::shared_ptr schema) - : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)), + wrapper_(snapshot_id) {} Status WriteManifestEntry(const ManifestEntry& entry) const override; Status Close() override; + + private: + V2MetaData::ManifestEntryWrapper wrapper_; }; /// \brief Write v3 manifest entries to a manifest file. class ManifestWriterV3 : public ManifestWriterImpl { public: - explicit ManifestWriterV3(int64_t first_row_id, std::unique_ptr writer, + explicit ManifestWriterV3(int64_t snapshot_id, int64_t first_row_id, + std::unique_ptr writer, std::shared_ptr schema) - : ManifestWriterImpl(first_row_id, std::move(writer), std::move(schema)) {} + : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)), + wrapper_(snapshot_id) {} Status WriteManifestEntry(const ManifestEntry& entry) const override; Status Close() override; + + private: + V3MetaData::ManifestEntryWrapper wrapper_; }; /// \brief Write manifest files to a manifest list file. class ManifestListWriterImpl : public ManifestListWriter { public: explicit ManifestListWriterImpl(int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, int64_t first_row_id, std::unique_ptr writer, std::shared_ptr schema) : schema_(std::move(schema)), writer_(std::move(writer)) {} @@ -92,30 +106,36 @@ class ManifestListWriterImpl : public ManifestListWriter { class ManifestListWriterV1 : public ManifestListWriterImpl { public: explicit ManifestListWriterV1(int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, int64_t first_row_id, + std::unique_ptr writer, std::shared_ptr schema) - : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, - first_row_id, std::move(writer), std::move(schema)) {} + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer), + std::move(schema)) {} Status WriteManifestFile(const ManifestFile& file) const override; Status Close() override; + + private: + V1MetaData::ManifestFileWrapper wrapper_; }; /// \brief Write v2 manifest files to a manifest list file. class ManifestListWriterV2 : public ManifestListWriterImpl { public: explicit ManifestListWriterV2(int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, int64_t first_row_id, - std::unique_ptr writer, + int64_t sequence_number, std::unique_ptr writer, std::shared_ptr schema) - : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, - first_row_id, std::move(writer), std::move(schema)) {} + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer), + std::move(schema)), + wrapper_(snapshot_id, sequence_number) {} Status WriteManifestFile(const ManifestFile& file) const override; Status Close() override; + + private: + V2MetaData::ManifestFileWrapper wrapper_; }; /// \brief Write v3 manifest files to a manifest list file. @@ -125,12 +145,16 @@ class ManifestListWriterV3 : public ManifestListWriterImpl { int64_t sequence_number, int64_t first_row_id, std::unique_ptr writer, std::shared_ptr schema) - : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, sequence_number, - first_row_id, std::move(writer), std::move(schema)) {} + : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer), + std::move(schema)), + wrapper_(snapshot_id, sequence_number) {} Status WriteManifestFile(const ManifestFile& file) const override; Status Close() override; + + private: + V3MetaData::ManifestFileWrapper wrapper_; }; } // namespace iceberg diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h new file mode 100644 index 000000000..6fb47a37f --- /dev/null +++ b/src/iceberg/v1_metadata.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/v1_metadata.h + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" + +namespace iceberg { + +/// \brief v1 metadata wrapper. +/// +/// Wrapper for v1 manifest list and manifest entry. +class V1MetaData { + public: + /// \brief v1 manifest file wrapper. + struct ManifestFileWrapper : public ManifestFile { + explicit ManifestFileWrapper() {} + + ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + }; + + /// \brief v1 manifest entry wrapper. + struct ManifestEntryWrapper : public ManifestEntry { + explicit ManifestEntryWrapper() {} + + ManifestEntry wrap(ManifestEntry entry) { return *this; } + }; + + static ManifestFileWrapper manifestFileWrapper() { return ManifestFileWrapper(); } + + static ManifestEntryWrapper manifestEntryWrapper() { return ManifestEntryWrapper(); } +}; + +} // namespace iceberg diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h new file mode 100644 index 000000000..7de4c06ce --- /dev/null +++ b/src/iceberg/v2_metadata.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/v2_metadata.h + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" + +namespace iceberg { + +/// \brief v2 metadata wrapper. +/// +/// Wrapper for v2 manifest list and manifest entry. +class V2MetaData { + public: + /// \brief v2 manifest file wrapper. + struct ManifestFileWrapper : public ManifestFile { + explicit ManifestFileWrapper(int64_t commit_snapshotId, int64_t sequence_number) {} + + ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + }; + + /// \brief v2 manifest entry wrapper. + struct ManifestEntryWrapper : public ManifestEntry { + explicit ManifestEntryWrapper(int64_t commit_snapshot_id) {} + + ManifestEntry wrap(ManifestEntry entry) { return *this; } + }; + + static ManifestFileWrapper manifestFileWrapper(int64_t commit_snapshotId, + int64_t sequence_number) { + return ManifestFileWrapper(commit_snapshotId, sequence_number); + } + + static ManifestEntryWrapper manifestEntryWrapper(int64_t commit_snapshot_id) { + return ManifestEntryWrapper(commit_snapshot_id); + } +}; + +} // namespace iceberg diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h new file mode 100644 index 000000000..efbceda4a --- /dev/null +++ b/src/iceberg/v3_metadata.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/v3_metadata.h + +#include "iceberg/manifest_entry.h" +#include "iceberg/manifest_list.h" + +namespace iceberg { + +/// \brief v3 metadata wrapper. +/// +/// Wrapper for v3 manifest list and manifest entry. +class V3MetaData { + public: + /// \brief v3 manifest file wrapper. + struct ManifestFileWrapper : public ManifestFile { + explicit ManifestFileWrapper(int64_t commit_snapshotId, int64_t sequence_number) {} + + ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + }; + + /// \brief v3 manifest entry wrapper. + struct ManifestEntryWrapper : public ManifestEntry { + explicit ManifestEntryWrapper(int64_t commit_snapshot_id) {} + + ManifestEntry wrap(ManifestEntry entry) { return *this; } + }; + + static ManifestFileWrapper manifestFileWrapper(int64_t commit_snapshotId, + int64_t sequence_number) { + return ManifestFileWrapper(commit_snapshotId, sequence_number); + } + + static ManifestEntryWrapper manifestEntryWrapper(int64_t commit_snapshot_id) { + return ManifestEntryWrapper(commit_snapshot_id); + } +}; + +} // namespace iceberg From 8fabe7c0386ffbda0aa938db0f8f6bc8ab595f08 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 15 Aug 2025 11:17:22 +0800 Subject: [PATCH 3/6] fix cpplint --- src/iceberg/v1_metadata.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 6fb47a37f..79e410316 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -33,14 +33,14 @@ class V1MetaData { public: /// \brief v1 manifest file wrapper. struct ManifestFileWrapper : public ManifestFile { - explicit ManifestFileWrapper() {} + ManifestFileWrapper() = default; ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } }; /// \brief v1 manifest entry wrapper. struct ManifestEntryWrapper : public ManifestEntry { - explicit ManifestEntryWrapper() {} + ManifestEntryWrapper() = default; ManifestEntry wrap(ManifestEntry entry) { return *this; } }; From cf5f4e53494abddccf0f0b3b507c1d91fa74404e Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Fri, 15 Aug 2025 11:35:59 +0800 Subject: [PATCH 4/6] fix cpplint --- src/iceberg/v1_metadata.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 79e410316..8f2ea07ea 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -45,9 +45,9 @@ class V1MetaData { ManifestEntry wrap(ManifestEntry entry) { return *this; } }; - static ManifestFileWrapper manifestFileWrapper() { return ManifestFileWrapper(); } + static ManifestFileWrapper manifestFileWrapper() { return {}; } - static ManifestEntryWrapper manifestEntryWrapper() { return ManifestEntryWrapper(); } + static ManifestEntryWrapper manifestEntryWrapper() { return {}; } }; } // namespace iceberg From 92716995540e19cda672ea465c4a25c2e1cd1254 Mon Sep 17 00:00:00 2001 From: "xiao.dong" Date: Mon, 18 Aug 2025 14:31:40 +0800 Subject: [PATCH 5/6] fix comment --- src/iceberg/manifest_reader.cc | 4 +- src/iceberg/manifest_reader.h | 4 +- src/iceberg/manifest_writer.cc | 40 ++++++++------ src/iceberg/manifest_writer.h | 30 +++++++---- src/iceberg/manifest_writer_internal.cc | 72 ++++++++++++++++++++++--- src/iceberg/manifest_writer_internal.h | 45 +++++++++++++--- src/iceberg/table_scan.cc | 8 +-- src/iceberg/v1_metadata.h | 4 +- src/iceberg/v2_metadata.h | 4 +- src/iceberg/v3_metadata.h | 6 ++- test/manifest_list_reader_test.cc | 2 +- test/manifest_reader_test.cc | 3 +- 12 files changed, 166 insertions(+), 56 deletions(-) diff --git a/src/iceberg/manifest_reader.cc b/src/iceberg/manifest_reader.cc index 208263d57..29e0ed923 100644 --- a/src/iceberg/manifest_reader.cc +++ b/src/iceberg/manifest_reader.cc @@ -27,7 +27,7 @@ namespace iceberg { -Result> ManifestReader::MakeReader( +Result> ManifestReader::Make( std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema) { auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); @@ -42,7 +42,7 @@ Result> ManifestReader::MakeReader( return std::make_unique(std::move(reader), std::move(schema)); } -Result> ManifestListReader::MakeReader( +Result> ManifestListReader::Make( std::string_view manifest_list_location, std::shared_ptr file_io) { std::vector fields(ManifestFile::Type().fields().begin(), ManifestFile::Type().fields().end()); diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h index 5d231de0f..832d7773c 100644 --- a/src/iceberg/manifest_reader.h +++ b/src/iceberg/manifest_reader.h @@ -41,7 +41,7 @@ class ICEBERG_EXPORT ManifestReader { /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \return A Result containing the reader or an error. - static Result> MakeReader( + static Result> Make( std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema); }; @@ -56,7 +56,7 @@ class ICEBERG_EXPORT ManifestListReader { /// \param manifest_list_location Path to the manifest list file. /// \param file_io File IO implementation to use. /// \return A Result containing the reader or an error. - static Result> MakeReader( + static Result> Make( std::string_view manifest_list_location, std::shared_ptr file_io); }; diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc index 866db4a3e..ac412aea2 100644 --- a/src/iceberg/manifest_writer.cc +++ b/src/iceberg/manifest_writer.cc @@ -27,11 +27,12 @@ namespace iceberg { -Result> ManifestWriter::MakeWriter( - int32_t format_version, int64_t snapshot_id, int64_t first_row_id, +Result> ManifestWriter::Make( + int32_t format_version, int64_t snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema) { - auto manifest_entry_schema = ManifestEntry::TypeFromPartitionType(partition_schema); + auto manifest_entry_schema = + ManifestEntry::TypeFromPartitionType(std::move(partition_schema)); auto fields_span = manifest_entry_schema->fields(); std::vector fields(fields_span.begin(), fields_span.end()); auto schema = std::make_shared(fields); @@ -48,17 +49,21 @@ Result> ManifestWriter::MakeWriter( return std::make_unique(snapshot_id, std::move(writer), std::move(schema)); case 3: - return std::make_unique(snapshot_id, first_row_id, + // first_row_id is required for V3 manifest entry + if (!first_row_id.has_value()) { + return InvalidManifest("first_row_id is required for V3 manifest entry"); + } + return std::make_unique(snapshot_id, first_row_id.value(), std::move(writer), std::move(schema)); default: - return InvalidArgument("Unsupported manifest format version: {}", format_version); + return NotSupported("Unsupported manifest format version: {}", format_version); } } -Result> ManifestListWriter::MakeWriter( +Result> ManifestListWriter::Make( int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, int64_t first_row_id, + std::optional sequence_number, std::optional first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io) { std::vector fields(ManifestFile::Type().fields().begin(), ManifestFile::Type().fields().end()); @@ -71,20 +76,25 @@ Result> ManifestListWriter::MakeWriter( switch (format_version) { case 1: return std::make_unique(snapshot_id, parent_snapshot_id, - std::move(writer), std::move(schema)); case 2: return std::make_unique(snapshot_id, parent_snapshot_id, - sequence_number, std::move(writer), - std::move(schema)); - case 3: - return std::make_unique(snapshot_id, parent_snapshot_id, - sequence_number, first_row_id, + sequence_number.value(), std::move(writer), std::move(schema)); + case 3: + // sequence_number&first_row_id is required for V3 manifest list + if (!sequence_number.has_value()) { + return InvalidManifestList("sequence_number is required for V3 manifest list"); + } + if (!first_row_id.has_value()) { + return InvalidManifestList("first_row_id is required for V3 manifest list"); + } + return std::make_unique( + snapshot_id, parent_snapshot_id, sequence_number.value(), first_row_id.value(), + std::move(writer), std::move(schema)); default: - return InvalidArgument("Unsupported manifest list format version: {}", - format_version); + return NotSupported("Unsupported manifest list format version: {}", format_version); } } diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h index 5930243db..6ddc17f1f 100644 --- a/src/iceberg/manifest_writer.h +++ b/src/iceberg/manifest_writer.h @@ -36,10 +36,15 @@ class ICEBERG_EXPORT ManifestWriter { public: virtual ~ManifestWriter() = default; - /// \brief Write manifest entry to file + /// \brief Write manifest entry to file. /// \param entry Manifest entry to write. - /// \return Status::OK() if all entry was written successfully - virtual Status WriteManifestEntry(const ManifestEntry& entry) const = 0; + /// \return Status::OK() if entry was written successfully + virtual Status Add(const ManifestEntry& entry) = 0; + + /// \brief Write manifest entries to file. + /// \param entries Manifest entries to write. + /// \return Status::OK() if all entries were written successfully + virtual Status AddAll(const std::vector& entries) = 0; /// \brief Close writer and flush to storage. virtual Status Close() = 0; @@ -51,8 +56,8 @@ class ICEBERG_EXPORT ManifestWriter { /// \param manifest_location Path to the manifest file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. - static Result> MakeWriter( - int32_t format_version, int64_t snapshot_id, int64_t first_row_id, + static Result> Make( + int32_t format_version, int64_t snapshot_id, std::optional first_row_id, std::string_view manifest_location, std::shared_ptr file_io, std::shared_ptr partition_schema); }; @@ -62,10 +67,15 @@ class ICEBERG_EXPORT ManifestListWriter { public: virtual ~ManifestListWriter() = default; - /// \brief Write manifest file list to manifest list file. + /// \brief Write manifest file to manifest list file. /// \param file Manifest file to write. - /// \return Status::OK() if all file was written successfully - virtual Status WriteManifestFile(const ManifestFile& file) const = 0; + /// \return Status::OK() if file was written successfully + virtual Status Add(const ManifestFile& file) = 0; + + /// \brief Write manifest file list to manifest list file. + /// \param files Manifest file list to write. + /// \return Status::OK() if all files were written successfully + virtual Status AddAll(const std::vector& files) = 0; /// \brief Close writer and flush to storage. virtual Status Close() = 0; @@ -79,9 +89,9 @@ class ICEBERG_EXPORT ManifestListWriter { /// \param manifest_list_location Path to the manifest list file. /// \param file_io File IO implementation to use. /// \return A Result containing the writer or an error. - static Result> MakeWriter( + static Result> Make( int32_t format_version, int64_t snapshot_id, int64_t parent_snapshot_id, - int64_t sequence_number, int64_t first_row_id, + std::optional sequence_number, std::optional first_row_id, std::string_view manifest_list_location, std::shared_ptr file_io); }; diff --git a/src/iceberg/manifest_writer_internal.cc b/src/iceberg/manifest_writer_internal.cc index a6faa35fa..d5b30d8af 100644 --- a/src/iceberg/manifest_writer_internal.cc +++ b/src/iceberg/manifest_writer_internal.cc @@ -25,45 +25,105 @@ namespace iceberg { -Status ManifestWriterV1::WriteManifestEntry(const ManifestEntry& entry) const { +Status ManifestWriterV1::Add(const ManifestEntry& entry) { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV1::AddAll(const std::vector& files) { // TODO(xiao.dong) convert entries to arrow data return {}; } Status ManifestWriterV1::Close() { return {}; } -Status ManifestWriterV2::WriteManifestEntry(const ManifestEntry& entry) const { +ManifestEntry ManifestWriterV1::prepare(const ManifestEntry& entry) { + return wrapper_.Wrap(entry); +} + +Status ManifestWriterV2::Add(const ManifestEntry& entry) { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV2::AddAll(const std::vector& files) { // TODO(xiao.dong) convert entries to arrow data return {}; } Status ManifestWriterV2::Close() { return {}; } -Status ManifestWriterV3::WriteManifestEntry(const ManifestEntry& entry) const { +ManifestEntry ManifestWriterV2::prepare(const ManifestEntry& entry) { + return wrapper_.Wrap(entry); +} + +Status ManifestWriterV3::Add(const ManifestEntry& entry) { + // TODO(xiao.dong) convert entries to arrow data + return {}; +} + +Status ManifestWriterV3::AddAll(const std::vector& files) { // TODO(xiao.dong) convert entries to arrow data return {}; } Status ManifestWriterV3::Close() { return {}; } -Status ManifestListWriterV1::WriteManifestFile(const ManifestFile& file) const { +ManifestEntry ManifestWriterV3::prepare(const ManifestEntry& entry) { + return wrapper_.Wrap(entry); +} + +Status ManifestListWriterV1::Add(const ManifestFile& file) { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV1::AddAll(const std::vector& files) { // TODO(xiao.dong) convert manifest files to arrow data return {}; } Status ManifestListWriterV1::Close() { return {}; } -Status ManifestListWriterV2::WriteManifestFile(const ManifestFile& file) const { +ManifestFile ManifestListWriterV1::prepare(const ManifestFile& file) { + return wrapper_.Wrap(file); +} + +Status ManifestListWriterV2::Add(const ManifestFile& file) { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV2::AddAll(const std::vector& files) { // TODO(xiao.dong) convert manifest files to arrow data return {}; } Status ManifestListWriterV2::Close() { return {}; } -Status ManifestListWriterV3::WriteManifestFile(const ManifestFile& file) const { +ManifestFile ManifestListWriterV2::prepare(const ManifestFile& file) { + return wrapper_.Wrap(file); +} + +Status ManifestListWriterV3::Add(const ManifestFile& file) { + // TODO(xiao.dong) convert manifest files to arrow data + return {}; +} + +Status ManifestListWriterV3::AddAll(const std::vector& files) { // TODO(xiao.dong) convert manifest files to arrow data return {}; } Status ManifestListWriterV3::Close() { return {}; } + +ManifestFile ManifestListWriterV3::prepare(const ManifestFile& file) { + if (file.content != ManifestFile::Content::kData || file.first_row_id.has_value()) { + return wrapper_.Wrap(file, std::nullopt); + } + auto result = wrapper_.Wrap(file, next_row_id_); + next_row_id_ += + file.existing_rows_count.value_or(0) + file.added_rows_count.value_or(0); + return result; +} } // namespace iceberg diff --git a/src/iceberg/manifest_writer_internal.h b/src/iceberg/manifest_writer_internal.h index 6acbe61e2..0ae711942 100644 --- a/src/iceberg/manifest_writer_internal.h +++ b/src/iceberg/manifest_writer_internal.h @@ -36,6 +36,9 @@ class ManifestWriterImpl : public ManifestWriter { std::shared_ptr schema) : schema_(std::move(schema)), writer_(std::move(writer)) {} + protected: + virtual ManifestEntry prepare(const ManifestEntry& entry) = 0; + private: std::shared_ptr schema_; std::unique_ptr writer_; @@ -48,10 +51,13 @@ class ManifestWriterV1 : public ManifestWriterImpl { std::shared_ptr schema) : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)) {} - Status WriteManifestEntry(const ManifestEntry& entry) const override; - + Status Add(const ManifestEntry& entry) override; + Status AddAll(const std::vector& entries) override; Status Close() override; + protected: + ManifestEntry prepare(const ManifestEntry& entry) override; + private: V1MetaData::ManifestEntryWrapper wrapper_; }; @@ -64,10 +70,14 @@ class ManifestWriterV2 : public ManifestWriterImpl { : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)), wrapper_(snapshot_id) {} - Status WriteManifestEntry(const ManifestEntry& entry) const override; + Status Add(const ManifestEntry& entry) override; + Status AddAll(const std::vector& entries) override; Status Close() override; + protected: + ManifestEntry prepare(const ManifestEntry& entry) override; + private: V2MetaData::ManifestEntryWrapper wrapper_; }; @@ -81,10 +91,14 @@ class ManifestWriterV3 : public ManifestWriterImpl { : ManifestWriterImpl(snapshot_id, std::move(writer), std::move(schema)), wrapper_(snapshot_id) {} - Status WriteManifestEntry(const ManifestEntry& entry) const override; + Status Add(const ManifestEntry& entry) override; + Status AddAll(const std::vector& entries) override; Status Close() override; + protected: + ManifestEntry prepare(const ManifestEntry& entry) override; + private: V3MetaData::ManifestEntryWrapper wrapper_; }; @@ -97,6 +111,9 @@ class ManifestListWriterImpl : public ManifestListWriter { std::shared_ptr schema) : schema_(std::move(schema)), writer_(std::move(writer)) {} + protected: + virtual ManifestFile prepare(const ManifestFile& file) = 0; + private: std::shared_ptr schema_; std::unique_ptr writer_; @@ -112,10 +129,13 @@ class ManifestListWriterV1 : public ManifestListWriterImpl { : ManifestListWriterImpl(snapshot_id, parent_snapshot_id, std::move(writer), std::move(schema)) {} - Status WriteManifestFile(const ManifestFile& file) const override; - + Status Add(const ManifestFile& file) override; + Status AddAll(const std::vector& files) override; Status Close() override; + protected: + ManifestFile prepare(const ManifestFile& file) override; + private: V1MetaData::ManifestFileWrapper wrapper_; }; @@ -130,10 +150,14 @@ class ManifestListWriterV2 : public ManifestListWriterImpl { std::move(schema)), wrapper_(snapshot_id, sequence_number) {} - Status WriteManifestFile(const ManifestFile& file) const override; + Status Add(const ManifestFile& file) override; + Status AddAll(const std::vector& files) override; Status Close() override; + protected: + ManifestFile prepare(const ManifestFile& file) override; + private: V2MetaData::ManifestFileWrapper wrapper_; }; @@ -149,11 +173,16 @@ class ManifestListWriterV3 : public ManifestListWriterImpl { std::move(schema)), wrapper_(snapshot_id, sequence_number) {} - Status WriteManifestFile(const ManifestFile& file) const override; + Status Add(const ManifestFile& file) override; + Status AddAll(const std::vector& files) override; Status Close() override; + protected: + ManifestFile prepare(const ManifestFile& file) override; + private: + int64_t next_row_id_ = 0; V3MetaData::ManifestFileWrapper wrapper_; }; diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 43deb2aca..220ef1ef7 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -147,7 +147,7 @@ DataTableScan::DataTableScan(TableScanContext context, std::shared_ptr f Result>> DataTableScan::PlanFiles() const { ICEBERG_ASSIGN_OR_RAISE( auto manifest_list_reader, - ManifestListReader::MakeReader(context_.snapshot->manifest_list, file_io_)); + ManifestListReader::Make(context_.snapshot->manifest_list, file_io_)); ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files()); std::vector> tasks; @@ -155,9 +155,9 @@ Result>> DataTableScan::PlanFiles() co auto partition_schema = partition_spec->schema(); for (const auto& manifest_file : manifest_files) { - ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader, - ManifestReader::MakeReader(manifest_file.manifest_path, - file_io_, partition_schema)); + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_reader, + ManifestReader::Make(manifest_file.manifest_path, file_io_, partition_schema)); ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries()); // TODO(gty404): filter manifests using partition spec and filter expression diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 8f2ea07ea..4ec9a12fc 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -35,14 +35,14 @@ class V1MetaData { struct ManifestFileWrapper : public ManifestFile { ManifestFileWrapper() = default; - ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + ManifestFile Wrap(ManifestFile file) { return *this; } }; /// \brief v1 manifest entry wrapper. struct ManifestEntryWrapper : public ManifestEntry { ManifestEntryWrapper() = default; - ManifestEntry wrap(ManifestEntry entry) { return *this; } + ManifestEntry Wrap(ManifestEntry entry) { return *this; } }; static ManifestFileWrapper manifestFileWrapper() { return {}; } diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h index 7de4c06ce..e264e6eef 100644 --- a/src/iceberg/v2_metadata.h +++ b/src/iceberg/v2_metadata.h @@ -35,14 +35,14 @@ class V2MetaData { struct ManifestFileWrapper : public ManifestFile { explicit ManifestFileWrapper(int64_t commit_snapshotId, int64_t sequence_number) {} - ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + ManifestFile Wrap(ManifestFile file) { return *this; } }; /// \brief v2 manifest entry wrapper. struct ManifestEntryWrapper : public ManifestEntry { explicit ManifestEntryWrapper(int64_t commit_snapshot_id) {} - ManifestEntry wrap(ManifestEntry entry) { return *this; } + ManifestEntry Wrap(ManifestEntry entry) { return *this; } }; static ManifestFileWrapper manifestFileWrapper(int64_t commit_snapshotId, diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h index efbceda4a..c5d257c92 100644 --- a/src/iceberg/v3_metadata.h +++ b/src/iceberg/v3_metadata.h @@ -35,14 +35,16 @@ class V3MetaData { struct ManifestFileWrapper : public ManifestFile { explicit ManifestFileWrapper(int64_t commit_snapshotId, int64_t sequence_number) {} - ManifestFile wrap(ManifestFile file, int64_t first_row_id) { return *this; } + ManifestFile Wrap(ManifestFile file, std::optional first_row_id) { + return *this; + } }; /// \brief v3 manifest entry wrapper. struct ManifestEntryWrapper : public ManifestEntry { explicit ManifestEntryWrapper(int64_t commit_snapshot_id) {} - ManifestEntry wrap(ManifestEntry entry) { return *this; } + ManifestEntry Wrap(ManifestEntry entry) { return *this; } }; static ManifestFileWrapper manifestFileWrapper(int64_t commit_snapshotId, diff --git a/test/manifest_list_reader_test.cc b/test/manifest_list_reader_test.cc index 7497a1203..0cb211ba0 100644 --- a/test/manifest_list_reader_test.cc +++ b/test/manifest_list_reader_test.cc @@ -47,7 +47,7 @@ class ManifestListReaderV1Test : public ::testing::Test { void TestManifestListReading(const std::string& resource_name, const std::vector& expected_manifest_list) { std::string path = GetResourcePath(resource_name); - auto manifest_reader_result = ManifestListReader::MakeReader(path, file_io_); + auto manifest_reader_result = ManifestListReader::Make(path, file_io_); ASSERT_EQ(manifest_reader_result.has_value(), true); auto manifest_reader = std::move(manifest_reader_result.value()); diff --git a/test/manifest_reader_test.cc b/test/manifest_reader_test.cc index 2fb20b73b..da4bfa73a 100644 --- a/test/manifest_reader_test.cc +++ b/test/manifest_reader_test.cc @@ -107,8 +107,7 @@ TEST_F(ManifestReaderTest, BasicTest) { auto partition_schema = std::make_shared(std::vector({partition_field})); std::string path = GetResourcePath("56357cd7-391f-4df8-aa24-e7e667da8870-m4.avro"); - auto manifest_reader_result = - ManifestReader::MakeReader(path, file_io_, partition_schema); + auto manifest_reader_result = ManifestReader::Make(path, file_io_, partition_schema); ASSERT_EQ(manifest_reader_result.has_value(), true) << manifest_reader_result.error().message; auto manifest_reader = std::move(manifest_reader_result.value()); From d8d2c3f78bbeb8e084be2e69b638dd08febf6e05 Mon Sep 17 00:00:00 2001 From: HeartLinked Date: Tue, 19 Aug 2025 16:57:08 +0800 Subject: [PATCH 6/6] simple_v1 --- src/iceberg/manifest_writer_internal.cc | 3 +- src/iceberg/v1_metadata.h | 52 ++++++++++++++++++++++--- 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/src/iceberg/manifest_writer_internal.cc b/src/iceberg/manifest_writer_internal.cc index d5b30d8af..ed7adfe8c 100644 --- a/src/iceberg/manifest_writer_internal.cc +++ b/src/iceberg/manifest_writer_internal.cc @@ -22,6 +22,7 @@ #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" #include "iceberg/schema.h" +#include "iceberg/v1_metadata.h" namespace iceberg { @@ -38,7 +39,7 @@ Status ManifestWriterV1::AddAll(const std::vector& files) { Status ManifestWriterV1::Close() { return {}; } ManifestEntry ManifestWriterV1::prepare(const ManifestEntry& entry) { - return wrapper_.Wrap(entry); + return static_cast(wrapper_.Wrap(entry)); } Status ManifestWriterV2::Add(const ManifestEntry& entry) { diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h index 4ec9a12fc..bd51932b2 100644 --- a/src/iceberg/v1_metadata.h +++ b/src/iceberg/v1_metadata.h @@ -21,6 +21,8 @@ /// \file iceberg/v1_metadata.h +#include + #include "iceberg/manifest_entry.h" #include "iceberg/manifest_list.h" @@ -31,23 +33,63 @@ namespace iceberg { /// Wrapper for v1 manifest list and manifest entry. class V1MetaData { public: - /// \brief v1 manifest file wrapper. + /// \brief Wraps a ManifestFile to conform to the v1 schema. struct ManifestFileWrapper : public ManifestFile { ManifestFileWrapper() = default; - ManifestFile Wrap(ManifestFile file) { return *this; } + ManifestFileWrapper& Wrap(const ManifestFile& file) { + static_cast(*this) = file; + return *this; + } + }; + + /// \brief Wraps a DataFile to conform to the v1 schema. + struct DataFileWrapper : public DataFile { + int64_t block_size_in_bytes = 64 * 1024 * 1024; + + DataFileWrapper() = default; + + DataFileWrapper& Wrap(const DataFile& file) { + static_cast(*this) = file; + return *this; + } }; - /// \brief v1 manifest entry wrapper. - struct ManifestEntryWrapper : public ManifestEntry { + /// \brief Wraps a ManifestEntry to conform to the v1 schema. + struct ManifestEntryWrapper { + ManifestStatus status; + int64_t snapshot_id; + DataFileWrapper data_file; + ManifestEntryWrapper() = default; - ManifestEntry Wrap(ManifestEntry entry) { return *this; } + ManifestEntryWrapper& Wrap(const ManifestEntry& entry) { + this->status = entry.status; + this->snapshot_id = entry.snapshot_id.value_or(0); + if (entry.data_file) { + this->data_file.Wrap(*entry.data_file); + } + return *this; + } + + explicit operator ManifestEntry() const { + ManifestEntry entry; + entry.status = this->status; + entry.snapshot_id = this->snapshot_id; + + entry.data_file = std::make_shared(this->data_file); + entry.sequence_number = 0; + entry.file_sequence_number = 0; + + return entry; + } }; static ManifestFileWrapper manifestFileWrapper() { return {}; } static ManifestEntryWrapper manifestEntryWrapper() { return {}; } + + static DataFileWrapper dataFileWrapper() { return {}; } }; } // namespace iceberg