From 9b22f5e3b81aba8684fa4dd5054ef39aabb6fc79 Mon Sep 17 00:00:00 2001 From: Zhuo Wang Date: Mon, 1 Dec 2025 16:57:05 +0800 Subject: [PATCH] feat: Impl InMemoryCatalog's UpdateTable --- .../catalog/memory/in_memory_catalog.cc | 70 ++++++-- src/iceberg/table.cc | 15 +- src/iceberg/table.h | 13 +- src/iceberg/table_metadata.cc | 153 +++++++++++++++++- src/iceberg/table_metadata.h | 83 +++++++++- src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/in_memory_catalog_test.cc | 73 ++++++++- src/iceberg/test/location_util_test.cc | 64 ++++++++ src/iceberg/test/meson.build | 1 + src/iceberg/test/metadata_io_test.cc | 82 +++++++++- .../test/table_metadata_builder_test.cc | 56 +++++++ src/iceberg/test/table_test.cc | 6 +- src/iceberg/util/location_util.h | 43 +++++ 13 files changed, 620 insertions(+), 40 deletions(-) create mode 100644 src/iceberg/test/location_util_test.cc create mode 100644 src/iceberg/util/location_util.h diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index ebc490103..decb14c18 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -21,10 +21,13 @@ #include #include // IWYU pragma: keep +#include -#include "iceberg/exception.h" #include "iceberg/table.h" +#include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -121,6 +124,13 @@ class ICEBERG_EXPORT InMemoryNamespace { /// \return The metadata location if the table exists; error otherwise. Result GetTableMetadataLocation(const TableIdentifier& table_ident) const; + /// \brief Updates the metadata location for the specified table. + /// + /// \param table_ident The identifier of the table. + /// \param metadata_location The new metadata location. 
+ Status UpdateTableMetadataLocation(const TableIdentifier& table_ident, + const std::string& metadata_location); + /// \brief Internal utility for retrieving a namespace node pointer from the tree. /// /// \tparam NamespacePtr The type of the namespace node pointer. @@ -279,7 +289,7 @@ Result> InMemoryNamespace::ListTables( return table_names; } -Status InMemoryNamespace::RegisterTable(TableIdentifier const& table_ident, +Status InMemoryNamespace::RegisterTable(const TableIdentifier& table_ident, const std::string& metadata_location) { const auto ns = GetNamespace(this, table_ident.ns); ICEBERG_RETURN_UNEXPECTED(ns); @@ -290,21 +300,21 @@ Status InMemoryNamespace::RegisterTable(TableIdentifier const& table_ident, return {}; } -Status InMemoryNamespace::UnregisterTable(TableIdentifier const& table_ident) { +Status InMemoryNamespace::UnregisterTable(const TableIdentifier& table_ident) { const auto ns = GetNamespace(this, table_ident.ns); ICEBERG_RETURN_UNEXPECTED(ns); ns.value()->table_metadata_locations_.erase(table_ident.name); return {}; } -Result InMemoryNamespace::TableExists(TableIdentifier const& table_ident) const { +Result InMemoryNamespace::TableExists(const TableIdentifier& table_ident) const { const auto ns = GetNamespace(this, table_ident.ns); ICEBERG_RETURN_UNEXPECTED(ns); return ns.value()->table_metadata_locations_.contains(table_ident.name); } Result InMemoryNamespace::GetTableMetadataLocation( - TableIdentifier const& table_ident) const { + const TableIdentifier& table_ident) const { const auto ns = GetNamespace(this, table_ident.ns); ICEBERG_RETURN_UNEXPECTED(ns); const auto it = ns.value()->table_metadata_locations_.find(table_ident.name); @@ -314,17 +324,24 @@ Result InMemoryNamespace::GetTableMetadataLocation( return it->second; } +Status InMemoryNamespace::UpdateTableMetadataLocation( + const TableIdentifier& table_ident, const std::string& metadata_location) { + ICEBERG_ASSIGN_OR_RAISE(auto ns, GetNamespace(this, table_ident.ns)); + 
ns->table_metadata_locations_[table_ident.name] = metadata_location; + return {}; +} + std::shared_ptr InMemoryCatalog::Make( - std::string const& name, std::shared_ptr const& file_io, - std::string const& warehouse_location, - std::unordered_map const& properties) { + const std::string& name, const std::shared_ptr& file_io, + const std::string& warehouse_location, + const std::unordered_map& properties) { return std::make_shared(name, file_io, warehouse_location, properties); } InMemoryCatalog::InMemoryCatalog( - std::string const& name, std::shared_ptr const& file_io, - std::string const& warehouse_location, - std::unordered_map const& properties) + const std::string& name, const std::shared_ptr& file_io, + const std::string& warehouse_location, + const std::unordered_map& properties) : catalog_name_(std::move(name)), properties_(std::move(properties)), file_io_(std::move(file_io)), @@ -394,7 +411,32 @@ Result> InMemoryCatalog::UpdateTable( const TableIdentifier& identifier, const std::vector>& requirements, const std::vector>& updates) { - return NotImplemented("update table"); + std::unique_lock lock(mutex_); + ICEBERG_ASSIGN_OR_RAISE(auto metadata_location, + root_namespace_->GetTableMetadataLocation(identifier)); + + ICEBERG_ASSIGN_OR_RAISE(auto base, + TableMetadataUtil::Read(*file_io_, metadata_location)); + base->metadata_file_location = std::move(metadata_location); + + for (const auto& requirement : requirements) { + ICEBERG_RETURN_UNEXPECTED(requirement->Validate(base.get())); + } + + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + for (const auto& update : updates) { + update->ApplyTo(*builder); + } + ICEBERG_ASSIGN_OR_RAISE(auto updated, builder->Build()); + + ICEBERG_RETURN_UNEXPECTED( + TableMetadataUtil::Write(*file_io_, base.get(), updated.get())); + ICEBERG_RETURN_UNEXPECTED(root_namespace_->UpdateTableMetadataLocation( + identifier, updated->metadata_file_location)); + TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, 
base.get(), updated.get()); + + return std::make_unique(identifier, std::move(updated), file_io_, + std::static_pointer_cast(shared_from_this())); } Result> InMemoryCatalog::StageCreateTable( @@ -435,9 +477,9 @@ Result> InMemoryCatalog::LoadTable( ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadataUtil::Read(*file_io_, metadata_location.value())); + metadata->metadata_file_location = metadata_location.value(); - return std::make_unique
(identifier, std::move(metadata), - metadata_location.value(), file_io_, + return std::make_unique
(identifier, std::move(metadata), file_io_, std::static_pointer_cast(shared_from_this())); } diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 458711255..343165d87 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -28,17 +28,16 @@ #include "iceberg/table_scan.h" #include "iceberg/update/update_properties.h" #include "iceberg/util/macros.h" +#include "iceberg/util/timepoint.h" namespace iceberg { Table::~Table() = default; Table::Table(TableIdentifier identifier, std::shared_ptr metadata, - std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog) + std::shared_ptr io, std::shared_ptr catalog) : identifier_(std::move(identifier)), metadata_(std::move(metadata)), - metadata_location_(std::move(metadata_location)), io_(std::move(io)), catalog_(std::move(catalog)), properties_(TableProperties::FromMap(metadata_->properties)), @@ -52,9 +51,9 @@ Status Table::Refresh() { } ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_)); - if (metadata_location_ != refreshed_table->metadata_location_) { + if (metadata_->metadata_file_location != + refreshed_table->metadata_->metadata_file_location) { metadata_ = std::move(refreshed_table->metadata_); - metadata_location_ = std::move(refreshed_table->metadata_location_); io_ = std::move(refreshed_table->io_); properties_ = std::move(refreshed_table->properties_); metadata_cache_ = std::make_unique(metadata_.get()); @@ -91,8 +90,14 @@ Table::sort_orders() const { const TableProperties& Table::properties() const { return *properties_; } +const std::string& Table::metadata_file_location() const { + return metadata_->metadata_file_location; +} + const std::string& Table::location() const { return metadata_->location; } +const TimePointMs& Table::last_updated_ms() const { return metadata_->last_updated_ms; } + Result> Table::current_snapshot() const { return metadata_->Snapshot(); } diff --git a/src/iceberg/table.h b/src/iceberg/table.h index df3a0c32e..9a2361bb6 
100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -29,6 +29,7 @@ #include "iceberg/snapshot.h" #include "iceberg/table_identifier.h" #include "iceberg/type_fwd.h" +#include "iceberg/util/timepoint.h" namespace iceberg { @@ -45,8 +46,7 @@ class ICEBERG_EXPORT Table { /// \param[in] catalog The catalog that this table belongs to. If null, the table will /// be read-only. Table(TableIdentifier identifier, std::shared_ptr metadata, - std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog); + std::shared_ptr io, std::shared_ptr catalog); /// \brief Return the identifier of this table const TableIdentifier& name() const { return identifier_; } @@ -84,9 +84,17 @@ class ICEBERG_EXPORT Table { /// \brief Return a map of string properties for this table const TableProperties& properties() const; + /// \brief Return the table's metadata file location + const std::string& metadata_file_location() const; + /// \brief Return the table's base location const std::string& location() const; + /// \brief Get the time when this table was last updated + /// + /// \return the time when this table was last updated + const TimePointMs& last_updated_ms() const; + /// \brief Return the table's current snapshot, return NotFoundError if not found Result> current_snapshot() const; @@ -127,7 +135,6 @@ class ICEBERG_EXPORT Table { private: const TableIdentifier identifier_; std::shared_ptr metadata_; - std::string metadata_location_; std::shared_ptr io_; std::shared_ptr catalog_; std::unique_ptr properties_; diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index a90c5b0ca..936387e40 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -20,6 +20,7 @@ #include "iceberg/table_metadata.h" #include +#include #include #include #include @@ -27,6 +28,7 @@ #include #include #include +#include #include @@ -38,8 +40,10 @@ #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/sort_order.h" 
+#include "iceberg/table_properties.h" #include "iceberg/table_update.h" #include "iceberg/util/gzip_internal.h" +#include "iceberg/util/location_util.h" #include "iceberg/util/macros.h" #include "iceberg/util/uuid.h" namespace iceberg { @@ -48,6 +52,8 @@ const TimePointMs kInvalidLastUpdatedMs = TimePointMs::min(); constexpr int32_t kLastAdded = -1; } // namespace +static constexpr std::string_view kMetadataFolderName = "metadata"; + std::string ToString(const SnapshotLogEntry& entry) { return std::format("SnapshotLogEntry[timestampMillis={},snapshotId={}]", entry.timestamp_ms, entry.snapshot_id); @@ -220,30 +226,57 @@ Result TableMetadataCache::InitSnapshotMap( std::ranges::to(); } -// TableMetadataUtil implementation +Result CodecTypeUtil::CodecFromString( + const std::string_view& name) { + std::string name_upper = StringUtils::ToUpper(name); + if (name_upper == kCodecTypeGzip) { + return MetadataFileCodecType::kGzip; + } else if (name_upper == kCodecTypeNone) { + return MetadataFileCodecType::kNone; + } + return InvalidArgument("Invalid codec name: {}", name); +} -Result TableMetadataUtil::CodecFromFileName( +Result CodecTypeUtil::CodecFromFileName( std::string_view file_name) { - auto pos = file_name.find_last_of(".metadata.json"); + auto pos = file_name.find_last_of(kTableMetadataFileSuffix); if (pos == std::string::npos) { return InvalidArgument("{} is not a valid metadata file", file_name); } // We have to be backward-compatible with .metadata.json.gz files - if (file_name.ends_with(".metadata.json.gz")) { + if (file_name.ends_with(kCompGzipTableMetadataFileSuffix)) { return MetadataFileCodecType::kGzip; } std::string_view file_name_without_suffix = file_name.substr(0, pos); - if (file_name_without_suffix.ends_with(".gz")) { + if (file_name_without_suffix.ends_with(kGzipTableMetadataFileExtension)) { return MetadataFileCodecType::kGzip; } return MetadataFileCodecType::kNone; } +Result CodecTypeUtil::CodecNameToFileExtension( + const std::string_view& 
codec) { + ICEBERG_ASSIGN_OR_RAISE(MetadataFileCodecType codec_type, CodecFromString(codec)); + return CodecTypeToFileExtension(codec_type); +} + +std::string CodecTypeUtil::CodecTypeToFileExtension(MetadataFileCodecType codec) { + switch (codec) { + case MetadataFileCodecType::kGzip: + return std::string(kGzipTableMetadataFileSuffix); + case MetadataFileCodecType::kNone: + return std::string(kTableMetadataFileSuffix); + } + std::unreachable(); +} + +// TableMetadataUtil implementation + Result> TableMetadataUtil::Read( FileIO& io, const std::string& location, std::optional length) { - ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecFromFileName(location)); + ICEBERG_ASSIGN_OR_RAISE(auto codec_type, CodecTypeUtil::CodecFromFileName(location)); ICEBERG_ASSIGN_OR_RAISE(auto content, io.ReadFile(location, length)); if (codec_type == MetadataFileCodecType::kGzip) { @@ -258,6 +291,23 @@ Result> TableMetadataUtil::Read( return TableMetadataFromJson(json); } +Status TableMetadataUtil::Write(FileIO& io, const TableMetadata* base, + TableMetadata* metadata) { + ICEBERG_CHECK(metadata != nullptr, "The metadata is nullptr."); + + int version = -1; + if (base != nullptr && !base->metadata_file_location.empty()) { + // parse current version from location + version = ParseVersionFromLocation(base->metadata_file_location); + } + + ICEBERG_ASSIGN_OR_RAISE(std::string new_file_location, + NewTableMetadataFilePath(*metadata, version + 1)); + ICEBERG_RETURN_UNEXPECTED(Write(io, new_file_location, *metadata)); + metadata->metadata_file_location = std::move(new_file_location); + return {}; +} + Status TableMetadataUtil::Write(FileIO& io, const std::string& location, const TableMetadata& metadata) { auto json = ToJson(metadata); @@ -265,6 +315,66 @@ Status TableMetadataUtil::Write(FileIO& io, const std::string& location, return io.WriteFile(location, json_string); } +void TableMetadataUtil::DeleteRemovedMetadataFiles(FileIO& io, const TableMetadata* base, + const TableMetadata* metadata) 
{ + if (!base) { + return; + } + + bool delete_after_commit = TableProperties::kMetadataDeleteAfterCommitEnabled.value(); + if (auto it = metadata->properties.find( + TableProperties::kMetadataDeleteAfterCommitEnabled.key()); + it != metadata->properties.end()) { + delete_after_commit = + StringUtils::EqualsIgnoreCase(it->second, "true") || it->second == "1"; + } + + if (delete_after_commit) { + auto current_files = + metadata->metadata_log | + std::ranges::to>(); + std::ranges::for_each( + base->metadata_log | std::views::filter([¤t_files](const auto& entry) { + return !current_files.contains(entry); + }), + [&io](const auto& entry) { auto status = io.DeleteFile(entry.metadata_file); }); + } +} + +int TableMetadataUtil::ParseVersionFromLocation( + const std::string_view& metadata_location) { + size_t version_start = metadata_location.find_last_of('/') + 1; + size_t version_end = metadata_location.find('-', version_start); + + int version = -1; + if (version_end != std::string::npos) { + std::from_chars(metadata_location.data() + version_start, + metadata_location.data() + version_end, version); + } + return version; +} + +Result TableMetadataUtil::NewTableMetadataFilePath(const TableMetadata& meta, + int new_version) { + std::string_view codec_name = CodecTypeUtil::kCodecTypeNone; + if (auto it = meta.properties.find(TableProperties::kMetadataCompression.key()); + it != meta.properties.end()) { + codec_name = it->second; + } + ICEBERG_ASSIGN_OR_RAISE(std::string file_extension, + CodecTypeUtil::CodecNameToFileExtension(codec_name)); + + std::string uuid = Uuid::GenerateV7().ToString(); + std::string filename = std::format("{:05d}-{}{}", new_version, uuid, file_extension); + + if (auto it = meta.properties.find(TableProperties::kWriteMetadataLocation.key()); + it != meta.properties.end()) { + return std::format("{}/{}", LocationUtil::StripTrailingSlash(it->second), filename); + } else { + return std::format("{}/{}/{}", meta.location, kMetadataFolderName, filename); 
+ } +} + // TableMetadataBuilder implementation struct TableMetadataBuilder::Impl { @@ -319,6 +429,9 @@ struct TableMetadataBuilder::Impl { for (const auto& order : metadata.sort_orders) { sort_orders_by_id.emplace(order->order_id(), order); } + + metadata.last_updated_ms = kInvalidLastUpdatedMs; + metadata.metadata_file_location.clear(); } }; @@ -664,7 +777,33 @@ Result> TableMetadataBuilder::Build() { std::chrono::system_clock::now().time_since_epoch())}; } - // 4. Create and return the TableMetadata + // 4. Buildup metadata_log from base metadata + int32_t max_metadata_log_size = TableProperties::kMetadataPreviousVersionsMax.value(); + if (auto iter = impl_->metadata.properties.find( + TableProperties::kMetadataPreviousVersionsMax.key()); + iter != impl_->metadata.properties.end()) { + if (auto [_, ec] = std::from_chars(iter->second.data(), + iter->second.data() + iter->second.size(), + max_metadata_log_size); + ec != std::errc()) [[unlikely]] { + return InvalidArgument("Invalid value for property {}: {}", + TableProperties::kMetadataPreviousVersionsMax.key(), + iter->second); + } + } + if (impl_->base != nullptr) { + impl_->metadata.metadata_log.emplace_back(impl_->base->last_updated_ms, + impl_->base->metadata_file_location); + } + if (impl_->metadata.metadata_log.size() > max_metadata_log_size) { + impl_->metadata.metadata_log.erase( + impl_->metadata.metadata_log.begin(), + impl_->metadata.metadata_log.end() - max_metadata_log_size); + } + + // 5. TODO: update snapshot_log + + // 6. Create and return the TableMetadata auto result = std::make_unique(std::move(impl_->metadata)); return result; diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index 2d53fcb08..c6eb74a15 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -22,6 +22,7 @@ /// \file iceberg/table_metadata.h /// Table metadata for Iceberg tables. 
+#include #include #include #include @@ -58,6 +59,12 @@ struct ICEBERG_EXPORT MetadataLogEntry { friend bool operator==(const MetadataLogEntry& lhs, const MetadataLogEntry& rhs) { return lhs.timestamp_ms == rhs.timestamp_ms && lhs.metadata_file == rhs.metadata_file; } + + struct Hasher { + size_t operator()(const MetadataLogEntry& m) const noexcept { + return std::hash{}(m.metadata_file); + } + }; }; /// \brief Represents the metadata for an Iceberg table @@ -76,6 +83,8 @@ struct ICEBERG_EXPORT TableMetadata { static constexpr int64_t kInvalidSequenceNumber = -1; static constexpr int64_t kInitialRowId = 0; + /// The location of the table metadata file + std::string metadata_file_location; /// An integer version number for the format int8_t format_version; /// A UUID that identifies the table @@ -458,14 +467,41 @@ enum class ICEBERG_EXPORT MetadataFileCodecType { kGzip, }; -/// \brief Utility class for table metadata -struct ICEBERG_EXPORT TableMetadataUtil { +struct ICEBERG_EXPORT CodecTypeUtil { + /// \brief Returns the MetadataFileCodecType corresponding to the given string. + /// + /// \param name The string to parse. + /// \return The MetadataFileCodecType corresponding to the given string. + static Result CodecFromString(const std::string_view& name); + /// \brief Get the codec type from the table metadata file name. /// /// \param file_name The name of the table metadata file. /// \return The codec type of the table metadata file. static Result CodecFromFileName(std::string_view file_name); + /// \brief Get the file extension from the codec type. + /// \param codec The codec name. + /// \return The file extension of the codec. + static Result CodecNameToFileExtension(const std::string_view& codec); + + /// \brief Get the file extension from the codec type. + /// \param codec The codec type. + /// \return The file extension of the codec. 
+ static std::string CodecTypeToFileExtension(MetadataFileCodecType codec); + + inline static constexpr std::string_view kTableMetadataFileSuffix = ".metadata.json"; + inline static constexpr std::string_view kCompGzipTableMetadataFileSuffix = + ".metadata.json.gz"; + inline static constexpr std::string_view kGzipTableMetadataFileSuffix = + ".gz.metadata.json"; + inline static constexpr std::string_view kGzipTableMetadataFileExtension = ".gz"; + inline static constexpr std::string_view kCodecTypeGzip = "GZIP"; + inline static constexpr std::string_view kCodecTypeNone = "NONE"; +}; + +/// \brief Utility class for table metadata +struct ICEBERG_EXPORT TableMetadataUtil { /// \brief Read the table metadata file. /// /// \param io The file IO to use to read the table metadata. @@ -476,6 +512,29 @@ struct ICEBERG_EXPORT TableMetadataUtil { class FileIO& io, const std::string& location, std::optional length = std::nullopt); + /// \brief Write a new metadata file to storage. + /// + /// Serializes the table metadata to JSON and writes it to a new metadata + /// file. If no location is specified in the metadata, generates a new + /// file path based on the version number. + /// + /// \param io The FileIO instance for writing files + /// \param base The base metadata (can be null for new tables) + /// \param metadata The metadata to write, which will be updated with the new location + static Status Write(FileIO& io, const TableMetadata* base, TableMetadata* metadata); + + /// \brief Delete removed metadata files based on retention policy. + /// + /// Removes obsolete metadata files that are no longer referenced in the + /// current metadata log, based on the metadata.delete-after-commit.enabled + /// property. 
+ /// + /// \param io The FileIO instance for deleting files + /// \param base The previous metadata version + /// \param metadata The current metadata containing the updated log + static void DeleteRemovedMetadataFiles(FileIO& io, const TableMetadata* base, + const TableMetadata* metadata); + /// \brief Write the table metadata to a file. /// /// \param io The file IO to use to write the table metadata. @@ -483,6 +542,26 @@ struct ICEBERG_EXPORT TableMetadataUtil { /// \param metadata The table metadata to write. static Status Write(FileIO& io, const std::string& location, const TableMetadata& metadata); + + private: + /// \brief Parse the version number from a metadata file location. + /// + /// Extracts the version number from a metadata file path which follows + /// the format: vvvvv-uuid.metadata.json where vvvvv is the zero-padded + /// version number. + /// + /// \param metadata_location The metadata file location string + /// \return The parsed version number, or -1 if parsing fails or the + /// location doesn't contain a version + static int ParseVersionFromLocation(const std::string_view& metadata_location); + + /// \brief Generate a new metadata file path for a table. + /// + /// \param metadata The table metadata. + /// \param version The version number for the new metadata file. + /// \return The generated metadata file path. 
+ static Result NewTableMetadataFilePath(const TableMetadata& metadata, + int version); }; } // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 9892e3d4f..8fcedbec0 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -103,6 +103,7 @@ add_iceberg_test(util_test decimal_test.cc endian_test.cc formatter_test.cc + location_util_test.cc string_util_test.cc truncate_util_test.cc uuid_test.cc diff --git a/src/iceberg/test/in_memory_catalog_test.cc b/src/iceberg/test/in_memory_catalog_test.cc index f7e2f50a9..527366a5d 100644 --- a/src/iceberg/test/in_memory_catalog_test.cc +++ b/src/iceberg/test/in_memory_catalog_test.cc @@ -20,6 +20,8 @@ #include "iceberg/catalog/memory/in_memory_catalog.h" #include +#include +#include #include #include @@ -28,10 +30,14 @@ #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/schema.h" #include "iceberg/table.h" +#include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/test/mock_catalog.h" #include "iceberg/test/test_resource.h" +#include "iceberg/util/uuid.h" namespace iceberg { @@ -63,7 +69,7 @@ class InMemoryCatalogTest : public ::testing::Test { info->test_suite_name(), info->name(), table_name); // generate a unique directory for the table std::error_code ec; - std::filesystem::create_directories(table_location, ec); + std::filesystem::create_directories(table_location + "metadata", ec); if (ec) { throw std::runtime_error( std::format("Failed to create temporary directory: {}, error message: {}", @@ -130,6 +136,7 @@ TEST_F(InMemoryCatalogTest, RefreshTable) { .WillOnce(::testing::Return( std::make_unique
(table_ident, std::make_shared(TableMetadata{ + .metadata_file_location = "s3://location/1.json", .schemas = {schema}, .current_schema_id = 1, .current_snapshot_id = 1, @@ -137,7 +144,7 @@ TEST_F(InMemoryCatalogTest, RefreshTable) { .snapshot_id = 1, .sequence_number = 1, })}}), - "s3://location/1.json", io, catalog))); + io, catalog))); auto load_table_result = catalog->LoadTable(table_ident); ASSERT_THAT(load_table_result, IsOk()); auto loaded_table = std::move(load_table_result.value()); @@ -148,6 +155,7 @@ TEST_F(InMemoryCatalogTest, RefreshTable) { .WillOnce(::testing::Return( std::make_unique
(table_ident, std::make_shared(TableMetadata{ + .metadata_file_location = "s3://location/2.json", .schemas = {schema}, .current_schema_id = 1, .current_snapshot_id = 2, @@ -159,13 +167,72 @@ TEST_F(InMemoryCatalogTest, RefreshTable) { .snapshot_id = 2, .sequence_number = 2, })}}), - "s3://location/2.json", io, catalog))); + io, catalog))); auto refreshed_result = loaded_table->Refresh(); ASSERT_THAT(refreshed_result, IsOk()); // check table is refreshed ASSERT_EQ(loaded_table->current_snapshot().value()->snapshot_id, 2); } +TEST_F(InMemoryCatalogTest, UpdateTable) { + // First, create and register a table + TableIdentifier table_ident{.ns = {}, .name = "t1"}; + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, + ReadTableMetadataFromResource("TableMetadataV2Valid.json")); + + auto table_location = GenerateTestTableLocation(table_ident.name); + auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json", + table_location, Uuid::GenerateV7().ToString()); + metadata->location = table_location; + auto status = TableMetadataUtil::Write(*file_io_, metadata_location, *metadata); + EXPECT_THAT(status, IsOk()); + + auto table = catalog_->RegisterTable(table_ident, metadata_location); + EXPECT_THAT(table, IsOk()); + ASSERT_EQ(table.value()->name().name, "t1"); + ASSERT_EQ(table.value()->metadata_file_location(), metadata_location); + + // Prepare updates - add a new property + std::vector> updates; + auto property_update = std::make_unique( + std::unordered_map{{"property2", "value2"}}); + updates.push_back(std::move(property_update)); + + // Prepare requirements - assert table must exist + std::vector> requirements; + requirements.push_back(std::make_unique(metadata->table_uuid)); + + // Perform the update on a nonexist table + TableIdentifier nonexist_table_ident{.ns = {}, .name = "nonexist_table"}; + auto res = catalog_->UpdateTable(nonexist_table_ident, std::move(requirements), + std::move(updates)); + EXPECT_THAT(res, IsError(ErrorKind::kNotFound)); + + // Verify 
requirements failed on an exist table + std::vector> bad_requirements; + bad_requirements.push_back(std::make_unique("invalid-uuid")); + res = + catalog_->UpdateTable(table_ident, std::move(bad_requirements), std::move(updates)); + EXPECT_THAT(res, IsError(ErrorKind::kCommitFailed)); + + // Perform the update + auto update_result = catalog_->UpdateTable(table_ident, requirements, updates); + EXPECT_THAT(update_result, IsOk()); + + // Verify the update by loading the table and checking properties + auto load_result = catalog_->LoadTable(table_ident); + EXPECT_THAT(load_result, IsOk()); + + auto updated_table = std::move(load_result.value()); + + // Verify that metadata file was updated (should have a new version) + EXPECT_EQ(table.value()->uuid(), updated_table->uuid()); + EXPECT_GT(updated_table->last_updated_ms(), table.value()->last_updated_ms()); + EXPECT_THAT(updated_table->metadata_file_location(), + testing::HasSubstr("metadata/00002-")); +} + TEST_F(InMemoryCatalogTest, DropTable) { TableIdentifier tableIdent{.ns = {}, .name = "t1"}; auto result = catalog_->DropTable(tableIdent, false); diff --git a/src/iceberg/test/location_util_test.cc b/src/iceberg/test/location_util_test.cc new file mode 100644 index 000000000..7098868a9 --- /dev/null +++ b/src/iceberg/test/location_util_test.cc @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/util/location_util.h" + +#include + +namespace iceberg { + +TEST(LocationUtilTest, StripTrailingSlash) { + // Test normal paths with trailing slashes + ASSERT_EQ("/path/to/dir", LocationUtil::StripTrailingSlash("/path/to/dir/")); + ASSERT_EQ("/path/to/dir", LocationUtil::StripTrailingSlash("/path/to/dir//")); + ASSERT_EQ("/path/to/dir", LocationUtil::StripTrailingSlash("/path/to/dir///")); + + // Test paths without trailing slashes + ASSERT_EQ("/path/to/dir", LocationUtil::StripTrailingSlash("/path/to/dir")); + ASSERT_EQ("/path/to/file.txt", LocationUtil::StripTrailingSlash("/path/to/file.txt")); + + // Test root path + ASSERT_EQ("", LocationUtil::StripTrailingSlash("/")); + ASSERT_EQ("", LocationUtil::StripTrailingSlash("//")); + + // Test empty string + ASSERT_EQ("", LocationUtil::StripTrailingSlash("")); + + // Test URLs with protocols + ASSERT_EQ("http://example.com", + LocationUtil::StripTrailingSlash("http://example.com/")); + ASSERT_EQ("https://example.com/path", + LocationUtil::StripTrailingSlash("https://example.com/path/")); + + // Test that protocol endings are preserved + ASSERT_EQ("http://", LocationUtil::StripTrailingSlash("http://")); + ASSERT_EQ("https://", LocationUtil::StripTrailingSlash("https://")); + ASSERT_EQ("s3://", LocationUtil::StripTrailingSlash("s3://")); + + // Test paths with protocol-like substrings in the middle + ASSERT_EQ("/path/http://test", LocationUtil::StripTrailingSlash("/path/http://test/")); + ASSERT_EQ("/path/https://test", + 
LocationUtil::StripTrailingSlash("/path/https://test/")); + + // Test multiple slashes not at the end + ASSERT_EQ("/path//to/dir", LocationUtil::StripTrailingSlash("/path//to/dir/")); + ASSERT_EQ("/path///to/dir", LocationUtil::StripTrailingSlash("/path///to/dir/")); +} + +} // namespace iceberg diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index c73abe188..dd18bdfa1 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -82,6 +82,7 @@ iceberg_tests = { 'decimal_test.cc', 'endian_test.cc', 'formatter_test.cc', + 'location_util_test.cc', 'string_util_test.cc', 'truncate_util_test.cc', 'uuid_test.cc', diff --git a/src/iceberg/test/metadata_io_test.cc b/src/iceberg/test/metadata_io_test.cc index 8613ae244..c5aad36fd 100644 --- a/src/iceberg/test/metadata_io_test.cc +++ b/src/iceberg/test/metadata_io_test.cc @@ -17,19 +17,24 @@ * under the License. */ +#include + #include #include #include #include +#include #include #include #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/file_io.h" #include "iceberg/json_internal.h" +#include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" #include "iceberg/test/matchers.h" #include "iceberg/test/temp_file_test_base.h" @@ -41,7 +46,10 @@ class MetadataIOTest : public TempFileTestBase { TempFileTestBase::SetUp(); io_ = std::make_shared( std::make_shared<::arrow::fs::LocalFileSystem>()); - temp_filepath_ = CreateNewTempFilePathWithSuffix(".metadata.json"); + location_ = CreateTempDirectory(); + temp_filepath_ = std::format("{}/{}", location_, "metadata/00000-xxx.metadata.json"); + ASSERT_TRUE( + std::filesystem::create_directories(std::format("{}/metadata", location_))); } TableMetadata PrepareMetadata() { @@ -52,7 +60,7 @@ class MetadataIOTest : public TempFileTestBase { return TableMetadata{.format_version = 1, .table_uuid = "1234567890", - .location = 
// Writing with a base metadata: verifies version numbering, codec property
// validation, and resolution of the metadata write directory.
TEST_F(MetadataIOTest, WriteMetadataWithBase) {
  TableMetadata base = PrepareMetadata();

  {
    // When the base metadata_file_location cannot be parsed, the writer
    // falls back to version 0 (file name starts with "00000-").
    TableMetadata updated = PrepareMetadata();
    base.metadata_file_location = "invalid_location";
    EXPECT_THAT(TableMetadataUtil::Write(*io_, &base, &updated), IsOk());
    EXPECT_THAT(updated.metadata_file_location,
                testing::HasSubstr("/metadata/00000-"));
  }

  // Restore a parseable base location for the remaining cases.
  base.metadata_file_location = temp_filepath_;

  {
    // An unrecognized compression codec name is rejected.
    TableMetadata updated = PrepareMetadata();
    updated.properties[TableProperties::kMetadataCompression.key()] = "invalid";
    auto result = TableMetadataUtil::Write(*io_, &base, &updated);
    EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
    EXPECT_THAT(result, HasErrorMessage("Invalid codec name"));
  }

  {
    // An explicit write-metadata-location property overrides the default
    // "<table location>/metadata" directory.
    TableMetadata updated = PrepareMetadata();
    updated.properties[TableProperties::kWriteMetadataLocation.key()] = location_;
    auto result = TableMetadataUtil::Write(*io_, &base, &updated);
    EXPECT_THAT(result, IsOk());
    EXPECT_THAT(updated.metadata_file_location,
                testing::HasSubstr(std::format("{}/00001-", location_)));
  }

  {
    // Without the property, the next version lands in the default
    // metadata directory.
    TableMetadata updated = PrepareMetadata();
    auto result = TableMetadataUtil::Write(*io_, &base, &updated);
    EXPECT_THAT(result, IsOk());
    EXPECT_THAT(updated.metadata_file_location,
                testing::HasSubstr(std::format("{}/metadata/00001-", location_)));
  }
}
+TEST_F(MetadataIOTest, RemoveDeletedMetadataFiles) { + TableMetadata base1 = PrepareMetadata(); + base1.properties[TableProperties::kMetadataPreviousVersionsMax.key()] = "1"; + EXPECT_THAT(TableMetadataUtil::Write(*io_, nullptr, &base1), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto base2, TableMetadataBuilder::BuildFrom(&base1)->Build()); + EXPECT_THAT(TableMetadataUtil::Write(*io_, &base1, base2.get()), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto new_metadata, + TableMetadataBuilder::BuildFrom(base2.get())->Build()); + EXPECT_THAT(TableMetadataUtil::Write(*io_, base2.get(), new_metadata.get()), IsOk()); + + // The first metadata file should not be deleted + new_metadata->properties[TableProperties::kMetadataDeleteAfterCommitEnabled.key()] = + "false"; + TableMetadataUtil::DeleteRemovedMetadataFiles(*io_, base2.get(), new_metadata.get()); + EXPECT_TRUE(std::filesystem::exists(base1.metadata_file_location)); + + // The first metadata file should be deleted + new_metadata->properties[TableProperties::kMetadataDeleteAfterCommitEnabled.key()] = + "true"; + TableMetadataUtil::DeleteRemovedMetadataFiles(*io_, base2.get(), new_metadata.get()); + EXPECT_FALSE(std::filesystem::exists(base1.metadata_file_location)); +} + } // namespace iceberg diff --git a/src/iceberg/test/table_metadata_builder_test.cc b/src/iceberg/test/table_metadata_builder_test.cc index a1e46615f..af8054db0 100644 --- a/src/iceberg/test/table_metadata_builder_test.cc +++ b/src/iceberg/test/table_metadata_builder_test.cc @@ -21,14 +21,17 @@ #include #include +#include #include #include "iceberg/partition_spec.h" +#include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/sort_field.h" #include "iceberg/sort_order.h" #include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" #include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/transform.h" @@ -49,6 +52,7 @@ std::shared_ptr CreateTestSchema() { // Helper function to 
create base metadata for tests std::unique_ptr CreateBaseMetadata() { auto metadata = std::make_unique(); + metadata->metadata_file_location = "s3://bucket/test/00010-xxx.metadata.json"; metadata->format_version = 2; metadata->table_uuid = "test-uuid-1234"; metadata->location = "s3://bucket/test"; @@ -83,6 +87,7 @@ TEST(TableMetadataBuilderTest, BuildFromEmpty) { EXPECT_EQ(metadata->default_spec_id, PartitionSpec::kInitialSpecId); EXPECT_EQ(metadata->default_sort_order_id, SortOrder::kInitialSortOrderId); EXPECT_EQ(metadata->current_snapshot_id, Snapshot::kInvalidSnapshotId); + EXPECT_TRUE(metadata->metadata_log.empty()); } TEST(TableMetadataBuilderTest, BuildFromExisting) { @@ -96,6 +101,57 @@ TEST(TableMetadataBuilderTest, BuildFromExisting) { EXPECT_EQ(metadata->format_version, 2); EXPECT_EQ(metadata->table_uuid, "test-uuid-1234"); EXPECT_EQ(metadata->location, "s3://bucket/test"); + ASSERT_EQ(1, metadata->metadata_log.size()); + EXPECT_EQ(base->metadata_file_location, metadata->metadata_log[0].metadata_file); + EXPECT_EQ(base->last_updated_ms, metadata->metadata_log[0].timestamp_ms); +} + +TEST(TableMetadataBuilderTest, BuildupMetadataLog) { + auto base = CreateBaseMetadata(); + base->metadata_log = { + {.timestamp_ms = TimePointMs{std::chrono::milliseconds(100)}, + .metadata_file = "s3://bucket/test/00000-aaa.metadata.json"}, + {.timestamp_ms = TimePointMs{std::chrono::milliseconds(200)}, + .metadata_file = "s3://bucket/test/00001-bbb.metadata.json"}, + }; + + { + // Invalid property value for max previous versions + base->properties[TableProperties::kMetadataPreviousVersionsMax.key()] = "invalid"; + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + ASSERT_NE(builder, nullptr); + auto metadata = builder->Build(); + EXPECT_THAT(metadata, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(metadata, HasErrorMessage("Invalid value for property")); + } + + { + // Base metadata_log size less than max size + 
base->properties[TableProperties::kMetadataPreviousVersionsMax.key()] = "3"; + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + ASSERT_NE(builder, nullptr); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + EXPECT_EQ(3, metadata->metadata_log.size()); + EXPECT_EQ(base->metadata_log[0].metadata_file, + metadata->metadata_log[0].metadata_file); + EXPECT_EQ(base->metadata_log[1].metadata_file, + metadata->metadata_log[1].metadata_file); + EXPECT_EQ(base->last_updated_ms, metadata->metadata_log[2].timestamp_ms); + EXPECT_EQ(base->metadata_file_location, metadata->metadata_log[2].metadata_file); + } + + { + // Base metadata_log size greater than max size + base->properties[TableProperties::kMetadataPreviousVersionsMax.key()] = "2"; + auto builder = TableMetadataBuilder::BuildFrom(base.get()); + ASSERT_NE(builder, nullptr); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + EXPECT_EQ(2, metadata->metadata_log.size()); + EXPECT_EQ(base->metadata_log[1].metadata_file, + metadata->metadata_log[0].metadata_file); + EXPECT_EQ(base->last_updated_ms, metadata->metadata_log[1].timestamp_ms); + EXPECT_EQ(base->metadata_file_location, metadata->metadata_log[1].metadata_file); + } } // Test AssignUUID diff --git a/src/iceberg/test/table_test.cc b/src/iceberg/test/table_test.cc index 59710e0f6..6d176ff59 100644 --- a/src/iceberg/test/table_test.cc +++ b/src/iceberg/test/table_test.cc @@ -35,8 +35,7 @@ TEST(Table, TableV1) { ICEBERG_UNWRAP_OR_FAIL(auto metadata, ReadTableMetadataFromResource("TableMetadataV1Valid.json")); TableIdentifier tableIdent{.ns = {}, .name = "test_table_v1"}; - Table table(tableIdent, std::move(metadata), "s3://bucket/test/location/meta/", nullptr, - nullptr); + Table table(tableIdent, std::move(metadata), nullptr, nullptr); ASSERT_EQ(table.name().name, "test_table_v1"); // Check table schema @@ -75,8 +74,7 @@ TEST(Table, TableV2) { ReadTableMetadataFromResource("TableMetadataV2Valid.json")); TableIdentifier tableIdent{.ns = 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#pragma once

#include <string>
#include <string_view>

namespace iceberg {

/// \brief Utilities for normalizing table location strings.
///
/// Header-only: every member is an inline static function, so no export
/// macro (and no iceberg_export.h include) is required.
class LocationUtil {
 public:
  /// \brief Remove trailing '/' characters from a location.
  ///
  /// Scheme delimiters are preserved: stripping stops as soon as the
  /// remaining string ends with "://", so inputs like "s3://" are returned
  /// unchanged.
  ///
  /// \param path The location, e.g. "s3://bucket/path/". Accepts any
  ///        string-like input (generalized from `const std::string&` so
  ///        string_view and C-string callers avoid a temporary).
  /// \return The location without trailing slashes; "" for "" or "/".
  [[nodiscard]] static std::string StripTrailingSlash(std::string_view path) {
    while (!path.empty() && path.back() == '/' && !EndsWith(path, "://")) {
      path.remove_suffix(1);
    }
    return std::string(path);
  }

 private:
  /// \brief True if \p str ends with \p suffix
  /// (std::string_view::ends_with needs C++20; this helper is C++17-safe).
  static bool EndsWith(std::string_view str, std::string_view suffix) {
    return str.size() >= suffix.size() &&
           str.compare(str.size() - suffix.size(), suffix.size(), suffix) == 0;
  }
};

}  // namespace iceberg