Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ ICEBERG_EXPORT constexpr Result<SnapshotRefType> SnapshotRefTypeFromString(

/// \brief A reference to a snapshot, either a branch or a tag.
struct ICEBERG_EXPORT SnapshotRef {
static constexpr std::string_view kMainBranch = "main";

struct ICEBERG_EXPORT Branch {
/// A positive number for the minimum number of snapshots to keep in a branch while
/// expiring snapshots. Defaults to table property
Expand Down
69 changes: 65 additions & 4 deletions src/iceberg/table_requirements.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

#include <memory>

#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/util/macros.h"

namespace iceberg {

Expand All @@ -36,12 +36,73 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableUpdateContext::Build
return std::move(requirements_);
}

void TableUpdateContext::RequireLastAssignedFieldIdUnchanged() {
if (!added_last_assigned_field_id_) {
if (base_ != nullptr) {
AddRequirement(
std::make_unique<table::AssertLastAssignedFieldId>(base_->last_column_id));
}
added_last_assigned_field_id_ = true;
}
}

void TableUpdateContext::RequireCurrentSchemaIdUnchanged() {
if (!added_current_schema_id_) {
if (base_ != nullptr && !is_replace_) {
AddRequirement(std::make_unique<table::AssertCurrentSchemaID>(
base_->current_schema_id.value()));
}
added_current_schema_id_ = true;
}
}

void TableUpdateContext::RequireLastAssignedPartitionIdUnchanged() {
if (!added_last_assigned_partition_id_) {
if (base_ != nullptr) {
AddRequirement(std::make_unique<table::AssertLastAssignedPartitionId>(
base_->last_partition_id));
}
added_last_assigned_partition_id_ = true;
}
}

void TableUpdateContext::RequireDefaultSpecIdUnchanged() {
if (!added_default_spec_id_) {
if (base_ != nullptr && !is_replace_) {
AddRequirement(
std::make_unique<table::AssertDefaultSpecID>(base_->default_spec_id));
}
added_default_spec_id_ = true;
}
}

void TableUpdateContext::RequireDefaultSortOrderIdUnchanged() {
if (!added_default_sort_order_id_) {
if (base_ != nullptr && !is_replace_) {
AddRequirement(std::make_unique<table::AssertDefaultSortOrderID>(
base_->default_sort_order_id));
}
added_default_sort_order_id_ = true;
}
}

void TableUpdateContext::RequireNoBranchesChanged() {
if (base_ != nullptr && !is_replace_) {
for (const auto& [name, ref] : base_->refs) {
if (ref->type() == SnapshotRefType::kBranch && name != SnapshotRef::kMainBranch) {
AddRequirement(
std::make_unique<table::AssertRefSnapshotID>(name, ref->snapshot_id));
}
}
}
}

Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForCreateTable(
const std::vector<std::unique_ptr<TableUpdate>>& table_updates) {
TableUpdateContext context(nullptr, false);
context.AddRequirement(std::make_unique<table::AssertDoesNotExist>());
for (const auto& update : table_updates) {
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
update->GenerateRequirements(context);
}
return context.Build();
}
Expand All @@ -52,7 +113,7 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForRep
TableUpdateContext context(&base, true);
context.AddRequirement(std::make_unique<table::AssertUUID>(base.table_uuid));
for (const auto& update : table_updates) {
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
update->GenerateRequirements(context);
}
return context.Build();
}
Expand All @@ -63,7 +124,7 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForUpd
TableUpdateContext context(&base, false);
context.AddRequirement(std::make_unique<table::AssertUUID>(base.table_uuid));
for (const auto& update : table_updates) {
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
update->GenerateRequirements(context);
}
return context.Build();
}
Expand Down
34 changes: 13 additions & 21 deletions src/iceberg/table_requirements.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,19 @@ class ICEBERG_EXPORT TableUpdateContext {
/// \brief Build and return the list of requirements
Result<std::vector<std::unique_ptr<TableRequirement>>> Build();

// Getters for deduplication flags
bool added_last_assigned_field_id() const { return added_last_assigned_field_id_; }
bool added_current_schema_id() const { return added_current_schema_id_; }
bool added_last_assigned_partition_id() const {
return added_last_assigned_partition_id_;
}
bool added_default_spec_id() const { return added_default_spec_id_; }
bool added_default_sort_order_id() const { return added_default_sort_order_id_; }

// Setters for deduplication flags
void set_added_last_assigned_field_id(bool value) {
added_last_assigned_field_id_ = value;
}
void set_added_current_schema_id(bool value) { added_current_schema_id_ = value; }
void set_added_last_assigned_partition_id(bool value) {
added_last_assigned_partition_id_ = value;
}
void set_added_default_spec_id(bool value) { added_default_spec_id_ = value; }
void set_added_default_sort_order_id(bool value) {
added_default_sort_order_id_ = value;
}
// Helper methods to deduplicate requirements to add.
/// \brief Require that the last assigned field ID remains unchanged
void RequireLastAssignedFieldIdUnchanged();
/// \brief Require that the current schema ID remains unchanged
void RequireCurrentSchemaIdUnchanged();
/// \brief Require that the last assigned partition ID remains unchanged
void RequireLastAssignedPartitionIdUnchanged();
/// \brief Require that the default spec ID remains unchanged
void RequireDefaultSpecIdUnchanged();
/// \brief Require that the default sort order ID remains unchanged
void RequireDefaultSortOrderIdUnchanged();
/// \brief Require that no branches have been changed
void RequireNoBranchesChanged();

private:
const TableMetadata* base_;
Expand Down
78 changes: 37 additions & 41 deletions src/iceberg/table_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include "iceberg/exception.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_requirements.h"

namespace iceberg::table {
Expand All @@ -32,9 +31,8 @@ void AssignUUID::ApplyTo(TableMetadataBuilder& builder) const {
builder.AssignUUID(uuid_);
}

Status AssignUUID::GenerateRequirements(TableUpdateContext& context) const {
void AssignUUID::GenerateRequirements(TableUpdateContext& context) const {
// AssignUUID does not generate additional requirements.
return {};
}

// UpgradeFormatVersion
Expand All @@ -43,8 +41,8 @@ void UpgradeFormatVersion::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status UpgradeFormatVersion::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("UpgradeFormatVersion::GenerateRequirements not implemented");
void UpgradeFormatVersion::GenerateRequirements(TableUpdateContext& context) const {
// UpgradeFormatVersion doesn't generate any requirements
}

// AddSchema
Expand All @@ -53,8 +51,8 @@ void AddSchema::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status AddSchema::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("AddTableSchema::GenerateRequirements not implemented");
void AddSchema::GenerateRequirements(TableUpdateContext& context) const {
context.RequireLastAssignedFieldIdUnchanged();
}

// SetCurrentSchema
Expand All @@ -63,8 +61,8 @@ void SetCurrentSchema::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("SetCurrentTableSchema::GenerateRequirements not implemented");
void SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const {
context.RequireCurrentSchemaIdUnchanged();
}

// AddPartitionSpec
Expand All @@ -73,8 +71,8 @@ void AddPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("AddTablePartitionSpec::GenerateRequirements not implemented");
void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
context.RequireLastAssignedPartitionIdUnchanged();
}

// SetDefaultPartitionSpec
Expand All @@ -83,9 +81,8 @@ void SetDefaultPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented(
"SetDefaultTablePartitionSpec::GenerateRequirements not implemented");
void SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
context.RequireDefaultSpecIdUnchanged();
}

// RemovePartitionSpecs
Expand All @@ -94,9 +91,9 @@ void RemovePartitionSpecs::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented(
"RemoveTablePartitionSpecs::GenerateRequirements not implemented");
void RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) const {
context.RequireDefaultSpecIdUnchanged();
context.RequireNoBranchesChanged();
}

// RemoveSchemas
Expand All @@ -105,28 +102,29 @@ void RemoveSchemas::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status RemoveSchemas::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("RemoveTableSchemas::GenerateRequirements not implemented");
void RemoveSchemas::GenerateRequirements(TableUpdateContext& context) const {
context.RequireCurrentSchemaIdUnchanged();
context.RequireNoBranchesChanged();
}

// AddSortOrder

void AddSortOrder::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.AddSortOrder(sort_order_);
}

