Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/iceberg/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ class ICEBERG_EXPORT Catalog {
/// \param identifier a table identifier
/// \return instance of Table implementation referred to by identifier or
/// ErrorKind::kNoSuchTable if the table does not exist
virtual Result<std::shared_ptr<Table>> LoadTable(
const TableIdentifier& identifier) const = 0;
virtual Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier) = 0;

/// \brief Register a table with the catalog if it does not exist
///
Expand Down
227 changes: 45 additions & 182 deletions src/iceberg/catalog/in_memory_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,14 @@

#include <algorithm>
#include <iterator> // IWYU pragma: keep
#include <mutex>
#include <unordered_map>

#include "iceberg/exception.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

/// \brief A hierarchical namespace that manages namespaces and table metadata in-memory.
///
/// Each InMemoryNamespace represents a namespace level and can contain properties,
Expand Down Expand Up @@ -317,117 +314,56 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
return it->second;
}

} // namespace

/// \brief Implementation object that holds the state of an in-memory catalog.
///
/// The member functions mirror the catalog operations one-to-one. Namespace and
/// table bookkeeping is delegated to the InMemoryNamespace tree rooted at
/// root_namespace_.
class ICEBERG_EXPORT InMemoryCatalogImpl {
public:
/// \brief Construct the implementation with its name, FileIO, warehouse location
/// and catalog properties.
InMemoryCatalogImpl(std::string name, std::shared_ptr<FileIO> file_io,
std::string warehouse_location,
std::unordered_map<std::string, std::string> properties);

/// \brief Return the catalog name supplied at construction.
std::string_view name() const;

/// \brief Create namespace `ns` with the given properties.
Status CreateNamespace(const Namespace& ns,
const std::unordered_map<std::string, std::string>& properties);

/// \brief List the namespaces found under `ns`.
Result<std::vector<Namespace>> ListNamespaces(const Namespace& ns) const;

/// \brief Drop namespace `ns`.
Status DropNamespace(const Namespace& ns);

/// \brief Report whether namespace `ns` exists.
Result<bool> NamespaceExists(const Namespace& ns) const;

/// \brief Return the properties stored for namespace `ns`.
Result<std::unordered_map<std::string, std::string>> GetNamespaceProperties(
const Namespace& ns) const;

/// \brief Apply `updates` and `removals` to the properties of namespace `ns`.
Status UpdateNamespaceProperties(
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
const std::unordered_set<std::string>& removals);

/// \brief List identifiers of the tables registered in namespace `ns`.
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const;

/// \brief Not implemented; the definition returns NotImplemented("create table").
Result<std::unique_ptr<Table>> CreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties);

/// \brief Not implemented; the definition returns NotImplemented("update table").
Result<std::unique_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
const std::vector<std::unique_ptr<MetadataUpdate>>& updates);

/// \brief Not implemented; the definition returns
/// NotImplemented("stage create table").
Result<std::shared_ptr<Transaction>> StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties);

/// \brief Report whether a table is registered under `identifier`.
Result<bool> TableExists(const TableIdentifier& identifier) const;

/// \brief Unregister the table. `purge` is currently unused (see the TODO in the
/// definition).
Status DropTable(const TableIdentifier& identifier, bool purge);

/// \brief Not implemented; the definition returns NotImplemented("load table").
Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) const;

/// \brief Register an existing metadata file under `identifier`, then load the
/// table through LoadTable().
Result<std::shared_ptr<Table>> RegisterTable(const TableIdentifier& identifier,
const std::string& metadata_file_location);

/// \brief Not implemented; the definition throws IcebergError("not implemented").
std::unique_ptr<TableBuilder> BuildTable(const TableIdentifier& identifier,
const Schema& schema) const;

private:
// NOTE: members are initialized in declaration order; keep this order in sync
// with the constructor's initializer list.
std::string catalog_name_;
std::unordered_map<std::string, std::string> properties_;
std::shared_ptr<FileIO> file_io_;
std::string warehouse_location_;
// Root of the namespace tree; the operations above delegate to it.
std::unique_ptr<class InMemoryNamespace> root_namespace_;
// Locked by the operations that touch root_namespace_. Declared mutable so that
// const member functions can lock it.
mutable std::recursive_mutex mutex_;
};

