Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ set(ICEBERG_SOURCES
manifest/v2_metadata.cc
manifest/v3_metadata.cc
metadata_columns.cc
metrics_config.cc
name_mapping.cc
partition_field.cc
partition_spec.cc
Expand All @@ -69,6 +70,7 @@ set(ICEBERG_SOURCES
transform.cc
transform_function.cc
type.cc
update/update_properties.cc
util/bucket_util.cc
util/conversions.cc
util/decimal.cc
Expand Down Expand Up @@ -134,6 +136,7 @@ add_subdirectory(catalog)
add_subdirectory(expression)
add_subdirectory(manifest)
add_subdirectory(row)
add_subdirectory(update)
add_subdirectory(util)

if(ICEBERG_BUILD_BUNDLE)
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ iceberg_sources = files(
'manifest/v2_metadata.cc',
'manifest/v3_metadata.cc',
'metadata_columns.cc',
'metrics_config.cc',
'name_mapping.cc',
'partition_field.cc',
'partition_spec.cc',
Expand All @@ -91,6 +92,7 @@ iceberg_sources = files(
'transform.cc',
'transform_function.cc',
'type.cc',
'update/update_properties.cc',
'util/bucket_util.cc',
'util/conversions.cc',
'util/decimal.cc',
Expand Down Expand Up @@ -164,9 +166,11 @@ install_headers(
'location_provider.h',
'metadata_columns.h',
'metrics.h',
'metrics_config.h',
'name_mapping.h',
'partition_field.h',
'partition_spec.h',
'pending_update.h',
'result.h',
'schema_field.h',
'schema.h',
Expand All @@ -187,6 +191,7 @@ install_headers(
'transform.h',
'type_fwd.h',
'type.h',
'update/update_properties.h',
],
subdir: 'iceberg',
)
Expand Down
50 changes: 50 additions & 0 deletions src/iceberg/metrics_config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/metrics_config.h"

#include <string>
#include <unordered_map>

#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/table_properties.h"

namespace iceberg {

Status MetricsConfig::VerifyReferencedColumns(
const std::unordered_map<std::string, std::string>& updates, const Schema& schema) {
for (const auto& [key, value] : updates) {
if (!key.starts_with(TableProperties::kMetricModeColumnConfPrefix)) {
continue;
}
auto field_name =
std::string_view(key).substr(TableProperties::kMetricModeColumnConfPrefix.size());
auto field = schema.FindFieldByName(field_name);
if (!field.has_value() || !field.value().has_value()) {
return ValidationFailed(
"Invalid metrics config, could not find column {} from table prop {} in "
"schema {}",
field_name, key, schema.ToString());
}
}
return {};
}

} // namespace iceberg
45 changes: 45 additions & 0 deletions src/iceberg/metrics_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/metrics_config.h
/// \brief Metrics configuration for Iceberg tables

#include <string>
#include <unordered_map>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Configuration utilities for table metrics
class ICEBERG_EXPORT MetricsConfig {
public:
/// \brief Verify that all referenced columns are valid
/// \param updates The updates to verify
/// \param schema The schema to verify against
/// \return OK if all referenced columns are valid
static Status VerifyReferencedColumns(
const std::unordered_map<std::string, std::string>& updates, const Schema& schema);
};

} // namespace iceberg
37 changes: 7 additions & 30 deletions src/iceberg/pending_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ namespace iceberg {
///
/// This matches the Java Iceberg pattern where BaseTransaction stores a
/// List<PendingUpdate> without type parameters.
class ICEBERG_EXPORT PendingUpdate {
class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
public:
virtual ~PendingUpdate() = default;

/// \brief Verify that the changes are valid and apply them.
/// \return Status::OK if the changes are valid, or an error:
/// - ValidationFailed: if pending changes cannot be applied
/// - InvalidArgument: if pending changes are conflicting
virtual Status Apply() = 0;

/// \brief Apply and commit the pending changes to the table
///
/// Changes are committed by calling the underlying table's commit operation.
Expand All @@ -63,33 +69,4 @@ class ICEBERG_EXPORT PendingUpdate {
PendingUpdate() = default;
};

/// \brief Template class for type-safe table metadata changes using builder pattern
///
/// PendingUpdateTyped extends PendingUpdate with a type-safe Apply() method that
/// returns the specific result type for each operation. Subclasses implement
/// specific types of table updates such as schema changes, property updates, or
/// snapshot-producing operations like appends and deletes.
///
/// Apply() can be used to validate and inspect the uncommitted changes before
/// committing. Commit() applies the changes and commits them to the table.
///
/// \tparam T The type of result returned by Apply()
template <typename T>
class ICEBERG_EXPORT PendingUpdateTyped : public PendingUpdate, public ErrorCollector {
public:
~PendingUpdateTyped() override = default;

/// \brief Apply the pending changes and return the uncommitted result
///
/// This does not result in a permanent update.
///
/// \return the uncommitted changes that would be committed, or an error:
/// - ValidationFailed: if pending changes cannot be applied
/// - InvalidArgument: if pending changes are conflicting
virtual Result<T> Apply() = 0;

protected:
PendingUpdateTyped() = default;
};

} // namespace iceberg
7 changes: 5 additions & 2 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

#include "iceberg/table.h"

#include <algorithm>

#include "iceberg/catalog.h"
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_scan.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -110,6 +109,10 @@ const std::vector<SnapshotLogEntry>& Table::history() const {
return metadata_->snapshot_log;
}

std::unique_ptr<UpdateProperties> Table::UpdateProperties() const {
return std::make_unique<iceberg::UpdateProperties>(identifier_, catalog_, metadata_);
}

std::unique_ptr<Transaction> Table::NewTransaction() const {
throw NotImplemented("Table::NewTransaction is not implemented");
}
Expand Down
6 changes: 6 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ class ICEBERG_EXPORT Table {
/// \return a vector of history entries
const std::vector<SnapshotLogEntry>& history() const;

/// \brief Create a new UpdateProperties to update table properties and commit the
/// changes
///
/// \return a new UpdateProperties instance
virtual std::unique_ptr<iceberg::UpdateProperties> UpdateProperties() const;

/// \brief Create a new table scan builder for this table
///
/// Once a table scan builder is created, it can be refined to project columns and
Expand Down
60 changes: 57 additions & 3 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,35 @@ TableMetadataBuilder& TableMetadataBuilder::AssignUUID(std::string_view uuid) {

TableMetadataBuilder& TableMetadataBuilder::UpgradeFormatVersion(
int8_t new_format_version) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
// Check that the new format version is supported
if (new_format_version > TableMetadata::kSupportedTableFormatVersion) {
return AddError(
ErrorKind::kInvalidArgument,
std::format(
"Cannot upgrade table to unsupported format version: v{} (supported: v{})",
new_format_version, TableMetadata::kSupportedTableFormatVersion));
}

// Check that we're not downgrading
if (new_format_version < impl_->metadata.format_version) {
return AddError(ErrorKind::kInvalidArgument,
std::format("Cannot downgrade v{} table to v{}",
impl_->metadata.format_version, new_format_version));
}

// No-op if the version is the same
if (new_format_version == impl_->metadata.format_version) {
return *this;
}

// Update the format version
impl_->metadata.format_version = new_format_version;

// Record the change
impl_->changes.push_back(
std::make_unique<table::UpgradeFormatVersion>(new_format_version));

return *this;
}

TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(
Expand Down Expand Up @@ -472,12 +500,38 @@ TableMetadataBuilder& TableMetadataBuilder::RemovePartitionStatistics(

TableMetadataBuilder& TableMetadataBuilder::SetProperties(
const std::unordered_map<std::string, std::string>& updated) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
// If updated is empty, return early (no-op)
if (updated.empty()) {
return *this;
}

// Add all updated properties to the metadata properties
for (const auto& [key, value] : updated) {
impl_->metadata.properties[key] = value;
}

// Record the change
impl_->changes.push_back(std::make_unique<table::SetProperties>(updated));

return *this;
}

TableMetadataBuilder& TableMetadataBuilder::RemoveProperties(
const std::vector<std::string>& removed) {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
// If removed is empty, return early (no-op)
if (removed.empty()) {
return *this;
}

// Remove each property from the metadata properties
for (const auto& key : removed) {
impl_->metadata.properties.erase(key);
}

// Record the change
impl_->changes.push_back(std::make_unique<table::RemoveProperties>(removed));

return *this;
}

TableMetadataBuilder& TableMetadataBuilder::SetLocation(std::string_view location) {
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/table_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ class ICEBERG_EXPORT TableProperties : public ConfigBase<TableProperties> {
"write.metadata.delete-after-commit.enabled", false};
inline static Entry<int32_t> kMetricsMaxInferredColumnDefaults{
"write.metadata.metrics.max-inferred-column-defaults", 100};
inline static constexpr std::string_view kMetricModeColumnConfPrefix =
"write.metadata.metrics.column.";
inline static Entry<std::string> kDefaultWriteMetricsMode{
"write.metadata.metrics.default", "truncate(16)"};

Expand Down
1 change: 1 addition & 0 deletions src/iceberg/table_requirements.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/util/macros.h"

namespace iceberg {

Expand Down
10 changes: 6 additions & 4 deletions src/iceberg/table_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,21 +170,23 @@ Status SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
// SetProperties

void SetProperties::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.SetProperties(updated_);
}

Status SetProperties::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("SetTableProperties::GenerateRequirements not implemented");
// No requirements
return {};
}

// RemoveProperties

void RemoveProperties::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.RemoveProperties(removed_);
}

Status RemoveProperties::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("RemoveTableProperties::GenerateRequirements not implemented");
// No requirements
return {};
}

// SetLocation
Expand Down
4 changes: 3 additions & 1 deletion src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@ add_iceberg_test(schema_test
add_iceberg_test(table_test
SOURCES
json_internal_test.cc
metrics_config_test.cc
schema_json_test.cc
table_test.cc
table_metadata_builder_test.cc
table_requirement_test.cc
table_requirements_test.cc
table_update_test.cc)
table_update_test.cc
update_properties_test.cc)

add_iceberg_test(expression_test
SOURCES
Expand Down
Loading
Loading