Skip to content

Commit 21940c2

Browse files
author
shuxu.li
committed
feat: add table create/replace/update interface to catalog
1 parent 6a8aeeb commit 21940c2

File tree

7 files changed

+91
-6
lines changed

7 files changed

+91
-6
lines changed

src/iceberg/catalog.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,21 @@ class ICEBERG_EXPORT Catalog {
140140
const std::string& location,
141141
const std::unordered_map<std::string, std::string>& properties) = 0;
142142

143+
/// \brief Start a transaction to replace a table
144+
///
145+
/// \param identifier a table identifier
146+
/// \param schema a schema
147+
/// \param spec a partition spec
148+
/// \param location a location for the table; leave empty if unspecified
149+
/// \param properties a string map of table properties
150+
/// \param orCreate whether to create the table if not exists
151+
/// \return a Transaction to replace the table or ErrorKind::kNotFound if the table
152+
/// doesn't exist and orCreate is false
153+
virtual Result<std::shared_ptr<Transaction>> StageReplaceTable(
154+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
155+
const std::string& location,
156+
const std::unordered_map<std::string, std::string>& properties, bool orCreate) = 0;
157+
143158
/// \brief Check whether table exists
144159
///
145160
/// \param identifier a table identifier
@@ -161,6 +176,15 @@ class ICEBERG_EXPORT Catalog {
161176
/// - On failure, contains error information.
162177
virtual Status DropTable(const TableIdentifier& identifier, bool purge) = 0;
163178

179+
/// \brief Rename a table
180+
///
181+
/// \param from the current table identifier
182+
/// \param to the new table identifier
183+
/// \return Status indicating the outcome of the operation.
184+
/// - On success, the table was renamed.
185+
/// - On failure, contains error information.
186+
virtual Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) = 0;
187+
164188
/// \brief Load a table
165189
///
166190
/// \param identifier a table identifier
@@ -223,6 +247,16 @@ class ICEBERG_EXPORT Catalog {
223247
///
224248
/// \return the Transaction to create the table
225249
virtual std::unique_ptr<Transaction> StageCreate() = 0;
250+
251+
/// \brief Starts a transaction to replace the table
252+
///
253+
/// \return the Transaction to replace the table
254+
virtual std::unique_ptr<Transaction> StageReplace() = 0;
255+
256+
/// \brief Starts a transaction to create or replace the table
257+
///
258+
/// \breturn the Transaction to create or replace the table
259+
virtual std::unique_ptr<Transaction> StageCreateOrReplace() = 0;
226260
};
227261

228262
/// \brief Instantiate a builder to either create a table or start a create/replace

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,13 @@ Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
404404
return NotImplemented("stage create table");
405405
}
406406

407+
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageReplaceTable(
408+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
409+
const std::string& location,
410+
const std::unordered_map<std::string, std::string>& properties, bool orCreate) {
411+
return NotImplemented("stage replace table");
412+
}
413+
407414
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
408415
std::unique_lock lock(mutex_);
409416
return root_namespace_->TableExists(identifier);
@@ -415,6 +422,12 @@ Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge)
415422
return root_namespace_->UnregisterTable(identifier);
416423
}
417424

425+
Status InMemoryCatalog::RenameTable(const TableIdentifier& from,
426+
const TableIdentifier& to) {
427+
std::unique_lock lock(mutex_);
428+
return NotImplemented("rename table");
429+
}
430+
418431
Result<std::unique_ptr<Table>> InMemoryCatalog::LoadTable(
419432
const TableIdentifier& identifier) {
420433
if (!file_io_) [[unlikely]] {

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,18 @@ class ICEBERG_EXPORT InMemoryCatalog
8585
const std::string& location,
8686
const std::unordered_map<std::string, std::string>& properties) override;
8787

88+
Result<std::shared_ptr<Transaction>> StageReplaceTable(
89+
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
90+
const std::string& location,
91+
const std::unordered_map<std::string, std::string>& properties,
92+
bool orCreate) override;
93+
8894
Result<bool> TableExists(const TableIdentifier& identifier) const override;
8995

9096
Status DropTable(const TableIdentifier& identifier, bool purge) override;
9197

98+
Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) override;
99+
92100
Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;
93101