InMemoryCatalogImpl::InMemoryCatalogImpl(
std::string name, std::shared_ptr<FileIO> file_io, std::string warehouse_location,
std::unordered_map<std::string, std::string> properties)
/// \brief Construct an in-memory catalog with an empty root namespace.
///
/// \param name catalog name, returned by name()
/// \param file_io FileIO shared with the catalog; LoadTable() reports an error
///        when it is null
/// \param warehouse_location warehouse location, stored as-is
/// \param properties catalog properties, stored as-is
InMemoryCatalog::InMemoryCatalog(
    std::string const& name, std::shared_ptr<FileIO> const& file_io,
    std::string const& warehouse_location,
    std::unordered_map<std::string, std::string> const& properties)
    // Every parameter is a const reference, so std::move() on it would silently
    // degrade to a copy. Copy explicitly to keep the cost visible and avoid
    // suggesting that the caller's arguments are consumed.
    : catalog_name_(name),
      properties_(properties),
      file_io_(file_io),
      warehouse_location_(warehouse_location),
      root_namespace_(std::make_unique<InMemoryNamespace>()) {}

/// \brief Return a view of the catalog name held by this implementation object.
std::string_view InMemoryCatalogImpl::name() const {
  return std::string_view(catalog_name_);
}
// Defaulted out of line. NOTE(review): presumably required because the header
// only forward-declares InMemoryNamespace for root_namespace_ - confirm.
InMemoryCatalog::~InMemoryCatalog() = default;

/// \brief Return a view of the name this catalog was constructed with.
std::string_view InMemoryCatalog::name() const {
  return std::string_view(catalog_name_);
}

