Skip to content

Commit 7475e89

Browse files
author
shuxu.li
committed
feat: transactional UpdateProperties support
1 parent 96df225 commit 7475e89

14 files changed

+565
-85
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
1919
"$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src>")
2020
set(ICEBERG_SOURCES
2121
arrow_c_data_guard_internal.cc
22+
base_transaction.cc
2223
catalog/memory/in_memory_catalog.cc
2324
expression/aggregate.cc
2425
expression/binder.cc
@@ -49,6 +50,7 @@ set(ICEBERG_SOURCES
4950
partition_field.cc
5051
partition_spec.cc
5152
partition_summary.cc
53+
pending_update.cc
5254
row/arrow_array_wrapper.cc
5355
row/manifest_wrapper.cc
5456
row/partition_values.cc

src/iceberg/base_transaction.cc

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/base_transaction.h"

#include <stdexcept>
#include <utility>

#include "iceberg/catalog.h"
#include "iceberg/pending_update.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirements.h"
#include "iceberg/table_update.h"
30+
31+
namespace iceberg {
32+
33+
BaseTransaction::BaseTransaction(std::shared_ptr<Table> table,
34+
std::shared_ptr<Catalog> catalog)
35+
: table_(std::move(table)), catalog_(std::move(catalog)) {
36+
ICEBERG_DCHECK(table_ != nullptr, "table must not be null");
37+
ICEBERG_DCHECK(catalog_ != nullptr, "catalog must not be null");
38+
}
39+
40+
/// \brief Returns the table this transaction currently refers to.
///
/// CommitTransaction may replace the stored table after a successful commit,
/// so the pointee can differ before and after committing.
const std::shared_ptr<Table>& BaseTransaction::table() const {
  return table_;
}
41+
42+
std::shared_ptr<PropertiesUpdate> BaseTransaction::UpdateProperties() {
43+
return RegisterUpdate<PropertiesUpdate>();
44+
}
45+
46+
/// \brief Placeholder for append support; always throws.
///
/// The return type cannot carry an error, so the failure is reported with an
/// exception. A std::exception-derived type is thrown so that ordinary
/// `catch (const std::exception&)` handlers see it; the value returned by the
/// NotImplemented() error helper is an error result, not an exception type.
///
/// \throws std::runtime_error always
std::shared_ptr<AppendFiles> BaseTransaction::NewAppend() {
  throw std::runtime_error("BaseTransaction::NewAppend not implemented");
}
49+
50+
/// \brief Applies all pending updates and commits them through the catalog.
///
/// Each registered pending update is applied to a TableMetadataBuilder created
/// from the current table metadata. The changes recorded by the builder are
/// used to generate requirements via TableUpdateContext, and both are passed
/// to Catalog::UpdateTable.
///
/// On success the pending updates are cleared and, if the catalog returned a
/// table, the stored table is replaced by it. If the builder recorded no
/// changes, the pending updates are cleared without contacting the catalog.
/// On any error the pending updates are left untouched.
///
/// \return An empty Status on success; InvalidArgument if the table, the
/// catalog or the table metadata is null; otherwise the first error returned
/// while applying updates, generating requirements or updating the table
Status BaseTransaction::CommitTransaction() {
  // The constructor only asserts these via ICEBERG_DCHECK, so report a proper
  // error here instead of dereferencing a null pointer.
  if (!table_) {
    return InvalidArgument("Table is null");
  }
  if (!catalog_) {
    return InvalidArgument("Catalog is null");
  }

  // `metadata` refers to the pointer owned by the current table_; it must not
  // be used after table_ is reassigned at the end of this function.
  const auto& metadata = table_->metadata();
  if (!metadata) {
    return InvalidArgument("Table metadata is null");
  }

  auto builder = TableMetadataBuilder::BuildFrom(metadata.get());
  for (const auto& pending_update : pending_updates_) {
    if (!pending_update) {
      continue;
    }
    ICEBERG_RETURN_UNEXPECTED(pending_update->Apply(*builder));
  }

  // GetChanges moves the recorded updates out of the builder, so it is called
  // exactly once.
  auto table_updates = builder->GetChanges();
  if (table_updates.empty()) {
    // Nothing was changed, so there is nothing to commit.
    pending_updates_.clear();
    return {};
  }

  TableUpdateContext context(metadata.get(), /*is_replace=*/false);
  for (const auto& update : table_updates) {
    ICEBERG_RETURN_UNEXPECTED(update->GenerateRequirements(context));
  }
  ICEBERG_ASSIGN_OR_RAISE(auto table_requirements, context.Build());

  ICEBERG_ASSIGN_OR_RAISE(
      auto updated_table,
      catalog_->UpdateTable(table_->name(), table_requirements, table_updates));

  if (updated_table) {
    table_ = std::shared_ptr<Table>(std::move(updated_table));
  }

  pending_updates_.clear();
  return {};
}
82+
83+
} // namespace iceberg

src/iceberg/base_transaction.h

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <vector>
23+
24+
#include "iceberg/transaction.h"
25+
#include "iceberg/type_fwd.h"
26+
27+
namespace iceberg {
28+
29+
/// \brief Base class for transaction implementations
30+
class BaseTransaction : public Transaction {
31+
public:
32+
BaseTransaction(std::shared_ptr<Table> table, std::shared_ptr<Catalog> catalog);
33+
~BaseTransaction() override = default;
34+
35+
const std::shared_ptr<Table>& table() const override;
36+
37+
std::shared_ptr<PropertiesUpdate> UpdateProperties() override;
38+
39+
std::shared_ptr<AppendFiles> NewAppend() override;
40+
41+
Status CommitTransaction() override;
42+
43+
protected:
44+
template <typename UpdateType, typename... Args>
45+
std::shared_ptr<UpdateType> RegisterUpdate(Args&&... args) {
46+
auto update = std::make_shared<UpdateType>(std::forward<Args>(args)...);
47+
pending_updates_.push_back(update);
48+
return update;
49+
}
50+
51+
std::shared_ptr<Table> table_;
52+
std::shared_ptr<Catalog> catalog_;
53+
std::vector<std::shared_ptr<PendingUpdate>> pending_updates_;
54+
};
55+
56+
} // namespace iceberg

src/iceberg/pending_update.cc

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/pending_update.h"
21+
22+
#include "iceberg/catalog.h"
23+
#include "iceberg/table.h"
24+
#include "iceberg/table_metadata.h"
25+
#include "iceberg/table_update.h"
26+
#include "iceberg/util/macros.h"
27+
28+
namespace iceberg {
29+
30+
// ============================================================================
31+
// PropertiesUpdate implementation
32+
// ============================================================================
33+
34+
/// \brief Stages a property to be set when this update is applied.
///
/// A later call for the same key overwrites the earlier value. If the key was
/// previously staged for removal, that removal is dropped so that the most
/// recent call wins: ApplyResult applies removals after sets, so a stale
/// removal would otherwise silently discard the value staged here.
///
/// \param key Property name
/// \param value Property value
/// \return This update, to allow call chaining
PropertiesUpdate& PropertiesUpdate::Set(std::string const& key,
                                        std::string const& value) {
  // Drop every staged removal of this key.
  for (auto it = removals_.begin(); it != removals_.end();) {
    if (*it == key) {
      it = removals_.erase(it);
    } else {
      ++it;
    }
  }
  updates_[key] = value;
  return *this;
}
39+
40+
/// \brief Stages a property to be removed when this update is applied.
///
/// Any value previously staged for the same key is discarded, so the update
/// does not emit a set for a key it is about to remove. Each key is recorded
/// for removal at most once.
///
/// \param key Property name
/// \return This update, to allow call chaining
PropertiesUpdate& PropertiesUpdate::Remove(std::string const& key) {
  // Most recent call wins: forget any value staged for this key.
  updates_.erase(key);

  // Skip keys that are already staged for removal.
  for (const auto& staged : removals_) {
    if (staged == key) {
      return *this;
    }
  }
  removals_.push_back(key);
  return *this;
}
44+
45+
/// \brief Returns a snapshot of the staged changes.
///
/// The staged sets and removals are copied into the result; the update itself
/// is left unchanged.
///
/// \return The staged updates and removals
Result<PropertiesUpdateChanges> PropertiesUpdate::Apply() {
  PropertiesUpdateChanges changes{updates_, removals_};
  return changes;
}
48+
49+
/// \brief Forwards a set of property changes to a TableMetadataBuilder.
///
/// Sets are forwarded before removals, and each builder call is skipped when
/// the corresponding collection is empty.
///
/// \param builder Builder that receives the changes
/// \param result Property sets and removals to forward
/// \return An empty Status; this function reports no errors of its own
Status PropertiesUpdate::ApplyResult(TableMetadataBuilder& builder,
                                     PropertiesUpdateChanges result) {
  const bool has_updates = !result.updates.empty();
  if (has_updates) {
    builder.SetProperties(result.updates);
  }

  const bool has_removals = !result.removals.empty();
  if (has_removals) {
    builder.RemoveProperties(result.removals);
  }

  return {};
}
59+
60+
/// \brief Standalone commit of this update; not implemented.
///
/// \return Always a NotImplemented error
// NOTE(review): the message names "UpdateProperties" although the class is
// PropertiesUpdate; confirm which name should be reported to users.
Status PropertiesUpdate::Commit() {
  Status not_implemented =
      NotImplemented("UpdateProperties::Commit() not implemented");
  return not_implemented;
}
63+
64+
} // namespace iceberg

src/iceberg/pending_update.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,15 @@
2222
/// \file iceberg/pending_update.h
2323
/// API for table changes using builder pattern
2424

25+
#include <string>
26+
#include <unordered_map>
27+
#include <vector>
28+
2529
#include "iceberg/iceberg_export.h"
2630
#include "iceberg/result.h"
2731
#include "iceberg/type_fwd.h"
2832
#include "iceberg/util/error_collector.h"
33+
#include "iceberg/util/macros.h"
2934

3035
namespace iceberg {
3136

@@ -67,6 +72,17 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
6772

6873
protected:
6974
PendingUpdate() = default;
75+
76+
/// \brief Apply the pending changes to a TableMetadataBuilder
77+
///
78+
/// This method applies the changes by calling builder's specific methods.
79+
/// The builder will automatically record corresponding TableUpdate objects.
80+
///
81+
/// \param builder The TableMetadataBuilder to apply changes to
82+
/// \return Status::OK if the changes were applied successfully, or an error
83+
virtual Status Apply(TableMetadataBuilder& builder) = 0;
84+
85+
friend class BaseTransaction;
7086
};
7187

7288
} // namespace iceberg

src/iceberg/table.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ class ICEBERG_EXPORT Table {
124124
/// \brief Returns a FileIO to read and write table data and metadata files
125125
const std::shared_ptr<FileIO>& io() const;
126126

127+
/// \brief Return the underlying table metadata
128+
const std::shared_ptr<TableMetadata>& metadata() const { return metadata_; }
129+
127130
private:
128131
const TableIdentifier identifier_;
129132
std::shared_ptr<TableMetadata> metadata_;

src/iceberg/table_metadata.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,4 +566,17 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataBuilder::Build() {
566566
return result;
567567
}
568568

569+
std::vector<std::unique_ptr<TableUpdate>> TableMetadataBuilder::GetChanges() {
570+
return std::move(impl_->changes);
571+
}
572+
573+
std::vector<const TableUpdate*> TableMetadataBuilder::GetChangesView() const {
574+
std::vector<const TableUpdate*> result;
575+
result.reserve(impl_->changes.size());
576+
for (const auto& change : impl_->changes) {
577+
result.push_back(change.get());
578+
}
579+
return result;
580+
}
581+
569582
} // namespace iceberg

src/iceberg/table_metadata.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,23 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector {
421421
/// \brief Destructor
422422
~TableMetadataBuilder();
423423

424+
/// \brief Get all recorded TableUpdate changes
425+
///
426+
/// This method extracts all TableUpdate objects that were automatically
427+
/// recorded when modification methods were called. The changes are moved
428+
/// out of the builder, so this method should only be called once.
429+
///
430+
/// \return A vector of TableUpdate objects representing all changes
431+
std::vector<std::unique_ptr<TableUpdate>> GetChanges();
432+
433+
/// \brief Get all recorded TableUpdate changes (const version for inspection)
434+
///
435+
/// This method returns a view of all TableUpdate objects without moving them.
436+
/// Useful for inspection before committing.
437+
///
438+
/// \return A vector of const pointers to TableUpdate objects
439+
std::vector<const TableUpdate*> GetChangesView() const;
440+
424441
// Delete copy operations (use BuildFrom to create a new builder)
425442
TableMetadataBuilder(const TableMetadataBuilder&) = delete;
426443
TableMetadataBuilder& operator=(const TableMetadataBuilder&) = delete;

src/iceberg/table_update.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,9 @@ void SetProperties::ApplyTo(TableMetadataBuilder& builder) const {
180180
builder.SetProperties(updated_);
181181
}
182182

183-
void SetProperties::GenerateRequirements(TableUpdateContext& context) const {
184-
// SetProperties doesn't generate any requirements
183+
/// \brief Generates the requirements for a SetProperties update.
///
/// Nothing is added to the context.
///
/// \param context Requirement context; left untouched
/// \return An empty Status
Status SetProperties::GenerateRequirements(
    [[maybe_unused]] TableUpdateContext& context) const {
  // No requirements
  return {};
}
186187

187188
// RemoveProperties
@@ -190,8 +191,9 @@ void RemoveProperties::ApplyTo(TableMetadataBuilder& builder) const {
190191
builder.RemoveProperties(removed_);
191192
}
192193

193-
void RemoveProperties::GenerateRequirements(TableUpdateContext& context) const {
194-
// RemoveProperties doesn't generate any requirements
194+
/// \brief Generates the requirements for a RemoveProperties update.
///
/// Nothing is added to the context.
///
/// \param context Requirement context; left untouched
/// \return An empty Status
Status RemoveProperties::GenerateRequirements(
    [[maybe_unused]] TableUpdateContext& context) const {
  // No requirements
  return {};
}
196198

197199
// SetLocation

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ add_iceberg_test(table_test
7777
table_requirement_test.cc
7878
table_requirements_test.cc
7979
table_update_test.cc
80+
transaction_pending_update_test.cc
8081
update_properties_test.cc)
8182

8283
add_iceberg_test(expression_test

0 commit comments

Comments
 (0)