94102
Result<std::shared_ptr<Table>> RegisterTable(

src/iceberg/table.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ const std::vector<SnapshotLogEntry>& Table::history() const {
133133
return metadata_->snapshot_log;
134134
}
135135

136+
std::shared_ptr<Transaction> Table::NewTransaction() {
137+
throw NotImplemented("Table::NewTransaction is not implemented");
138+
}
139+
136140
const std::shared_ptr<FileIO>& Table::io() const { return io_; }
137141

138142
std::unique_ptr<TableScanBuilder> Table::NewScan() const {

src/iceberg/table.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ class ICEBERG_EXPORT Table {
109109
/// filter data.
110110
virtual std::unique_ptr<TableScanBuilder> NewScan() const;
111111

112+
/// \brief Create a new transaction for this table
113+
///
114+
/// \return a shared pointer to the new Transaction
115+
virtual std::shared_ptr<Transaction> NewTransaction();
116+
112117
/// \brief Returns a FileIO to read and write table data and metadata files
113118
const std::shared_ptr<FileIO>& io() const;
114119

src/iceberg/test/mock_catalog.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,19 @@ class MockCatalog : public Catalog {
7171
const std::string&, (const std::unordered_map<std::string, std::string>&)),
7272
(override));
7373

74+
MOCK_METHOD((Result<std::shared_ptr<Transaction>>), StageReplaceTable,
75+
(const TableIdentifier&, const Schema&, const PartitionSpec&,
76+
const std::string&, (const std::unordered_map<std::string, std::string>&),
77+
bool),
78+
(override));
79+
7480
MOCK_METHOD(Result<bool>, TableExists, (const TableIdentifier&), (const, override));
7581

7682
MOCK_METHOD(Status, DropTable, (const TableIdentifier&, bool), (override));
7783

84+
MOCK_METHOD(Status, RenameTable, (const TableIdentifier&, const TableIdentifier&),
85+
(override));
86+
7887
MOCK_METHOD((Result<std::unique_ptr<Table>>), LoadTable, (const TableIdentifier&),
7988
(override));
8089

src/iceberg/transaction.h

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#pragma once
2222

2323
#include <memory>
24+
#include <vector>
2425

2526
#include "iceberg/iceberg_export.h"
2627
#include "iceberg/type_fwd.h"
@@ -37,17 +38,28 @@ class ICEBERG_EXPORT Transaction {
3738
/// \return this transaction's table
3839
virtual const std::shared_ptr<Table>& table() const = 0;
3940

40-
/// \brief Create a new append API to add files to this table
41+
/// \brief Create a new append API to add data files to this table
4142
///
42-
/// \return a new AppendFiles
43+
/// \return a new AppendFiles instance
4344
virtual std::shared_ptr<AppendFiles> NewAppend() = 0;
4445

46+
/// \brief Apply multiple metadata updates to this transaction
47+
///
48+
/// \param requirements the table requirements to validate
49+
/// \param updates the table updates to apply
50+
/// \return Status::OK if all updates were queued successfully
51+
virtual Status UpdateTable(
52+
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
53+
std::vector<std::unique_ptr<TableUpdate>> updates) = 0;
54+
4555
/// \brief Apply the pending changes from all actions and commit
4656
///
47-
/// May throw ValidationException if any update cannot be applied to the current table
48-
/// metadata. May throw CommitFailedException if the updates cannot be committed due to
49-
/// conflicts.
50-
virtual void CommitTransaction() = 0;
57+
/// This method applies all pending data operations and metadata updates in the
58+
/// transaction and commits them to the table in a single atomic operation.
59+
///
60+
/// \return Status::OK if the transaction was committed successfully, or an error
61+
/// status if validation failed or the commit encountered conflicts
62+
virtual Status CommitTransaction() = 0;
5163
};
5264

5365
} // namespace iceberg

0 commit comments

Comments
 (0)