Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/iceberg/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,21 @@ class ICEBERG_EXPORT Catalog {
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) = 0;

/// \brief Start a transaction to replace a table
///
/// \param identifier a table identifier
/// \param schema a schema
/// \param spec a partition spec
/// \param location a location for the table; leave empty if unspecified
/// \param properties a string map of table properties
/// \param orCreate whether to create the table if not exists
/// \return a Transaction to replace the table or ErrorKind::kNotFound if the table
/// doesn't exist and orCreate is false
virtual Result<std::shared_ptr<Transaction>> StageReplaceTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties, bool orCreate) = 0;

/// \brief Check whether table exists
///
/// \param identifier a table identifier
Expand All @@ -161,6 +176,15 @@ class ICEBERG_EXPORT Catalog {
/// - On failure, contains error information.
virtual Status DropTable(const TableIdentifier& identifier, bool purge) = 0;

/// \brief Rename a table
///
/// \param from the current table identifier
/// \param to the new table identifier
/// \return Status indicating the outcome of the operation.
/// - On success, the table was renamed.
/// - On failure, contains error information.
virtual Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) = 0;

/// \brief Load a table
///
/// \param identifier a table identifier
Expand Down Expand Up @@ -223,6 +247,16 @@ class ICEBERG_EXPORT Catalog {
///
/// \return the Transaction to create the table
virtual std::unique_ptr<Transaction> StageCreate() = 0;

/// \brief Starts a transaction to replace the table
///
/// \return the Transaction to replace the table
virtual std::unique_ptr<Transaction> StageReplace() = 0;

/// \brief Starts a transaction to create or replace the table
///
/// \return the Transaction to create or replace the table
virtual std::unique_ptr<Transaction> StageCreateOrReplace() = 0;
};

/// \brief Instantiate a builder to either create a table or start a create/replace
Expand Down
13 changes: 13 additions & 0 deletions src/iceberg/catalog/memory/in_memory_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,13 @@ Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
return NotImplemented("stage create table");
}

Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageReplaceTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties, bool orCreate) {
return NotImplemented("stage replace table");
}

Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
std::unique_lock lock(mutex_);
return root_namespace_->TableExists(identifier);
Expand All @@ -415,6 +422,12 @@ Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge)
return root_namespace_->UnregisterTable(identifier);
}

Status InMemoryCatalog::RenameTable(const TableIdentifier& from,
const TableIdentifier& to) {
std::unique_lock lock(mutex_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't need to move lock, we can use std::lock_guard<> instead. Can you help to modify the other cases in this file? Or we can open a new PR to do this.

return NotImplemented("rename table");
}

Result<std::unique_ptr<Table>> InMemoryCatalog::LoadTable(
const TableIdentifier& identifier) {
if (!file_io_) [[unlikely]] {
Expand Down
8 changes: 8 additions & 0 deletions src/iceberg/catalog/memory/in_memory_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,18 @@ class ICEBERG_EXPORT InMemoryCatalog
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) override;

Result<std::shared_ptr<Transaction>> StageReplaceTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties,
bool orCreate) override;

Result<bool> TableExists(const TableIdentifier& identifier) const override;

Status DropTable(const TableIdentifier& identifier, bool purge) override;

Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) override;

Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;

Result<std::shared_ptr<Table>> RegisterTable(
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ const std::vector<SnapshotLogEntry>& Table::history() const {
return metadata_->snapshot_log;
}

std::shared_ptr<Transaction> Table::NewTransaction() const {
throw NotImplemented("Table::NewTransaction is not implemented");
}

const std::shared_ptr<FileIO>& Table::io() const { return io_; }

std::unique_ptr<TableScanBuilder> Table::NewScan() const {
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ class ICEBERG_EXPORT Table {
/// filter data.
virtual std::unique_ptr<TableScanBuilder> NewScan() const;

/// \brief Create a new transaction for this table
///
/// \return a shared pointer to the new Transaction
virtual std::shared_ptr<Transaction> NewTransaction() const;

/// \brief Returns a FileIO to read and write table data and metadata files
const std::shared_ptr<FileIO>& io() const;

Expand Down
9 changes: 9 additions & 0 deletions src/iceberg/test/mock_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,19 @@ class MockCatalog : public Catalog {
const std::string&, (const std::unordered_map<std::string, std::string>&)),
(override));

MOCK_METHOD((Result<std::shared_ptr<Transaction>>), StageReplaceTable,
(const TableIdentifier&, const Schema&, const PartitionSpec&,
const std::string&, (const std::unordered_map<std::string, std::string>&),
bool),
(override));

MOCK_METHOD(Result<bool>, TableExists, (const TableIdentifier&), (const, override));

MOCK_METHOD(Status, DropTable, (const TableIdentifier&, bool), (override));

MOCK_METHOD(Status, RenameTable, (const TableIdentifier&, const TableIdentifier&),
(override));

MOCK_METHOD((Result<std::unique_ptr<Table>>), LoadTable, (const TableIdentifier&),
(override));

Expand Down
21 changes: 17 additions & 4 deletions src/iceberg/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#pragma once

#include <memory>
#include <vector>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why include this?


#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {
Expand All @@ -42,12 +44,23 @@ class ICEBERG_EXPORT Transaction {
/// \return a new AppendFiles
virtual std::shared_ptr<AppendFiles> NewAppend() = 0;

/// \brief Apply multiple metadata updates to this transaction
///
/// \param requirements the table requirements to validate
/// \param updates the table updates to apply
/// \return Status::OK if all updates were queued successfully
virtual Status UpdateTable(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this? I don't think Transaction is the entrance to create a single update action and Catalog is responsible of this. Let's remove it.

const std::vector<std::unique_ptr<TableRequirement>>& requirements,
std::vector<std::unique_ptr<TableUpdate>> updates) = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::vector<std::unique_ptr<TableUpdate>> updates) = 0;
const std::vector<std::unique_ptr<TableUpdate>>& updates) = 0;


/// \brief Apply the pending changes from all actions and commit
///
/// May throw ValidationException if any update cannot be applied to the current table
/// metadata. May throw CommitFailedException if the updates cannot be committed due to
/// conflicts.
virtual void CommitTransaction() = 0;
/// This method applies all pending data operations and metadata updates in the
/// transaction and commits them to the table in a single atomic operation.
///
/// \return Status::OK if the transaction was committed successfully, or an error
/// status if validation failed or the commit encountered conflicts
virtual Status CommitTransaction() = 0;
};

} // namespace iceberg
Loading