diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index d5429808c..c102f073d 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -43,6 +43,7 @@ set(ICEBERG_SOURCES manifest/v2_metadata.cc manifest/v3_metadata.cc metadata_columns.cc + metrics_config.cc name_mapping.cc partition_field.cc partition_spec.cc @@ -69,6 +70,7 @@ set(ICEBERG_SOURCES transform.cc transform_function.cc type.cc + update/update_properties.cc util/bucket_util.cc util/conversions.cc util/decimal.cc @@ -134,6 +136,7 @@ add_subdirectory(catalog) add_subdirectory(expression) add_subdirectory(manifest) add_subdirectory(row) +add_subdirectory(update) add_subdirectory(util) if(ICEBERG_BUILD_BUNDLE) diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index d52739be9..2e16ce59d 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -65,6 +65,7 @@ iceberg_sources = files( 'manifest/v2_metadata.cc', 'manifest/v3_metadata.cc', 'metadata_columns.cc', + 'metrics_config.cc', 'name_mapping.cc', 'partition_field.cc', 'partition_spec.cc', @@ -91,6 +92,7 @@ iceberg_sources = files( 'transform.cc', 'transform_function.cc', 'type.cc', + 'update/update_properties.cc', 'util/bucket_util.cc', 'util/conversions.cc', 'util/decimal.cc', @@ -164,9 +166,11 @@ install_headers( 'location_provider.h', 'metadata_columns.h', 'metrics.h', + 'metrics_config.h', 'name_mapping.h', 'partition_field.h', 'partition_spec.h', + 'pending_update.h', 'result.h', 'schema_field.h', 'schema.h', @@ -187,6 +191,7 @@ install_headers( 'transform.h', 'type_fwd.h', 'type.h', + 'update/update_properties.h', ], subdir: 'iceberg', ) diff --git a/src/iceberg/metrics_config.cc b/src/iceberg/metrics_config.cc new file mode 100644 index 000000000..499a425d6 --- /dev/null +++ b/src/iceberg/metrics_config.cc @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics_config.h" + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/table_properties.h" + +namespace iceberg { + +Status MetricsConfig::VerifyReferencedColumns( + const std::unordered_map& updates, const Schema& schema) { + for (const auto& [key, value] : updates) { + if (!key.starts_with(TableProperties::kMetricModeColumnConfPrefix)) { + continue; + } + auto field_name = + std::string_view(key).substr(TableProperties::kMetricModeColumnConfPrefix.size()); + auto field = schema.FindFieldByName(field_name); + if (!field.has_value() || !field.value().has_value()) { + return ValidationFailed( + "Invalid metrics config, could not find column {} from table prop {} in " + "schema {}", + field_name, key, schema.ToString()); + } + } + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/metrics_config.h b/src/iceberg/metrics_config.h new file mode 100644 index 000000000..c42539d64 --- /dev/null +++ b/src/iceberg/metrics_config.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/metrics_config.h +/// \brief Metrics configuration for Iceberg tables + +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Configuration utilities for table metrics +class ICEBERG_EXPORT MetricsConfig { + public: + /// \brief Verify that all referenced columns are valid + /// \param updates The updates to verify + /// \param schema The schema to verify against + /// \return OK if all referenced columns are valid + static Status VerifyReferencedColumns( + const std::unordered_map& updates, const Schema& schema); +}; + +} // namespace iceberg diff --git a/src/iceberg/pending_update.h b/src/iceberg/pending_update.h index 8c1f7f270..8370db141 100644 --- a/src/iceberg/pending_update.h +++ b/src/iceberg/pending_update.h @@ -37,10 +37,16 @@ namespace iceberg { /// /// This matches the Java Iceberg pattern where BaseTransaction stores a /// List without type parameters. -class ICEBERG_EXPORT PendingUpdate { +class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { public: virtual ~PendingUpdate() = default; + /// \brief Verify that the changes are valid and apply them. + /// \return Status::OK if the changes are valid, or an error: + /// - ValidationFailed: if pending changes cannot be applied + /// - InvalidArgument: if pending changes are conflicting + virtual Status Apply() = 0; + /// \brief Apply and commit the pending changes to the table /// /// Changes are committed by calling the underlying table's commit operation. @@ -63,33 +69,4 @@ class ICEBERG_EXPORT PendingUpdate { PendingUpdate() = default; }; -/// \brief Template class for type-safe table metadata changes using builder pattern -/// -/// PendingUpdateTyped extends PendingUpdate with a type-safe Apply() method that -/// returns the specific result type for each operation. Subclasses implement -/// specific types of table updates such as schema changes, property updates, or -/// snapshot-producing operations like appends and deletes. -/// -/// Apply() can be used to validate and inspect the uncommitted changes before -/// committing. Commit() applies the changes and commits them to the table. -/// -/// \tparam T The type of result returned by Apply() -template -class ICEBERG_EXPORT PendingUpdateTyped : public PendingUpdate, public ErrorCollector { - public: - ~PendingUpdateTyped() override = default; - - /// \brief Apply the pending changes and return the uncommitted result - /// - /// This does not result in a permanent update. - /// - /// \return the uncommitted changes that would be committed, or an error: - /// - ValidationFailed: if pending changes cannot be applied - /// - InvalidArgument: if pending changes are conflicting - virtual Result Apply() = 0; - - protected: - PendingUpdateTyped() = default; -}; - } // namespace iceberg diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 9116a5e92..458711255 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -19,8 +19,6 @@ #include "iceberg/table.h" -#include - #include "iceberg/catalog.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" @@ -28,6 +26,7 @@ #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" #include "iceberg/table_scan.h" +#include "iceberg/update/update_properties.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -110,6 +109,10 @@ const std::vector& Table::history() const { return metadata_->snapshot_log; } +std::unique_ptr Table::UpdateProperties() const { + return std::make_unique(identifier_, catalog_, metadata_); +} + std::unique_ptr Table::NewTransaction() const { throw NotImplemented("Table::NewTransaction is not implemented"); } diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 1ea5ce873..df3a0c32e 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -104,6 +104,12 @@ class ICEBERG_EXPORT Table { /// \return a vector of history entries const std::vector& history() const; + /// \brief Create a new UpdateProperties to update table properties and commit the + /// changes + /// + /// \return a new UpdateProperties instance + virtual std::unique_ptr UpdateProperties() const; + /// \brief Create a new table scan builder for this table /// /// Once a table scan builder is created, it can be refined to project columns and diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 393eebfc3..780ab61da 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -364,7 +364,35 @@ TableMetadataBuilder& TableMetadataBuilder::AssignUUID(std::string_view uuid) { TableMetadataBuilder& TableMetadataBuilder::UpgradeFormatVersion( int8_t new_format_version) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + // Check that the new format version is supported + if (new_format_version > TableMetadata::kSupportedTableFormatVersion) { + return AddError( + ErrorKind::kInvalidArgument, + std::format( + "Cannot upgrade table to unsupported format version: v{} (supported: v{})", + new_format_version, TableMetadata::kSupportedTableFormatVersion)); + } + + // Check that we're not downgrading + if (new_format_version < impl_->metadata.format_version) { + return AddError(ErrorKind::kInvalidArgument, + std::format("Cannot downgrade v{} table to v{}", + impl_->metadata.format_version, new_format_version)); + } + + // No-op if the version is the same + if (new_format_version == impl_->metadata.format_version) { + return *this; + } + + // Update the format version + impl_->metadata.format_version = new_format_version; + + // Record the change + impl_->changes.push_back( + std::make_unique(new_format_version)); + + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema( @@ -472,12 +500,38 @@ TableMetadataBuilder& TableMetadataBuilder::RemovePartitionStatistics( TableMetadataBuilder& TableMetadataBuilder::SetProperties( const std::unordered_map& updated) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + // If updated is empty, return early (no-op) + if (updated.empty()) { + return *this; + } + + // Add all updated properties to the metadata properties + for (const auto& [key, value] : updated) { + impl_->metadata.properties[key] = value; + } + + // Record the change + impl_->changes.push_back(std::make_unique(updated)); + + return *this; } TableMetadataBuilder& TableMetadataBuilder::RemoveProperties( const std::vector& removed) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + // If removed is empty, return early (no-op) + if (removed.empty()) { + return *this; + } + + // Remove each property from the metadata properties + for (const auto& key : removed) { + impl_->metadata.properties.erase(key); + } + + // Record the change + impl_->changes.push_back(std::make_unique(removed)); + + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetLocation(std::string_view location) { diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h index 31493cfd8..f14ddfb5b 100644 --- a/src/iceberg/table_properties.h +++ b/src/iceberg/table_properties.h @@ -231,6 +231,8 @@ class ICEBERG_EXPORT TableProperties : public ConfigBase { "write.metadata.delete-after-commit.enabled", false}; inline static Entry kMetricsMaxInferredColumnDefaults{ "write.metadata.metrics.max-inferred-column-defaults", 100}; + inline static constexpr std::string_view kMetricModeColumnConfPrefix = + "write.metadata.metrics.column."; inline static Entry kDefaultWriteMetricsMode{ "write.metadata.metrics.default", "truncate(16)"}; diff --git a/src/iceberg/table_requirements.cc b/src/iceberg/table_requirements.cc index c9d34c293..3e0aa024f 100644 --- a/src/iceberg/table_requirements.cc +++ b/src/iceberg/table_requirements.cc @@ -24,6 +24,7 @@ #include "iceberg/table_metadata.h" #include "iceberg/table_requirement.h" #include "iceberg/table_update.h" +#include "iceberg/util/macros.h" namespace iceberg { diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 71fa30ee0..6c1ad72e4 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -170,21 +170,23 @@ Status SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const { // SetProperties void SetProperties::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.SetProperties(updated_); } Status SetProperties::GenerateRequirements(TableUpdateContext& context) const { - return NotImplemented("SetTableProperties::GenerateRequirements not implemented"); + // No requirements + return {}; } // RemoveProperties void RemoveProperties::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.RemoveProperties(removed_); } Status RemoveProperties::GenerateRequirements(TableUpdateContext& context) const { - return NotImplemented("RemoveTableProperties::GenerateRequirements not implemented"); + // No requirements + return {}; } // SetLocation diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 1a7e61acc..ee192f8a8 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -81,12 +81,14 @@ add_iceberg_test(schema_test add_iceberg_test(table_test SOURCES json_internal_test.cc + metrics_config_test.cc schema_json_test.cc table_test.cc table_metadata_builder_test.cc table_requirement_test.cc table_requirements_test.cc - table_update_test.cc) + table_update_test.cc + update_properties_test.cc) add_iceberg_test(expression_test SOURCES diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index a6a232388..0ae530028 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -47,11 +47,13 @@ iceberg_tests = { 'table_test': { 'sources': files( 'json_internal_test.cc', + 'metrics_config_test.cc', 'schema_json_test.cc', 'table_metadata_builder_test.cc', 'table_requirement_test.cc', 'table_test.cc', 'table_update_test.cc', + 'update_properties_test.cc', ), }, 'expression_test': { diff --git a/src/iceberg/test/metrics_config_test.cc b/src/iceberg/test/metrics_config_test.cc new file mode 100644 index 000000000..e6a8e0f51 --- /dev/null +++ b/src/iceberg/test/metrics_config_test.cc @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/metrics_config.h" + +#include +#include + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/table_properties.h" +#include "iceberg/test/matchers.h" + +namespace iceberg { + +class MetricsConfigTest : public ::testing::Test { + protected: + void SetUp() override { + SchemaField field1(1, "col1", std::make_shared(), false); + SchemaField field2(2, "col2", std::make_shared(), true); + SchemaField field3(3, "col3", std::make_shared(), false); + schema_ = + std::make_unique(std::vector{field1, field2, field3}, 100); + } + + std::unique_ptr schema_; +}; + +TEST_F(MetricsConfigTest, ValidateColumnReferences) { + { + // Empty updates should be valid + std::unordered_map updates; + + auto result = MetricsConfig::VerifyReferencedColumns(updates, *schema_); + EXPECT_THAT(result, IsOk()) << "Validation should pass for empty updates"; + } + + { + // No column references + std::unordered_map updates; + updates["write.format.default"] = "parquet"; + updates["write.target-file-size-bytes"] = "524288000"; + + auto result = MetricsConfig::VerifyReferencedColumns(updates, *schema_); + EXPECT_THAT(result, IsOk()) + << "Validation should pass when no column references exist"; + } + + { + // Valid column reference + std::unordered_map updates; + updates[std::string(TableProperties::kMetricModeColumnConfPrefix) + "col1"] = + "counts"; + updates[std::string(TableProperties::kMetricModeColumnConfPrefix) + "col2"] = "full"; + updates["some.other.property"] = "value"; + + auto result = MetricsConfig::VerifyReferencedColumns(updates, *schema_); + EXPECT_THAT(result, IsOk()) << "Validation should pass for valid column references"; + } + + { + // Invalid column reference + std::unordered_map updates; + updates[std::string(TableProperties::kMetricModeColumnConfPrefix) + "nonexistent"] = + "counts"; + + auto result = MetricsConfig::VerifyReferencedColumns(updates, *schema_); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)) + << "Validation should fail for invalid column references"; + } + + { + // Mixed valid and invalid column references + std::unordered_map updates; + updates[std::string(TableProperties::kMetricModeColumnConfPrefix) + "col1"] = + "counts"; + updates[std::string(TableProperties::kMetricModeColumnConfPrefix) + "nonexistent"] = + "full"; + + auto result = MetricsConfig::VerifyReferencedColumns(updates, *schema_); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)) + << "Validation should fail when any column reference is invalid"; + } +} + +} // namespace iceberg diff --git a/src/iceberg/test/table_metadata_builder_test.cc b/src/iceberg/test/table_metadata_builder_test.cc index f49a60dae..ff41ae18c 100644 --- a/src/iceberg/test/table_metadata_builder_test.cc +++ b/src/iceberg/test/table_metadata_builder_test.cc @@ -124,6 +124,56 @@ TEST(TableMetadataBuilderTest, AssignUUID) { EXPECT_EQ(metadata->table_uuid, "TEST-UUID-ABCD"); // Original case preserved } +TEST(TableMetadataBuilderTest, SetProperties) { + auto builder = TableMetadataBuilder::BuildFromEmpty(2); + builder->SetProperties({{"key1", "value1"}, {"key2", "value2"}}); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + EXPECT_EQ(metadata->properties.size(), 2); + EXPECT_EQ(metadata->properties["key1"], "value1"); + EXPECT_EQ(metadata->properties["key2"], "value2"); + + // Update existing property and add new one + builder = TableMetadataBuilder::BuildFromEmpty(2); + builder->SetProperties({{"key1", "value1"}}); + builder->SetProperties({{"key1", "new_value1"}, {"key3", "value3"}}); + + ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build()); + EXPECT_EQ(metadata->properties.size(), 2); + EXPECT_EQ(metadata->properties["key1"], "new_value1"); + EXPECT_EQ(metadata->properties["key3"], "value3"); +} + +TEST(TableMetadataBuilderTest, RemoveProperties) { + auto builder = TableMetadataBuilder::BuildFromEmpty(2); + builder->SetProperties({{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}}); + builder->RemoveProperties({"key2", "key4"}); // key4 does not exist + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + EXPECT_EQ(metadata->properties.size(), 2); + EXPECT_EQ(metadata->properties["key1"], "value1"); + EXPECT_EQ(metadata->properties["key3"], "value3"); +} + +TEST(TableMetadataBuilderTest, UpgradeFormatVersion) { + auto builder = TableMetadataBuilder::BuildFromEmpty(1); + builder->UpgradeFormatVersion(2); + + ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build()); + EXPECT_EQ(metadata->format_version, 2); + + // Unsupported format version should fail + builder = TableMetadataBuilder::BuildFromEmpty(3); + builder->UpgradeFormatVersion(4); + EXPECT_THAT(builder->Build(), + HasErrorMessage("Cannot upgrade table to unsupported format version")); + + // Downgrade should fail + builder = TableMetadataBuilder::BuildFromEmpty(2); + builder->UpgradeFormatVersion(1); + EXPECT_THAT(builder->Build(), HasErrorMessage("Cannot downgrade")); +} + // Test applying TableUpdate to builder TEST(TableMetadataBuilderTest, ApplyUpdate) { // Apply AssignUUID update diff --git a/src/iceberg/test/update_properties_test.cc b/src/iceberg/test/update_properties_test.cc new file mode 100644 index 000000000..13cfec831 --- /dev/null +++ b/src/iceberg/test/update_properties_test.cc @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_properties.h" + +#include +#include +#include + +#include +#include + +#include "iceberg/file_format.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/mock_catalog.h" + +namespace iceberg { + +class UpdatePropertiesTest : public ::testing::Test { + protected: + void SetUp() override { + // Create a simple schema + SchemaField f(1, "col1", std::make_shared(), false); + schema_ = std::make_shared(std::vector{f}, 1); + + // Create basic table metadata + metadata_ = std::make_shared(); + metadata_->schemas.push_back(schema_); + + // Create catalog and table identifier + catalog_ = std::make_shared(); + identifier_ = TableIdentifier(Namespace({"test"}), "table"); + } + + std::shared_ptr schema_; + std::shared_ptr metadata_; + std::shared_ptr catalog_; + TableIdentifier identifier_; +}; + +TEST_F(UpdatePropertiesTest, EmptyUpdates) { + UpdateProperties update(identifier_, catalog_, metadata_); + + auto result = update.Commit(); + EXPECT_THAT(result, IsOk()); +} + +TEST_F(UpdatePropertiesTest, SetProperty) { + UpdateProperties update(identifier_, catalog_, metadata_); + update.Set("key1", "value1").Set("key2", "value2"); + + auto result = update.Apply(); + EXPECT_THAT(result, IsOk()); +} + +TEST_F(UpdatePropertiesTest, RemoveProperty) { + UpdateProperties update(identifier_, catalog_, metadata_); + update.Remove("key1").Remove("key2"); + + auto result = update.Apply(); + EXPECT_THAT(result, IsOk()); +} + +TEST_F(UpdatePropertiesTest, SetRemoveConflict) { + { + // Set a property that is already marked for removal + UpdateProperties update(identifier_, catalog_, metadata_); + update.Set("key1", "value1").Remove("key1"); + + auto result = update.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("already marked for update")); + } + + { + // Remove a property that is already marked for update + UpdateProperties update(identifier_, catalog_, metadata_); + update.Remove("key1").Set("key1", "value1"); + + auto result = update.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("already marked for removal")); + } +} + +TEST_F(UpdatePropertiesTest, UpgradeFormatVersion) { + { + // Valid format-version upgrade + UpdateProperties update(identifier_, catalog_, metadata_); + update.Set("format-version", "2"); + + auto result = update.Apply(); + EXPECT_THAT(result, IsOk()); + } + + { + // Format-version is not a valid integer + UpdateProperties update(identifier_, catalog_, metadata_); + update.Set("format-version", "invalid"); + + auto result = update.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("Invalid format version")); + } + + { + // Format-version is out of range + UpdateProperties update(identifier_, catalog_, metadata_); + update.Set("format-version", "5000000000"); + + auto result = update.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("out of range")); + } + + { + // Format-version not supported + UpdateProperties update(identifier_, catalog_, metadata_); + update.Set("format-version", + std::to_string(TableMetadata::kSupportedTableFormatVersion + 1)); + + auto result = update.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("unsupported format version")); + } +} + +TEST_F(UpdatePropertiesTest, InvalidTable) { + { + // catalog is null + UpdateProperties update(identifier_, nullptr, metadata_); + + auto result = update.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("Catalog is required")); + } + + { + // metadata is null + UpdateProperties update(identifier_, catalog_, nullptr); + + auto result = update.Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("Base table metadata is required")); + } +} + +TEST_F(UpdatePropertiesTest, Commit) { + { + // Successful commit + UpdateProperties update(identifier_, catalog_, metadata_); + update.Set("key1", "value1"); + + EXPECT_CALL(*catalog_, UpdateTable).Times(1).WillOnce(::testing::Return(nullptr)); + + auto result = update.Commit(); + EXPECT_THAT(result, IsOk()); + } + + { + // Failed commit + UpdateProperties update(identifier_, catalog_, metadata_); + update.Set("key1", "value1"); + + EXPECT_CALL(*catalog_, UpdateTable) + .WillOnce(::testing::Return(CommitFailed("Commit update failed"))); + auto result = update.Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + } +} + +TEST_F(UpdatePropertiesTest, FluentInterface) { + UpdateProperties update(identifier_, catalog_, metadata_); + + auto& ref = update.Set("key1", "value1").Remove("key2"); + + // Should return reference to itself + EXPECT_EQ(&ref, &update); + + auto result = update.Apply(); + EXPECT_THAT(result, IsOk()); +} + +} // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 68a4543fb..76de22a1e 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -94,7 +94,6 @@ class SortField; class SortOrder; class Table; class TableProperties; -class FileIO; class Transaction; class Transform; class TransformFunction; @@ -160,6 +159,7 @@ class TableUpdateContext; class PendingUpdate; template class PendingUpdateTyped; +class UpdateProperties; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/CMakeLists.txt b/src/iceberg/update/CMakeLists.txt new file mode 100644 index 000000000..d73910968 --- /dev/null +++ b/src/iceberg/update/CMakeLists.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +iceberg_install_all_headers(iceberg/update) diff --git a/src/iceberg/update/update_properties.cc b/src/iceberg/update/update_properties.cc new file mode 100644 index 000000000..a4dcd1548 --- /dev/null +++ b/src/iceberg/update/update_properties.cc @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_properties.h" + +#include +#include + +#include "iceberg/catalog.h" +#include "iceberg/metrics_config.h" +#include "iceberg/result.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_properties.h" +#include "iceberg/table_requirements.h" +#include "iceberg/table_update.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +UpdateProperties::UpdateProperties(TableIdentifier identifier, + std::shared_ptr catalog, + std::shared_ptr base) + : identifier_(std::move(identifier)), + catalog_(std::move(catalog)), + base_metadata_(std::move(base)) {} + +UpdateProperties& UpdateProperties::Set(const std::string& key, + const std::string& value) { + if (removals_.contains(key)) { + return AddError( + ErrorKind::kInvalidArgument, + std::format("Cannot set property '{}' that is already marked for removal", key)); + } + + if (!TableProperties::reserved_properties().contains(key) || + key == TableProperties::kFormatVersion.key()) { + updates_.emplace(key, value); + } + + return *this; +} + +UpdateProperties& UpdateProperties::Remove(const std::string& key) { + if (updates_.contains(key)) { + return AddError( + ErrorKind::kInvalidArgument, + std::format("Cannot remove property '{}' that is already marked for update", + key)); + } + + removals_.insert(key); + return *this; +} + +Status UpdateProperties::Apply() { + if (!catalog_) { + return InvalidArgument("Catalog is required to apply property updates"); + } + if (!base_metadata_) { + return InvalidArgument("Base table metadata is required to apply property updates"); + } + + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + auto iter = updates_.find(TableProperties::kFormatVersion.key()); + if (iter != updates_.end()) { + try { + int parsed_version = std::stoi(iter->second); + if (parsed_version > TableMetadata::kSupportedTableFormatVersion) { + return InvalidArgument( + "Cannot upgrade table to unsupported format version: v{} (supported: v{})", + parsed_version, TableMetadata::kSupportedTableFormatVersion); + } + format_version_ = static_cast(parsed_version); + } catch (const std::invalid_argument& e) { + return InvalidArgument("Invalid format version '{}': not a valid integer", + iter->second); + } catch (const std::out_of_range& e) { + return InvalidArgument("Format version '{}' is out of range", iter->second); + } + + updates_.erase(iter); + } + + if (auto schema = base_metadata_->Schema(); schema.has_value()) { + ICEBERG_RETURN_UNEXPECTED( + MetricsConfig::VerifyReferencedColumns(updates_, *schema.value())); + } + return {}; +} + +Status UpdateProperties::Commit() { + ICEBERG_RETURN_UNEXPECTED(Apply()); + + std::vector> updates; + if (!updates_.empty()) { + updates.emplace_back(std::make_unique(std::move(updates_))); + } + if (!removals_.empty()) { + updates.emplace_back(std::make_unique( + std::vector{removals_.begin(), removals_.end()})); + } + if (format_version_.has_value()) { + updates.emplace_back( + std::make_unique(format_version_.value())); + }; + + if (!updates.empty()) { + ICEBERG_ASSIGN_OR_RAISE(auto requirements, + TableRequirements::ForUpdateTable(*base_metadata_, updates)); + ICEBERG_RETURN_UNEXPECTED(catalog_->UpdateTable(identifier_, requirements, updates)); + } + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/update_properties.h b/src/iceberg/update/update_properties.h new file mode 100644 index 000000000..0f1adf76a --- /dev/null +++ b/src/iceberg/update/update_properties.h @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "iceberg/file_format.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/pending_update.h" +#include "iceberg/table_identifier.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Updates table properties. +class ICEBERG_EXPORT UpdateProperties : public PendingUpdate { + public: + /// \brief Constructs a UpdateProperties for the specified table. + /// + /// \param identifier The table identifier + /// \param catalog The catalog containing the table + /// \param metadata The current table metadata + UpdateProperties(TableIdentifier identifier, std::shared_ptr catalog, + std::shared_ptr base); + + /// \brief Sets a property key to a specified value. + /// + /// The key can not be marked for previous removal and reserved property keys will be + /// ignored. + UpdateProperties& Set(const std::string& key, const std::string& value); + + /// \brief Marks a property for removal. + /// + /// The key can not be already marked for removal. + /// + /// \param key The property key to remove + /// \return Reference to this UpdateProperties for chaining + UpdateProperties& Remove(const std::string& key); + + /// \brief Applies the property changes without committing them. + /// + /// Validates the pending property changes but does not commit them to the table. + /// This method can be used to validate changes before actually committing them. + /// + /// \return Status::OK if the changes are valid, or an error if validation fails + Status Apply() override; + + /// \brief Commits the property changes to the table. + /// + /// Validates the changes and applies them to the table through the catalog. + /// + /// \return OK if the changes are valid and committed successfully, or an error + Status Commit() override; + + private: + TableIdentifier identifier_; + std::shared_ptr catalog_; + std::shared_ptr base_metadata_; + + std::unordered_map updates_; + std::unordered_set removals_; + std::optional format_version_; +}; + +} // namespace iceberg