Status InMemoryCatalogImpl::CreateNamespace(
/// \brief Create namespace `ns` with the given properties.
///
/// Takes the catalog mutex, then delegates to the root namespace.
Status InMemoryCatalog::CreateNamespace(
    const Namespace& ns,
    const std::unordered_map<std::string, std::string>& properties) {
  std::unique_lock guard(mutex_);
  auto& root = *root_namespace_;
  return root.CreateNamespace(ns, properties);
}

Result<std::vector<Namespace>> InMemoryCatalogImpl::ListNamespaces(
/// \brief Return the properties stored for namespace `ns`.
///
/// Takes the catalog mutex, then delegates to the root namespace.
Result<std::unordered_map<std::string, std::string>>
InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const {
  std::unique_lock guard(mutex_);
  auto& root = *root_namespace_;
  return root.GetProperties(ns);
}

/// \brief List the namespaces found under `ns`.
///
/// Takes the catalog mutex, then delegates to the root namespace.
Result<std::vector<Namespace>> InMemoryCatalog::ListNamespaces(
    const Namespace& ns) const {
  std::unique_lock guard(mutex_);
  auto& root = *root_namespace_;
  return root.ListNamespaces(ns);
}

Status InMemoryCatalogImpl::DropNamespace(const Namespace& ns) {
/// \brief Drop namespace `ns`.
///
/// Takes the catalog mutex, then delegates to the root namespace.
Status InMemoryCatalog::DropNamespace(const Namespace& ns) {
  std::unique_lock guard(mutex_);
  auto& root = *root_namespace_;
  return root.DropNamespace(ns);
}

Result<bool> InMemoryCatalogImpl::NamespaceExists(const Namespace& ns) const {
/// \brief Report whether namespace `ns` exists.
///
/// Takes the catalog mutex, then delegates to the root namespace.
Result<bool> InMemoryCatalog::NamespaceExists(const Namespace& ns) const {
  std::unique_lock guard(mutex_);
  auto& root = *root_namespace_;
  return root.NamespaceExists(ns);
}

/// \brief Return the properties stored for namespace `ns`.
///
/// Takes the implementation mutex, then delegates to the root namespace.
Result<std::unordered_map<std::string, std::string>>
InMemoryCatalogImpl::GetNamespaceProperties(const Namespace& ns) const {
  std::unique_lock guard(mutex_);
  auto& root = *root_namespace_;
  return root.GetProperties(ns);
}

Status InMemoryCatalogImpl::UpdateNamespaceProperties(
/// \brief Apply `updates` and `removals` to the properties of namespace `ns`.
///
/// Takes the catalog mutex, then delegates to the root namespace.
Status InMemoryCatalog::UpdateNamespaceProperties(
    const Namespace& ns,
    const std::unordered_map<std::string, std::string>& updates,
    const std::unordered_set<std::string>& removals) {
  std::unique_lock guard(mutex_);
  auto& root = *root_namespace_;
  return root.UpdateNamespaceProperties(ns, updates, removals);
}

Result<std::vector<TableIdentifier>> InMemoryCatalogImpl::ListTables(
Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
const Namespace& ns) const {
std::unique_lock lock(mutex_);
const auto& table_names = root_namespace_->ListTables(ns);
Expand All @@ -440,44 +376,60 @@ Result<std::vector<TableIdentifier>> InMemoryCatalogImpl::ListTables(
return table_idents;
}

Result<std::unique_ptr<Table>> InMemoryCatalogImpl::CreateTable(
/// \brief Table creation is not supported by this catalog yet.
///
/// All arguments are ignored.
/// \return a NotImplemented error
Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
    const TableIdentifier& identifier,
    const Schema& schema,
    const PartitionSpec& spec,
    const std::string& location,
    const std::unordered_map<std::string, std::string>& properties) {
  return NotImplemented("create table");
}

Result<std::unique_ptr<Table>> InMemoryCatalogImpl::UpdateTable(
/// \brief Table updates are not supported by this catalog yet.
///
/// All arguments are ignored.
/// \return a NotImplemented error
Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
    const TableIdentifier& identifier,
    const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
    const std::vector<std::unique_ptr<MetadataUpdate>>& updates) {
  return NotImplemented("update table");
}

Result<std::shared_ptr<Transaction>> InMemoryCatalogImpl::StageCreateTable(
/// \brief Staged table creation is not supported by this catalog yet.
///
/// All arguments are ignored.
/// \return a NotImplemented error
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
    const TableIdentifier& identifier,
    const Schema& schema,
    const PartitionSpec& spec,
    const std::string& location,
    const std::unordered_map<std::string, std::string>& properties) {
  return NotImplemented("stage create table");
}

Result<bool> InMemoryCatalogImpl::TableExists(const TableIdentifier& identifier) const {
/// \brief Report whether a table is registered under `identifier`.
///
/// Takes the catalog mutex, then delegates to the root namespace.
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
  std::unique_lock guard(mutex_);
  auto& root = *root_namespace_;
  return root.TableExists(identifier);
}

Status InMemoryCatalogImpl::DropTable(const TableIdentifier& identifier, bool purge) {
/// \brief Unregister the table identified by `identifier`.
///
/// \param identifier table to remove from the namespace tree
/// \param purge currently ignored (see TODO below)
Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
  std::unique_lock guard(mutex_);
  // TODO(Guotao): Delete all metadata files if purge is true.
  auto& root = *root_namespace_;
  return root.UnregisterTable(identifier);
}

Result<std::shared_ptr<Table>> InMemoryCatalogImpl::LoadTable(
const TableIdentifier& identifier) const {
return NotImplemented("load table");
/// \brief Load the table registered under `identifier`.
///
/// Looks up the table's metadata location in the namespace tree, reads the
/// metadata file through file_io_, and builds a Table that shares ownership of
/// this catalog.
///
/// \param identifier a table identifier
/// \return the loaded table; an InvalidArgument error if file_io_ is null or
///         this catalog is not owned by a std::shared_ptr; otherwise whatever
///         error the metadata-location lookup or the metadata read produces
Result<std::unique_ptr<Table>> InMemoryCatalog::LoadTable(
    const TableIdentifier& identifier) {
  if (!file_io_) [[unlikely]] {
    return InvalidArgument("file_io is not set for catalog {}", catalog_name_);
  }

  // The Table holds a shared reference to its catalog. shared_from_this() would
  // throw std::bad_weak_ptr for a catalog that is not shared_ptr-owned; resolve
  // the owner up front (before any I/O) and report the problem as a Result.
  std::shared_ptr<Catalog> self = weak_from_this().lock();
  if (!self) [[unlikely]] {
    return InvalidArgument("catalog {} is not owned by a std::shared_ptr",
                           catalog_name_);
  }

  std::string metadata_location;
  {
    // Hold the lock only for the in-memory lookup, not for the file read below.
    std::unique_lock lock(mutex_);
    ICEBERG_ASSIGN_OR_RAISE(metadata_location,
                            root_namespace_->GetTableMetadataLocation(identifier));
  }

  ICEBERG_ASSIGN_OR_RAISE(auto metadata,
                          TableMetadataUtil::Read(*file_io_, metadata_location));

  return std::make_unique<Table>(identifier, std::move(metadata), metadata_location,
                                 file_io_, std::move(self));
}

Result<std::shared_ptr<Table>> InMemoryCatalogImpl::RegisterTable(
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
const TableIdentifier& identifier, const std::string& metadata_file_location) {
std::unique_lock lock(mutex_);
if (!root_namespace_->NamespaceExists(identifier.ns)) {
Expand All @@ -489,95 +441,6 @@ Result<std::shared_ptr<Table>> InMemoryCatalogImpl::RegisterTable(
return LoadTable(identifier);
}

/// \brief Table builders are not supported yet.
///
/// Both arguments are ignored.
/// \throws IcebergError always
std::unique_ptr<TableBuilder> InMemoryCatalogImpl::BuildTable(
    const TableIdentifier& identifier,
    const Schema& schema) const {
  throw IcebergError("not implemented");
}

/// \brief Construct the catalog by creating its implementation object.
///
/// All four arguments are passed straight to the InMemoryCatalogImpl
/// constructor.
InMemoryCatalog::InMemoryCatalog(
    std::string const& name,
    std::shared_ptr<FileIO> const& file_io,
    std::string const& warehouse_location,
    std::unordered_map<std::string, std::string> const& properties)
    : impl_(std::make_unique<InMemoryCatalogImpl>(
          name, file_io, warehouse_location, properties)) {}

// Defaulted out of line. NOTE(review): presumably required because the header
// only forward-declares InMemoryCatalogImpl for impl_ - confirm.
InMemoryCatalog::~InMemoryCatalog() = default;

std::string_view InMemoryCatalog::name() const { return impl_->name(); }

/// \brief Create namespace `ns`; forwards to the implementation object.
Status InMemoryCatalog::CreateNamespace(
    const Namespace& ns,
    const std::unordered_map<std::string, std::string>& properties) {
  auto& impl = *impl_;
  return impl.CreateNamespace(ns, properties);
}

/// \brief Return the properties of namespace `ns`; forwards to the
/// implementation object.
Result<std::unordered_map<std::string, std::string>>
InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const {
  auto& impl = *impl_;
  return impl.GetNamespaceProperties(ns);
}

/// \brief List the namespaces under `ns`; forwards to the implementation object.
Result<std::vector<Namespace>> InMemoryCatalog::ListNamespaces(
    const Namespace& ns) const {
  auto& impl = *impl_;
  return impl.ListNamespaces(ns);
}

/// \brief Drop namespace `ns`; forwards to the implementation object.
Status InMemoryCatalog::DropNamespace(const Namespace& ns) {
  auto& impl = *impl_;
  return impl.DropNamespace(ns);
}

/// \brief Report whether namespace `ns` exists; forwards to the implementation
/// object.
Result<bool> InMemoryCatalog::NamespaceExists(const Namespace& ns) const {
  auto& impl = *impl_;
  return impl.NamespaceExists(ns);
}

/// \brief Apply `updates` and `removals` to namespace `ns`; forwards to the
/// implementation object.
Status InMemoryCatalog::UpdateNamespaceProperties(
    const Namespace& ns,
    const std::unordered_map<std::string, std::string>& updates,
    const std::unordered_set<std::string>& removals) {
  auto& impl = *impl_;
  return impl.UpdateNamespaceProperties(ns, updates, removals);
}

/// \brief List the tables in namespace `ns`; forwards to the implementation
/// object.
Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
    const Namespace& ns) const {
  auto& impl = *impl_;
  return impl.ListTables(ns);
}

/// \brief Create a table; forwards every argument to the implementation object.
Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
    const TableIdentifier& identifier,
    const Schema& schema,
    const PartitionSpec& spec,
    const std::string& location,
    const std::unordered_map<std::string, std::string>& properties) {
  auto& impl = *impl_;
  return impl.CreateTable(identifier, schema, spec, location, properties);
}

/// \brief Update a table; forwards every argument to the implementation object.
Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
    const TableIdentifier& identifier,
    const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
    const std::vector<std::unique_ptr<MetadataUpdate>>& updates) {
  auto& impl = *impl_;
  return impl.UpdateTable(identifier, requirements, updates);
}

/// \brief Stage a table creation; forwards every argument to the implementation
/// object.
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
    const TableIdentifier& identifier,
    const Schema& schema,
    const PartitionSpec& spec,
    const std::string& location,
    const std::unordered_map<std::string, std::string>& properties) {
  auto& impl = *impl_;
  return impl.StageCreateTable(identifier, schema, spec, location, properties);
}

/// \brief Report whether the table exists; forwards to the implementation object.
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
  auto& impl = *impl_;
  return impl.TableExists(identifier);
}

/// \brief Drop the table; forwards `identifier` and `purge` to the implementation
/// object.
Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
  auto& impl = *impl_;
  return impl.DropTable(identifier, purge);
}

/// \brief Load the table; forwards to the implementation object.
Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
    const TableIdentifier& identifier) const {
  auto& impl = *impl_;
  return impl.LoadTable(identifier);
}

/// \brief Register a table from an existing metadata file; forwards to the
/// implementation object.
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
    const TableIdentifier& identifier,
    const std::string& metadata_file_location) {
  auto& impl = *impl_;
  return impl.RegisterTable(identifier, metadata_file_location);
}

std::unique_ptr<TableBuilder> InMemoryCatalog::BuildTable(
const TableIdentifier& identifier, const Schema& schema) const {
throw IcebergError("not implemented");
Expand Down
17 changes: 13 additions & 4 deletions src/iceberg/catalog/in_memory_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

#pragma once

#include <mutex>

#include "iceberg/catalog.h"

namespace iceberg {

/**
* @brief An in-memory implementation of the Iceberg Catalog interface.
*
Expand All @@ -32,7 +35,9 @@ namespace iceberg {
* @note This class is **not** suitable for production use.
* All data will be lost when the process exits.
*/
class ICEBERG_EXPORT InMemoryCatalog : public Catalog {
class ICEBERG_EXPORT InMemoryCatalog
: public Catalog,
public std::enable_shared_from_this<InMemoryCatalog> {
public:
InMemoryCatalog(std::string const& name, std::shared_ptr<FileIO> const& file_io,
std::string const& warehouse_location,
Expand Down Expand Up @@ -79,8 +84,7 @@ class ICEBERG_EXPORT InMemoryCatalog : public Catalog {

Status DropTable(const TableIdentifier& identifier, bool purge) override;

Result<std::shared_ptr<Table>> LoadTable(
const TableIdentifier& identifier) const override;
Result<std::unique_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;

Result<std::shared_ptr<Table>> RegisterTable(
const TableIdentifier& identifier,
Expand All @@ -90,7 +94,12 @@ class ICEBERG_EXPORT InMemoryCatalog : public Catalog {
const Schema& schema) const override;

private:
std::unique_ptr<class InMemoryCatalogImpl> impl_;
std::string catalog_name_;
std::unordered_map<std::string, std::string> properties_;
std::shared_ptr<FileIO> file_io_;
std::string warehouse_location_;
std::unique_ptr<class InMemoryNamespace> root_namespace_;
mutable std::recursive_mutex mutex_;
};

} // namespace iceberg
Loading
Loading