Skip to content

Commit 2f0955e

Browse files
authored
feat: implement table requirement context (#385)
1 parent b1901e7 commit 2f0955e

File tree

8 files changed

+1308
-104
lines changed

8 files changed

+1308
-104
lines changed

src/iceberg/snapshot.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ ICEBERG_EXPORT constexpr Result<SnapshotRefType> SnapshotRefTypeFromString(
6262

6363
/// \brief A reference to a snapshot, either a branch or a tag.
6464
struct ICEBERG_EXPORT SnapshotRef {
65+
static constexpr std::string_view kMainBranch = "main";
66+
6567
struct ICEBERG_EXPORT Branch {
6668
/// A positive number for the minimum number of snapshots to keep in a branch while
6769
/// expiring snapshots. Defaults to table property

src/iceberg/table_requirements.cc

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121

2222
#include <memory>
2323

24+
#include "iceberg/snapshot.h"
2425
#include "iceberg/table_metadata.h"
2526
#include "iceberg/table_requirement.h"
2627
#include "iceberg/table_update.h"
27-
#include "iceberg/util/macros.h"
2828

2929
namespace iceberg {
3030

@@ -36,12 +36,78 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableUpdateContext::Build
3636
return std::move(requirements_);
3737
}
3838

39+
void TableUpdateContext::RequireLastAssignedFieldIdUnchanged() {
40+
if (!added_last_assigned_field_id_) {
41+
if (base_ != nullptr) {
42+
AddRequirement(
43+
std::make_unique<table::AssertLastAssignedFieldId>(base_->last_column_id));
44+
}
45+
added_last_assigned_field_id_ = true;
46+
}
47+
}
48+
49+
void TableUpdateContext::RequireCurrentSchemaIdUnchanged() {
50+
if (!added_current_schema_id_) {
51+
if (base_ != nullptr && !is_replace_) {
52+
AddRequirement(std::make_unique<table::AssertCurrentSchemaID>(
53+
base_->current_schema_id.value()));
54+
}
55+
added_current_schema_id_ = true;
56+
}
57+
}
58+
59+
void TableUpdateContext::RequireLastAssignedPartitionIdUnchanged() {
60+
if (!added_last_assigned_partition_id_) {
61+
if (base_ != nullptr) {
62+
AddRequirement(std::make_unique<table::AssertLastAssignedPartitionId>(
63+
base_->last_partition_id));
64+
}
65+
added_last_assigned_partition_id_ = true;
66+
}
67+
}
68+
69+
void TableUpdateContext::RequireDefaultSpecIdUnchanged() {
70+
if (!added_default_spec_id_) {
71+
if (base_ != nullptr && !is_replace_) {
72+
AddRequirement(
73+
std::make_unique<table::AssertDefaultSpecID>(base_->default_spec_id));
74+
}
75+
added_default_spec_id_ = true;
76+
}
77+
}
78+
79+
void TableUpdateContext::RequireDefaultSortOrderIdUnchanged() {
80+
if (!added_default_sort_order_id_) {
81+
if (base_ != nullptr && !is_replace_) {
82+
AddRequirement(std::make_unique<table::AssertDefaultSortOrderID>(
83+
base_->default_sort_order_id));
84+
}
85+
added_default_sort_order_id_ = true;
86+
}
87+
}
88+
89+
void TableUpdateContext::RequireNoBranchesChanged() {
90+
if (base_ != nullptr && !is_replace_) {
91+
for (const auto& [name, ref] : base_->refs) {
92+
if (ref->type() == SnapshotRefType::kBranch && name != SnapshotRef::kMainBranch) {
93+
AddRequirement(
94+
std::make_unique<table::AssertRefSnapshotID>(name, ref->snapshot_id));
95+
}
96+
}
97+
}
98+
}
99+
100+
bool TableUpdateContext::AddChangedRef(const std::string& ref_name) {
101+
auto [_, inserted] = changed_refs_.insert(ref_name);
102+
return inserted;
103+
}
104+
39105
Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForCreateTable(
40106
const std::vector<std::unique_ptr<TableUpdate>>& table_updates) {
41107
TableUpdateContext context(nullptr, false);
42108
context.AddRequirement(std::make_unique<table::AssertDoesNotExist>());
43109
for (const auto& update : table_updates) {
44-
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
110+
update->GenerateRequirements(context);
45111
}
46112
return context.Build();
47113
}
@@ -52,7 +118,7 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForRep
52118
TableUpdateContext context(&base, true);
53119
context.AddRequirement(std::make_unique<table::AssertUUID>(base.table_uuid));
54120
for (const auto& update : table_updates) {
55-
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
121+
update->GenerateRequirements(context);
56122
}
57123
return context.Build();
58124
}
@@ -63,7 +129,7 @@ Result<std::vector<std::unique_ptr<TableRequirement>>> TableRequirements::ForUpd
63129
TableUpdateContext context(&base, false);
64130
context.AddRequirement(std::make_unique<table::AssertUUID>(base.table_uuid));
65131
for (const auto& update : table_updates) {
66-
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
132+
update->GenerateRequirements(context);
67133
}
68134
return context.Build();
69135
}

src/iceberg/table_requirements.h

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
/// for optimistic concurrency control when committing table changes.
2828

2929
#include <memory>
30+
#include <string>
31+
#include <unordered_set>
3032
#include <vector>
3133

3234
#include "iceberg/iceberg_export.h"
@@ -68,27 +70,24 @@ class ICEBERG_EXPORT TableUpdateContext {
6870
/// \brief Build and return the list of requirements
6971
Result<std::vector<std::unique_ptr<TableRequirement>>> Build();
7072

71-
// Getters for deduplication flags
72-
bool added_last_assigned_field_id() const { return added_last_assigned_field_id_; }
73-
bool added_current_schema_id() const { return added_current_schema_id_; }
74-
bool added_last_assigned_partition_id() const {
75-
return added_last_assigned_partition_id_;
76-
}
77-
bool added_default_spec_id() const { return added_default_spec_id_; }
78-
bool added_default_sort_order_id() const { return added_default_sort_order_id_; }
79-
80-
// Setters for deduplication flags
81-
void set_added_last_assigned_field_id(bool value) {
82-
added_last_assigned_field_id_ = value;
83-
}
84-
void set_added_current_schema_id(bool value) { added_current_schema_id_ = value; }
85-
void set_added_last_assigned_partition_id(bool value) {
86-
added_last_assigned_partition_id_ = value;
87-
}
88-
void set_added_default_spec_id(bool value) { added_default_spec_id_ = value; }
89-
void set_added_default_sort_order_id(bool value) {
90-
added_default_sort_order_id_ = value;
91-
}
73+
// Helper methods to deduplicate requirements to add.
74+
/// \brief Require that the last assigned field ID remains unchanged
75+
void RequireLastAssignedFieldIdUnchanged();
76+
/// \brief Require that the current schema ID remains unchanged
77+
void RequireCurrentSchemaIdUnchanged();
78+
/// \brief Require that the last assigned partition ID remains unchanged
79+
void RequireLastAssignedPartitionIdUnchanged();
80+
/// \brief Require that the default spec ID remains unchanged
81+
void RequireDefaultSpecIdUnchanged();
82+
/// \brief Require that the default sort order ID remains unchanged
83+
void RequireDefaultSortOrderIdUnchanged();
84+
/// \brief Require that no branches have been changed
85+
void RequireNoBranchesChanged();
86+
87+
/// \brief Track a changed ref and return whether it was newly added
88+
/// \param ref_name The name of the ref being changed
89+
/// \return true if this is the first time the ref is being changed
90+
bool AddChangedRef(const std::string& ref_name);
9291

9392
private:
9493
const TableMetadata* base_;
@@ -102,6 +101,9 @@ class ICEBERG_EXPORT TableUpdateContext {
102101
bool added_last_assigned_partition_id_ = false;
103102
bool added_default_spec_id_ = false;
104103
bool added_default_sort_order_id_ = false;
104+
105+
// Track refs that have been changed to avoid duplicate requirements
106+
std::unordered_set<std::string> changed_refs_;
105107
};
106108

107109
/// \brief Factory class for generating table requirements

src/iceberg/table_update.cc

Lines changed: 46 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
#include "iceberg/exception.h"
2323
#include "iceberg/table_metadata.h"
24-
#include "iceberg/table_requirement.h"
2524
#include "iceberg/table_requirements.h"
2625

2726
namespace iceberg::table {
@@ -32,9 +31,8 @@ void AssignUUID::ApplyTo(TableMetadataBuilder& builder) const {
3231
builder.AssignUUID(uuid_);
3332
}
3433

35-
Status AssignUUID::GenerateRequirements(TableUpdateContext& context) const {
34+
void AssignUUID::GenerateRequirements(TableUpdateContext& context) const {
3635
// AssignUUID does not generate additional requirements.
37-
return {};
3836
}
3937

4038
// UpgradeFormatVersion
@@ -43,8 +41,8 @@ void UpgradeFormatVersion::ApplyTo(TableMetadataBuilder& builder) const {
4341
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
4442
}
4543

46-
Status UpgradeFormatVersion::GenerateRequirements(TableUpdateContext& context) const {
47-
return NotImplemented("UpgradeFormatVersion::GenerateRequirements not implemented");
44+
void UpgradeFormatVersion::GenerateRequirements(TableUpdateContext& context) const {
45+
// UpgradeFormatVersion doesn't generate any requirements
4846
}
4947

5048
// AddSchema
@@ -53,8 +51,8 @@ void AddSchema::ApplyTo(TableMetadataBuilder& builder) const {
5351
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
5452
}
5553

56-
Status AddSchema::GenerateRequirements(TableUpdateContext& context) const {
57-
return NotImplemented("AddTableSchema::GenerateRequirements not implemented");
54+
void AddSchema::GenerateRequirements(TableUpdateContext& context) const {
55+
context.RequireLastAssignedFieldIdUnchanged();
5856
}
5957

6058
// SetCurrentSchema
@@ -63,8 +61,8 @@ void SetCurrentSchema::ApplyTo(TableMetadataBuilder& builder) const {
6361
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
6462
}
6563

66-
Status SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const {
67-
return NotImplemented("SetCurrentTableSchema::GenerateRequirements not implemented");
64+
void SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const {
65+
context.RequireCurrentSchemaIdUnchanged();
6866
}
6967

7068
// AddPartitionSpec
@@ -73,8 +71,8 @@ void AddPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
7371
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
7472
}
7573

76-
Status AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
77-
return NotImplemented("AddTablePartitionSpec::GenerateRequirements not implemented");
74+
void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
75+
context.RequireLastAssignedPartitionIdUnchanged();
7876
}
7977

8078
// SetDefaultPartitionSpec
@@ -83,9 +81,8 @@ void SetDefaultPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
8381
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
8482
}
8583

86-
Status SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
87-
return NotImplemented(
88-
"SetDefaultTablePartitionSpec::GenerateRequirements not implemented");
84+
void SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
85+
context.RequireDefaultSpecIdUnchanged();
8986
}
9087

9188
// RemovePartitionSpecs
@@ -94,9 +91,9 @@ void RemovePartitionSpecs::ApplyTo(TableMetadataBuilder& builder) const {
9491
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
9592
}
9693

97-
Status RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) const {
98-
return NotImplemented(
99-
"RemoveTablePartitionSpecs::GenerateRequirements not implemented");
94+
void RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) const {
95+
context.RequireDefaultSpecIdUnchanged();
96+
context.RequireNoBranchesChanged();
10097
}
10198

10299
// RemoveSchemas
@@ -105,28 +102,29 @@ void RemoveSchemas::ApplyTo(TableMetadataBuilder& builder) const {
105102
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
106103
}
107104

108-
Status RemoveSchemas::GenerateRequirements(TableUpdateContext& context) const {
109-
return NotImplemented("RemoveTableSchemas::GenerateRequirements not implemented");
105+
void RemoveSchemas::GenerateRequirements(TableUpdateContext& context) const {
106+
context.RequireCurrentSchemaIdUnchanged();
107+
context.RequireNoBranchesChanged();
110108
}
111109

112110
// AddSortOrder
113111

114112
void AddSortOrder::ApplyTo(TableMetadataBuilder& builder) const {
115-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
113+
builder.AddSortOrder(sort_order_);
116114
}
117115

118-
Status AddSortOrder::GenerateRequirements(TableUpdateContext& context) const {
119-
return NotImplemented("AddTableSortOrder::GenerateRequirements not implemented");
116+
void AddSortOrder::GenerateRequirements(TableUpdateContext& context) const {
117+
// AddSortOrder doesn't generate any requirements
120118
}
121119

122120
// SetDefaultSortOrder
123121

124122
void SetDefaultSortOrder::ApplyTo(TableMetadataBuilder& builder) const {
125-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
123+
builder.SetDefaultSortOrder(sort_order_id_);
126124
}
127125

128-
Status SetDefaultSortOrder::GenerateRequirements(TableUpdateContext& context) const {
129-
return NotImplemented("SetDefaultTableSortOrder::GenerateRequirements not implemented");
126+
void SetDefaultSortOrder::GenerateRequirements(TableUpdateContext& context) const {
127+
context.RequireDefaultSortOrderIdUnchanged();
130128
}
131129

132130
// AddSnapshot
@@ -135,16 +133,16 @@ void AddSnapshot::ApplyTo(TableMetadataBuilder& builder) const {
135133
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
136134
}
137135

138-
Status AddSnapshot::GenerateRequirements(TableUpdateContext& context) const {
139-
return NotImplemented("AddTableSnapshot::GenerateRequirements not implemented");
136+
void AddSnapshot::GenerateRequirements(TableUpdateContext& context) const {
137+
// AddSnapshot doesn't generate any requirements
140138
}
141139

142140
// RemoveSnapshots
143141

144142
void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {}
145143

146-
Status RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
147-
return NotImplemented("RemoveTableSnapshots::GenerateRequirements not implemented");
144+
void RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
145+
// RemoveSnapshots doesn't generate any requirements
148146
}
149147

150148
// RemoveSnapshotRef
@@ -153,8 +151,8 @@ void RemoveSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
153151
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
154152
}
155153

156-
Status RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
157-
return NotImplemented("RemoveTableSnapshotRef::GenerateRequirements not implemented");
154+
void RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
155+
// RemoveSnapshotRef doesn't generate any requirements
158156
}
159157

160158
// SetSnapshotRef
@@ -163,8 +161,17 @@ void SetSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
163161
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
164162
}
165163

166-
Status SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
167-
return NotImplemented("SetTableSnapshotRef::GenerateRequirements not implemented");
164+
void SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
165+
bool added = context.AddChangedRef(ref_name_);
166+
if (added && context.base() != nullptr && !context.is_replace()) {
167+
const auto& refs = context.base()->refs;
168+
auto it = refs.find(ref_name_);
169+
// Require that the ref does not exist (nullopt) or is the same as the base snapshot
170+
std::optional<int64_t> base_snapshot_id =
171+
(it != refs.end()) ? std::make_optional(it->second->snapshot_id) : std::nullopt;
172+
context.AddRequirement(
173+
std::make_unique<table::AssertRefSnapshotID>(ref_name_, base_snapshot_id));
174+
}
168175
}
169176

170177
// SetProperties
@@ -173,9 +180,8 @@ void SetProperties::ApplyTo(TableMetadataBuilder& builder) const {
173180
builder.SetProperties(updated_);
174181
}
175182

176-
Status SetProperties::GenerateRequirements(TableUpdateContext& context) const {
177-
// No requirements
178-
return {};
183+
void SetProperties::GenerateRequirements(TableUpdateContext& context) const {
184+
// SetProperties doesn't generate any requirements
179185
}
180186

181187
// RemoveProperties
@@ -184,9 +190,8 @@ void RemoveProperties::ApplyTo(TableMetadataBuilder& builder) const {
184190
builder.RemoveProperties(removed_);
185191
}
186192

187-
Status RemoveProperties::GenerateRequirements(TableUpdateContext& context) const {
188-
// No requirements
189-
return {};
193+
void RemoveProperties::GenerateRequirements(TableUpdateContext& context) const {
194+
// RemoveProperties doesn't generate any requirements
190195
}
191196

192197
// SetLocation
@@ -195,8 +200,8 @@ void SetLocation::ApplyTo(TableMetadataBuilder& builder) const {
195200
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
196201
}
197202

198-
Status SetLocation::GenerateRequirements(TableUpdateContext& context) const {
199-
return NotImplemented("SetTableLocation::GenerateRequirements not implemented");
203+
void SetLocation::GenerateRequirements(TableUpdateContext& context) const {
204+
// SetLocation doesn't generate any requirements
200205
}
201206

202207
} // namespace iceberg::table

0 commit comments

Comments
 (0)