Skip to content

Commit 14bd4f8

Browse files
author
shuxu.li
committed
feat: table transactional UpdateProperties support
1 parent 4159c75 commit 14bd4f8

21 files changed

+416
-313
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ set(ICEBERG_SOURCES
5050
partition_field.cc
5151
partition_spec.cc
5252
partition_summary.cc
53-
pending_update.cc
5453
row/arrow_array_wrapper.cc
5554
row/manifest_wrapper.cc
5655
row/partition_values.cc
@@ -66,6 +65,7 @@ set(ICEBERG_SOURCES
6665
table.cc
6766
table_metadata.cc
6867
table_properties.cc
68+
transaction_catalog.cc
6969
table_requirement.cc
7070
table_requirements.cc
7171
table_scan.cc

src/iceberg/base_transaction.cc

Lines changed: 88 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,63 +19,126 @@
1919

2020
#include "iceberg/base_transaction.h"
2121

22+
#include <utility>
23+
2224
#include "iceberg/catalog.h"
2325
#include "iceberg/pending_update.h"
2426
#include "iceberg/table.h"
25-
#include "iceberg/table_metadata.h"
26-
#include "iceberg/table_requirements.h"
27-
#include "iceberg/table_update.h"
27+
#include "iceberg/update/update_properties.h"
28+
#include "iceberg/util/macros.h"
2829

2930
namespace iceberg {
3031

3132
BaseTransaction::BaseTransaction(std::shared_ptr<const Table> table,
3233
std::shared_ptr<Catalog> catalog)
33-
: table_(std::move(table)), catalog_(std::move(catalog)) {
34+
: table_(std::move(table)) {
3435
ICEBERG_DCHECK(table_ != nullptr, "table must not be null");
35-
ICEBERG_DCHECK(catalog_ != nullptr, "catalog must not be null");
36+
ICEBERG_DCHECK(catalog != nullptr, "catalog must not be null");
37+
context_.identifier = table_->name();
38+
context_.current_metadata = table_->metadata();
39+
catalog_ = std::make_shared<TransactionCatalog>(std::move(catalog), this);
3640
}
3741

3842
const std::shared_ptr<const Table>& BaseTransaction::table() const { return table_; }
3943

40-
std::shared_ptr<PropertiesUpdate> BaseTransaction::UpdateProperties() {
41-
return RegisterUpdate<PropertiesUpdate>();
44+
std::unique_ptr<UpdateProperties> BaseTransaction::UpdateProperties() {
45+
auto update = CheckAndCreateUpdate<::iceberg::UpdateProperties>(
46+
table_->name(), catalog_, CurrentMetadata());
47+
if (!update.has_value()) {
48+
ERROR_TO_EXCEPTION(update.error());
49+
}
50+
51+
return std::move(update).value();
4252
}
4353

44-
std::shared_ptr<AppendFiles> BaseTransaction::NewAppend() {
54+
std::unique_ptr<AppendFiles> BaseTransaction::NewAppend() {
4555
throw NotImplemented("BaseTransaction::NewAppend not implemented");
4656
}
4757

4858
Status BaseTransaction::CommitTransaction() {
49-
const auto& metadata = table_->metadata();
50-
if (!metadata) {
51-
return InvalidArgument("Table metadata is null");
59+
if (!HasLastOperationCommitted()) {
60+
return InvalidState("Cannot commit transaction: last operation has not committed");
5261
}
5362

54-
auto builder = TableMetadataBuilder::BuildFrom(metadata.get());
55-
for (const auto& pending_update : pending_updates_) {
56-
if (!pending_update) {
57-
continue;
58-
}
59-
ICEBERG_RETURN_UNEXPECTED(pending_update->Apply(*builder));
63+
auto pending_updates = ConsumePendingUpdates();
64+
if (pending_updates.empty()) {
65+
return {};
6066
}
6167

62-
auto table_updates = builder->GetChanges();
63-
TableUpdateContext context(metadata.get(), /*is_replace=*/false);
64-
for (const auto& update : table_updates) {
65-
ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
66-
}
67-
ICEBERG_ASSIGN_OR_RAISE(auto table_requirements, context.Build());
68+
auto pending_requirements = ConsumePendingRequirements();
6869

6970
ICEBERG_ASSIGN_OR_RAISE(
7071
auto updated_table,
71-
catalog_->UpdateTable(table_->name(), table_requirements, table_updates));
72+
catalog_->catalog_impl()->UpdateTable(
73+
table_->name(), std::move(pending_requirements), std::move(pending_updates)));
7274

75+
// update table to the new version
7376
if (updated_table) {
7477
table_ = std::shared_ptr<Table>(std::move(updated_table));
7578
}
7679

77-
pending_updates_.clear();
7880
return {};
7981
}
8082

83+
Result<std::unique_ptr<Table>> BaseTransaction::StageUpdates(
84+
const TableIdentifier& identifier,
85+
std::vector<std::unique_ptr<TableRequirement>> requirements,
86+
std::vector<std::unique_ptr<TableUpdate>> updates) {
87+
if (identifier != context_.identifier) {
88+
return InvalidArgument("Transaction only supports table '{}'",
89+
context_.identifier.name);
90+
}
91+
92+
if (!context_.current_metadata) {
93+
return InvalidState("Transaction metadata is not initialized");
94+
}
95+
96+
if (updates.empty()) {
97+
return std::make_unique<Table>(
98+
context_.identifier, std::make_shared<TableMetadata>(*context_.current_metadata),
99+
table_->location(), table_->io(), catalog_->catalog_impl());
100+
}
101+
102+
ICEBERG_RETURN_UNEXPECTED(ApplyUpdates(updates));
103+
104+
for (auto& requirement : requirements) {
105+
context_.pending_requirements.emplace_back(std::move(requirement));
106+
}
107+
for (auto& update : updates) {
108+
context_.pending_updates.emplace_back(std::move(update));
109+
}
110+
111+
return std::make_unique<Table>(
112+
context_.identifier, std::make_shared<TableMetadata>(*context_.current_metadata),
113+
table_->location(), table_->io(), catalog_->catalog_impl());
114+
}
115+
116+
Status BaseTransaction::ApplyUpdates(
117+
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
118+
if (updates.empty()) {
119+
return {};
120+
}
121+
122+
auto builder = TableMetadataBuilder::BuildFrom(context_.current_metadata.get());
123+
for (const auto& update : updates) {
124+
if (!update) {
125+
continue;
126+
}
127+
update->ApplyTo(*builder);
128+
}
129+
130+
ICEBERG_ASSIGN_OR_RAISE(auto new_metadata, builder->Build());
131+
context_.current_metadata = std::shared_ptr<TableMetadata>(std::move(new_metadata));
132+
return {};
133+
}
134+
135+
std::vector<std::unique_ptr<TableRequirement>>
136+
BaseTransaction::ConsumePendingRequirements() {
137+
return std::exchange(context_.pending_requirements, {});
138+
}
139+
140+
std::vector<std::unique_ptr<TableUpdate>> BaseTransaction::ConsumePendingUpdates() {
141+
return std::exchange(context_.pending_updates, {});
142+
}
143+
81144
} // namespace iceberg

src/iceberg/base_transaction.h

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,38 +19,82 @@
1919

2020
#pragma once
2121

22+
#include <memory>
2223
#include <vector>
2324

25+
#include "iceberg/table_identifier.h"
26+
#include "iceberg/table_metadata.h"
27+
#include "iceberg/table_requirement.h"
28+
#include "iceberg/table_update.h"
2429
#include "iceberg/transaction.h"
30+
#include "iceberg/transaction_catalog.h"
2531
#include "iceberg/type_fwd.h"
2632

2733
namespace iceberg {
2834

2935
/// \brief Base class for transaction implementations
30-
class BaseTransaction : public Transaction {
36+
class ICEBERG_EXPORT BaseTransaction : public Transaction {
3137
public:
3238
BaseTransaction(std::shared_ptr<const Table> table, std::shared_ptr<Catalog> catalog);
3339
~BaseTransaction() override = default;
3440

3541
const std::shared_ptr<const Table>& table() const override;
3642

37-
std::shared_ptr<PropertiesUpdate> UpdateProperties() override;
43+
std::unique_ptr<::iceberg::UpdateProperties> UpdateProperties() override;
3844

39-
std::shared_ptr<AppendFiles> NewAppend() override;
45+
std::unique_ptr<AppendFiles> NewAppend() override;
4046

4147
Status CommitTransaction() override;
4248

49+
Result<std::unique_ptr<Table>> StageUpdates(
50+
const TableIdentifier& identifier,
51+
std::vector<std::unique_ptr<TableRequirement>> requirements,
52+
std::vector<std::unique_ptr<TableUpdate>> updates);
53+
54+
bool HasLastOperationCommitted() const { return context_.last_operation_committed; }
55+
56+
void SetLastOperationCommitted(bool committed) {
57+
context_.last_operation_committed = committed;
58+
}
59+
60+
const std::shared_ptr<TableMetadata>& CurrentMetadata() const {
61+
return context_.current_metadata;
62+
}
63+
64+
Status ApplyUpdates(const std::vector<std::unique_ptr<TableUpdate>>& updates);
65+
66+
std::vector<std::unique_ptr<TableRequirement>> ConsumePendingRequirements();
67+
68+
std::vector<std::unique_ptr<TableUpdate>> ConsumePendingUpdates();
69+
4370
protected:
4471
template <typename UpdateType, typename... Args>
45-
std::shared_ptr<UpdateType> RegisterUpdate(Args&&... args) {
46-
auto update = std::make_shared<UpdateType>(std::forward<Args>(args)...);
47-
pending_updates_.push_back(update);
48-
return update;
72+
Result<std::unique_ptr<UpdateType>> CheckAndCreateUpdate(Args&&... args) {
73+
if (!HasLastOperationCommitted()) {
74+
return InvalidState(
75+
"Cannot create new update: last operation in transaction has not committed");
76+
}
77+
SetLastOperationCommitted(false);
78+
return std::make_unique<UpdateType>(std::forward<Args>(args)...);
4979
}
5080

81+
private:
82+
struct TransactionContext {
83+
TransactionContext() = default;
84+
TransactionContext(const TableIdentifier& identifier,
85+
std::shared_ptr<TableMetadata> metadata)
86+
: identifier(identifier), current_metadata(std::move(metadata)) {}
87+
88+
bool last_operation_committed = true;
89+
TableIdentifier identifier;
90+
std::shared_ptr<TableMetadata> current_metadata;
91+
std::vector<std::unique_ptr<TableRequirement>> pending_requirements;
92+
std::vector<std::unique_ptr<TableUpdate>> pending_updates;
93+
};
94+
5195
std::shared_ptr<const Table> table_;
52-
std::shared_ptr<Catalog> catalog_;
53-
std::vector<std::shared_ptr<PendingUpdate>> pending_updates_;
96+
std::shared_ptr<TransactionCatalog> catalog_;
97+
TransactionContext context_;
5498
};
5599

56100
} // namespace iceberg

src/iceberg/catalog.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ class ICEBERG_EXPORT Catalog {
123123
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
124124
virtual Result<std::unique_ptr<Table>> UpdateTable(
125125
const TableIdentifier& identifier,
126-
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
127-
const std::vector<std::unique_ptr<TableUpdate>>& updates) = 0;
126+
std::vector<std::unique_ptr<TableRequirement>> requirements,
127+
std::vector<std::unique_ptr<TableUpdate>> updates) = 0;
128128

129129
/// \brief Start a transaction to create a table
130130
///
@@ -184,6 +184,8 @@ class ICEBERG_EXPORT Catalog {
184184
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
185185
virtual Result<std::shared_ptr<Table>> RegisterTable(
186186
const TableIdentifier& identifier, const std::string& metadata_file_location) = 0;
187+
188+
virtual void SetLastOperationCommitted(bool committed) = 0;
187189
};
188190

189191
} // namespace iceberg

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,8 +392,8 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
392392

393393
Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
394394
const TableIdentifier& identifier,
395-
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
396-
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
395+
std::vector<std::unique_ptr<TableRequirement>> requirements,
396+
std::vector<std::unique_ptr<TableUpdate>> updates) {
397397
return NotImplemented("update table");
398398
}
399399

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include <mutex>
2323

2424
#include "iceberg/catalog.h"
25+
#include "iceberg/table_requirement.h"
26+
#include "iceberg/table_update.h"
2527

2628
namespace iceberg {
2729

@@ -77,8 +79,8 @@ class ICEBERG_EXPORT InMemoryCatalog
7779

7880
Result<std::unique_ptr<Table>> UpdateTable(
7981
const TableIdentifier& identifier,
80-
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
81-
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
82+
std::vector<std::unique_ptr<TableRequirement>> requirements,
83+
std::vector<std::unique_ptr<TableUpdate>> updates) override;
8284

8385
Result<std::shared_ptr<Transaction>> StageCreateTable(
8486
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
@@ -97,6 +99,8 @@ class ICEBERG_EXPORT InMemoryCatalog
9799
const TableIdentifier& identifier,
98100
const std::string& metadata_file_location) override;
99101

102+
void SetLastOperationCommitted(bool committed) override {}
103+
100104
private:
101105
std::string catalog_name_;
102106
std::unordered_map<std::string, std::string> properties_;

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ Result<std::unique_ptr<Table>> RestCatalog::CreateTable(
155155

156156
Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
157157
[[maybe_unused]] const TableIdentifier& identifier,
158-
[[maybe_unused]] const std::vector<std::unique_ptr<TableRequirement>>& requirements,
159-
[[maybe_unused]] const std::vector<std::unique_ptr<TableUpdate>>& updates) {
158+
[[maybe_unused]] std::vector<std::unique_ptr<TableRequirement>> requirements,
159+
[[maybe_unused]] std::vector<std::unique_ptr<TableUpdate>> updates) {
160160
return NotImplemented("Not implemented");
161161
}
162162

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
7676

7777
Result<std::unique_ptr<Table>> UpdateTable(
7878
const TableIdentifier& identifier,
79-
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
80-
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
79+
std::vector<std::unique_ptr<TableRequirement>> requirements,
80+
std::vector<std::unique_ptr<TableUpdate>> updates) override;
8181

8282
Result<std::shared_ptr<Transaction>> StageCreateTable(
8383
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
@@ -96,6 +96,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
9696
const TableIdentifier& identifier,
9797
const std::string& metadata_file_location) override;
9898

99+
void SetLastOperationCommitted(bool committed) override {}
100+
99101
private:
100102
RestCatalog(std::unique_ptr<RestCatalogProperties> config,
101103
std::unique_ptr<ResourcePaths> paths);

0 commit comments

Comments
 (0)