Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
f264b7e
feat: static table metadata access support
May 27, 2025
0c15b85
feat: static table metadata access support
May 27, 2025
8e88bd5
feat: static table metadata access support
May 27, 2025
e5b4c89
feat: static table metadata access support
May 27, 2025
c214a96
feat: static table metadata access support
May 27, 2025
afa4eb3
feat: static table metadata access support
May 27, 2025
de8d622
static table metadata access support
May 28, 2025
d51cc4f
feat: static table metadata access support
May 29, 2025
ec51e13
feat: static table metadata access support
May 29, 2025
59ae84b
feat: static table metadata access support
Jun 3, 2025
2393c0e
feat: static table metadata access support
Jun 3, 2025
ce1ea34
feat: static table metadata access support
Jun 3, 2025
1066ddb
feat: static table metadata access support
Jun 3, 2025
22a9c20
feat: metadata access support for table
Jun 18, 2025
1a2f7e3
feat: metadata access support for table
Jun 18, 2025
09a828b
feat: metadata access support for table
Jun 19, 2025
79433d1
feat: metadata access support for table
Jun 20, 2025
7fe3eb9
feat: metadata access support for table
Jun 23, 2025
6dcf1e7
feat: metadata access support for table
Jun 23, 2025
1d1a975
feat: metadata access support for table
Jun 23, 2025
66ab196
feat: metadata access support for table
Jun 24, 2025
3135365
feat: metadata access support for table
Jun 24, 2025
f7faef0
fix: fix build warning in switch case (#118) (#119)
MisterRaindrop Jun 19, 2025
f360965
fix: allow PartitionField's field_id to be missing in Iceberg v1 (#121)
Smith-Cruise Jun 19, 2025
32dbcd6
refactor: remove explicit operator!= using C++20 rewrite candidates (…
mapleFU Jun 19, 2025
aaa44da
feat: implement Primitive type Literal (#117)
mapleFU Jun 25, 2025
1b869fb
feat: add `or` expression (#120)
yingcai-cy Jun 25, 2025
57db327
chore: enable compile warning as error (#125)
zhjwpku Jun 27, 2025
f873a38
refactor: replace std::any placeholder with Literal (#130)
wgtmac Jun 27, 2025
04d56d0
ci: enable sanitizer (#129)
wgtmac Jun 27, 2025
c5fe11f
feat: add support for avro to arrow data conversion (#124)
wgtmac Jun 27, 2025
82962e2
fix: no member named 'SourceFieldIndex' (#131)
zhjwpku Jun 29, 2025
4d1c3a0
feat: RegisterTable api support for InMemoryCatalog
Jul 5, 2025
ca31af1
feat: RegisterTable api support for InMemoryCatalog
Jul 7, 2025
8c41bd1
Merge remote-tracking branch 'upstream/main' into feature/my-feature
Jul 7, 2025
d9d7d2c
Merge branch 'feature/my-feature' of github.com:lishuxu/iceberg-cpp i…
Jul 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 42 additions & 182 deletions src/iceberg/catalog/in_memory_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,14 @@

#include <algorithm>
#include <iterator> // IWYU pragma: keep
#include <mutex>
#include <optional>
#include <unordered_map>

#include "iceberg/exception.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

/// \brief A hierarchical namespace that manages namespaces and table metadata in-memory.
///
/// Each InMemoryNamespace represents a namespace level and can contain properties,
Expand Down Expand Up @@ -318,117 +314,56 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
return it->second;
}

} // namespace

class ICEBERG_EXPORT InMemoryCatalogImpl {
public:
InMemoryCatalogImpl(std::string name, std::shared_ptr<FileIO> file_io,
std::string warehouse_location,
std::unordered_map<std::string, std::string> properties);

std::string_view name() const;

Status CreateNamespace(const Namespace& ns,
const std::unordered_map<std::string, std::string>& properties);

Result<std::vector<Namespace>> ListNamespaces(const Namespace& ns) const;

Status DropNamespace(const Namespace& ns);

Result<bool> NamespaceExists(const Namespace& ns) const;

Result<std::unordered_map<std::string, std::string>> GetNamespaceProperties(
const Namespace& ns) const;

Status UpdateNamespaceProperties(
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
const std::unordered_set<std::string>& removals);

Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const;

Result<std::unique_ptr<Table>> CreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties);

Result<std::unique_ptr<Table>> UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
const std::vector<std::unique_ptr<MetadataUpdate>>& updates);

Result<std::shared_ptr<Transaction>> StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties);

Result<bool> TableExists(const TableIdentifier& identifier) const;

Status DropTable(const TableIdentifier& identifier, bool purge);

Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) const;

Result<std::shared_ptr<Table>> RegisterTable(const TableIdentifier& identifier,
const std::string& metadata_file_location);

std::unique_ptr<TableBuilder> BuildTable(const TableIdentifier& identifier,
const Schema& schema) const;

private:
std::string catalog_name_;
std::unordered_map<std::string, std::string> properties_;
std::shared_ptr<FileIO> file_io_;
std::string warehouse_location_;
std::unique_ptr<class InMemoryNamespace> root_namespace_;
mutable std::recursive_mutex mutex_;
};

InMemoryCatalogImpl::InMemoryCatalogImpl(
std::string name, std::shared_ptr<FileIO> file_io, std::string warehouse_location,
std::unordered_map<std::string, std::string> properties)
InMemoryCatalog::InMemoryCatalog(
std::string const& name, std::shared_ptr<FileIO> const& file_io,
std::string const& warehouse_location,
std::unordered_map<std::string, std::string> const& properties)
: catalog_name_(std::move(name)),
properties_(std::move(properties)),
file_io_(std::move(file_io)),
warehouse_location_(std::move(warehouse_location)),
root_namespace_(std::make_unique<InMemoryNamespace>()) {}

std::string_view InMemoryCatalogImpl::name() const { return catalog_name_; }
InMemoryCatalog::~InMemoryCatalog() = default;

std::string_view InMemoryCatalog::name() const { return catalog_name_; }

Status InMemoryCatalogImpl::CreateNamespace(
Status InMemoryCatalog::CreateNamespace(
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
std::unique_lock lock(mutex_);
return root_namespace_->CreateNamespace(ns, properties);
}

Result<std::vector<Namespace>> InMemoryCatalogImpl::ListNamespaces(
Result<std::unordered_map<std::string, std::string>>
InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const {
std::unique_lock lock(mutex_);
return root_namespace_->GetProperties(ns);
}

Result<std::vector<Namespace>> InMemoryCatalog::ListNamespaces(
const Namespace& ns) const {
std::unique_lock lock(mutex_);
return root_namespace_->ListNamespaces(ns);
}

Status InMemoryCatalogImpl::DropNamespace(const Namespace& ns) {
Status InMemoryCatalog::DropNamespace(const Namespace& ns) {
std::unique_lock lock(mutex_);
return root_namespace_->DropNamespace(ns);
}

Result<bool> InMemoryCatalogImpl::NamespaceExists(const Namespace& ns) const {
Result<bool> InMemoryCatalog::NamespaceExists(const Namespace& ns) const {
std::unique_lock lock(mutex_);
return root_namespace_->NamespaceExists(ns);
}

Result<std::unordered_map<std::string, std::string>>
InMemoryCatalogImpl::GetNamespaceProperties(const Namespace& ns) const {
std::unique_lock lock(mutex_);
return root_namespace_->GetProperties(ns);
}

Status InMemoryCatalogImpl::UpdateNamespaceProperties(
Status InMemoryCatalog::UpdateNamespaceProperties(
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
const std::unordered_set<std::string>& removals) {
std::unique_lock lock(mutex_);
return root_namespace_->UpdateNamespaceProperties(ns, updates, removals);
}

Result<std::vector<TableIdentifier>> InMemoryCatalogImpl::ListTables(
Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
const Namespace& ns) const {
std::unique_lock lock(mutex_);
const auto& table_names = root_namespace_->ListTables(ns);
Expand All @@ -441,44 +376,58 @@ Result<std::vector<TableIdentifier>> InMemoryCatalogImpl::ListTables(
return table_idents;
}

Result<std::unique_ptr<Table>> InMemoryCatalogImpl::CreateTable(
Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
return NotImplemented("create table");
}

Result<std::unique_ptr<Table>> InMemoryCatalogImpl::UpdateTable(
Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
const std::vector<std::unique_ptr<MetadataUpdate>>& updates) {
return NotImplemented("update table");
}

Result<std::shared_ptr<Transaction>> InMemoryCatalogImpl::StageCreateTable(
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
return NotImplemented("stage create table");
}

Result<bool> InMemoryCatalogImpl::TableExists(const TableIdentifier& identifier) const {
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
std::unique_lock lock(mutex_);
return root_namespace_->TableExists(identifier);
}

Status InMemoryCatalogImpl::DropTable(const TableIdentifier& identifier, bool purge) {
Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
std::unique_lock lock(mutex_);
// TODO(Guotao): Delete all metadata files if purge is true.
return root_namespace_->UnregisterTable(identifier);
}

Result<std::shared_ptr<Table>> InMemoryCatalogImpl::LoadTable(
Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
const TableIdentifier& identifier) const {
return NotImplemented("load table");
if (!file_io_) [[unlikely]] {
return NotSupported("file_io is not set for catalog {}", catalog_name_);
}

std::unique_lock lock(mutex_);
auto metadata_location = root_namespace_->GetTableMetadataLocation(identifier);
ICEBERG_RETURN_UNEXPECTED(metadata_location);

auto metadata = TableMetadataUtil::Read(*file_io_, metadata_location.value());
ICEBERG_RETURN_UNEXPECTED(metadata);

return std::make_shared<Table>(
identifier, std::move(metadata.value()), metadata_location.value(), file_io_,
std::static_pointer_cast<Catalog>(
std::const_pointer_cast<InMemoryCatalog>(shared_from_this())));
}

Result<std::shared_ptr<Table>> InMemoryCatalogImpl::RegisterTable(
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
const TableIdentifier& identifier, const std::string& metadata_file_location) {
std::unique_lock lock(mutex_);
if (!root_namespace_->NamespaceExists(identifier.ns)) {
Expand All @@ -490,95 +439,6 @@ Result<std::shared_ptr<Table>> InMemoryCatalogImpl::RegisterTable(
return LoadTable(identifier);
}

std::unique_ptr<TableBuilder> InMemoryCatalogImpl::BuildTable(
const TableIdentifier& identifier, const Schema& schema) const {
throw IcebergError("not implemented");
}

InMemoryCatalog::InMemoryCatalog(
std::string const& name, std::shared_ptr<FileIO> const& file_io,
std::string const& warehouse_location,
std::unordered_map<std::string, std::string> const& properties)
: impl_(std::make_unique<InMemoryCatalogImpl>(name, file_io, warehouse_location,
properties)) {}

InMemoryCatalog::~InMemoryCatalog() = default;

std::string_view InMemoryCatalog::name() const { return impl_->name(); }

Status InMemoryCatalog::CreateNamespace(
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
return impl_->CreateNamespace(ns, properties);
}

Result<std::unordered_map<std::string, std::string>>
InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const {
return impl_->GetNamespaceProperties(ns);
}

Result<std::vector<Namespace>> InMemoryCatalog::ListNamespaces(
const Namespace& ns) const {
return impl_->ListNamespaces(ns);
}

Status InMemoryCatalog::DropNamespace(const Namespace& ns) {
return impl_->DropNamespace(ns);
}

Result<bool> InMemoryCatalog::NamespaceExists(const Namespace& ns) const {
return impl_->NamespaceExists(ns);
}

Status InMemoryCatalog::UpdateNamespaceProperties(
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
const std::unordered_set<std::string>& removals) {
return impl_->UpdateNamespaceProperties(ns, updates, removals);
}

Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
const Namespace& ns) const {
return impl_->ListTables(ns);
}

Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
return impl_->CreateTable(identifier, schema, spec, location, properties);
}

Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
const std::vector<std::unique_ptr<MetadataUpdate>>& updates) {
return impl_->UpdateTable(identifier, requirements, updates);
}

Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
return impl_->StageCreateTable(identifier, schema, spec, location, properties);
}

Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
return impl_->TableExists(identifier);
}

Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
return impl_->DropTable(identifier, purge);
}

Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
const TableIdentifier& identifier) const {
return impl_->LoadTable(identifier);
}

Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
const TableIdentifier& identifier, const std::string& metadata_file_location) {
return impl_->RegisterTable(identifier, metadata_file_location);
}

std::unique_ptr<TableBuilder> InMemoryCatalog::BuildTable(
const TableIdentifier& identifier, const Schema& schema) const {
throw IcebergError("not implemented");
Expand Down
14 changes: 12 additions & 2 deletions src/iceberg/catalog/in_memory_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

#pragma once

#include <mutex>

#include "iceberg/catalog.h"

namespace iceberg {

/**
* @brief An in-memory implementation of the Iceberg Catalog interface.
*
Expand All @@ -32,7 +35,9 @@ namespace iceberg {
* @note This class is **not** suitable for production use.
* All data will be lost when the process exits.
*/
class ICEBERG_EXPORT InMemoryCatalog : public Catalog {
class ICEBERG_EXPORT InMemoryCatalog
: public Catalog,
public std::enable_shared_from_this<InMemoryCatalog> {
public:
InMemoryCatalog(std::string const& name, std::shared_ptr<FileIO> const& file_io,
std::string const& warehouse_location,
Expand Down Expand Up @@ -90,7 +95,12 @@ class ICEBERG_EXPORT InMemoryCatalog : public Catalog {
const Schema& schema) const override;

private:
std::unique_ptr<class InMemoryCatalogImpl> impl_;
std::string catalog_name_;
std::unordered_map<std::string, std::string> properties_;
std::shared_ptr<FileIO> file_io_;
std::string warehouse_location_;
std::unique_ptr<class InMemoryNamespace> root_namespace_;
mutable std::recursive_mutex mutex_;
};

} // namespace iceberg
12 changes: 7 additions & 5 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ target_sources(schema_test
target_link_libraries(schema_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
add_test(NAME schema_test COMMAND schema_test)

add_executable(catalog_test)
target_sources(catalog_test PRIVATE in_memory_catalog_test.cc)
target_link_libraries(catalog_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
add_test(NAME catalog_test COMMAND catalog_test)

add_executable(table_test)
target_include_directories(table_test PRIVATE "${CMAKE_BINARY_DIR}")
target_sources(table_test PRIVATE test_common.cc json_internal_test.cc table_test.cc
Expand Down Expand Up @@ -90,4 +85,11 @@ if(ICEBERG_BUILD_BUNDLE)
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
GTest::gmock)
add_test(NAME arrow_test COMMAND arrow_test)

add_executable(catalog_test)
target_include_directories(catalog_test PRIVATE "${CMAKE_BINARY_DIR}")
target_sources(catalog_test PRIVATE test_common.cc in_memory_catalog_test.cc)
target_link_libraries(catalog_test PRIVATE iceberg_bundle_static GTest::gtest_main
GTest::gmock)
add_test(NAME catalog_test COMMAND catalog_test)
endif()
Loading
Loading