Status AddSortOrder::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("AddTableSortOrder::GenerateRequirements not implemented");
void AddSortOrder::GenerateRequirements(TableUpdateContext& context) const {
// AddSortOrder doesn't generate any requirements
}

// SetDefaultSortOrder

void SetDefaultSortOrder::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
builder.SetDefaultSortOrder(sort_order_id_);
}

Status SetDefaultSortOrder::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("SetDefaultTableSortOrder::GenerateRequirements not implemented");
void SetDefaultSortOrder::GenerateRequirements(TableUpdateContext& context) const {
context.RequireDefaultSortOrderIdUnchanged();
}

// AddSnapshot
Expand All @@ -135,16 +133,16 @@ void AddSnapshot::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status AddSnapshot::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("AddTableSnapshot::GenerateRequirements not implemented");
void AddSnapshot::GenerateRequirements(TableUpdateContext& context) const {
// AddSnapshot doesn't generate any requirements
}

// RemoveSnapshots

void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {}

Status RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("RemoveTableSnapshots::GenerateRequirements not implemented");
void RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
throw IcebergError("RemoveTableSnapshots::GenerateRequirements not implemented");
}

// RemoveSnapshotRef
Expand All @@ -153,8 +151,8 @@ void RemoveSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("RemoveTableSnapshotRef::GenerateRequirements not implemented");
void RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
// RemoveSnapshotRef doesn't generate any requirements
}

// SetSnapshotRef
Expand All @@ -163,8 +161,8 @@ void SetSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("SetTableSnapshotRef::GenerateRequirements not implemented");
void SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
throw NotImplemented("SetTableSnapshotRef::GenerateRequirements not implemented");
}

// SetProperties
Expand All @@ -173,9 +171,8 @@ void SetProperties::ApplyTo(TableMetadataBuilder& builder) const {
builder.SetProperties(updated_);
}

Status SetProperties::GenerateRequirements(TableUpdateContext& context) const {
// No requirements
return {};
void SetProperties::GenerateRequirements(TableUpdateContext& context) const {
// SetProperties doesn't generate any requirements
}

// RemoveProperties
Expand All @@ -184,9 +181,8 @@ void RemoveProperties::ApplyTo(TableMetadataBuilder& builder) const {
builder.RemoveProperties(removed_);
}

Status RemoveProperties::GenerateRequirements(TableUpdateContext& context) const {
// No requirements
return {};
void RemoveProperties::GenerateRequirements(TableUpdateContext& context) const {
// RemoveProperties doesn't generate any requirements
}

// SetLocation
Expand All @@ -195,8 +191,8 @@ void SetLocation::ApplyTo(TableMetadataBuilder& builder) const {
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
}

Status SetLocation::GenerateRequirements(TableUpdateContext& context) const {
return NotImplemented("SetTableLocation::GenerateRequirements not implemented");
void SetLocation::GenerateRequirements(TableUpdateContext& context) const {
// SetLocation doesn't generate any requirements
}

} // namespace iceberg::table
Loading
Loading