Skip to content

Commit 4d1c3a0

Browse files
author
shuxu.li
committed
feat: RegisterTable api support for InMemoryCatalog
1 parent 82962e2 commit 4d1c3a0

File tree

5 files changed

+136
-232
lines changed

5 files changed

+136
-232
lines changed

src/iceberg/catalog/in_memory_catalog.cc

Lines changed: 42 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,14 @@
2121

2222
#include <algorithm>
2323
#include <iterator> // IWYU pragma: keep
24-
#include <mutex>
25-
#include <optional>
26-
#include <unordered_map>
2724

2825
#include "iceberg/exception.h"
2926
#include "iceberg/table.h"
27+
#include "iceberg/table_metadata.h"
3028
#include "iceberg/util/macros.h"
3129

3230
namespace iceberg {
3331

34-
namespace {
35-
3632
/// \brief A hierarchical namespace that manages namespaces and table metadata in-memory.
3733
///
3834
/// Each InMemoryNamespace represents a namespace level and can contain properties,
@@ -318,117 +314,56 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
318314
return it->second;
319315
}
320316

321-
} // namespace
322-
323-
class ICEBERG_EXPORT InMemoryCatalogImpl {
324-
public:
325-
InMemoryCatalogImpl(std::string name, std::shared_ptr<FileIO> file_io,
326-
std::string warehouse_location,
327-
std::unordered_map<std::string, std::string> properties);
328-
329-
std::string_view name() const;
330-
331-
Status CreateNamespace(const Namespace& ns,
332-
const std::unordered_map<std::string, std::string>& properties);
333-
334-
Result<std::vector<Namespace>> ListNamespaces(const Namespace& ns) const;
335-
336-
Status DropNamespace(const Namespace& ns);
337-
338-
Result<bool> NamespaceExists(const Namespace& ns) const;
339-
340-
Result<std::unordered_map<std::string, std::string>> GetNamespaceProperties(
341-
const Namespace& ns) const;
342-
343-
Status UpdateNamespaceProperties(
344-
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
345-
const std::unordered_set<std::string>& removals);
346-
347-
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const;
348-
349-
Result<std::unique_ptr<Table>> CreateTable(
350-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
351-
const std::string& location,
352-
const std::unordered_map<std::string, std::string>& properties);
353-
354-
Result<std::unique_ptr<Table>> UpdateTable(
355-
const TableIdentifier& identifier,
356-
const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
357-
const std::vector<std::unique_ptr<MetadataUpdate>>& updates);
358-
359-
Result<std::shared_ptr<Transaction>> StageCreateTable(
360-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
361-
const std::string& location,
362-
const std::unordered_map<std::string, std::string>& properties);
363-
364-
Result<bool> TableExists(const TableIdentifier& identifier) const;
365-
366-
Status DropTable(const TableIdentifier& identifier, bool purge);
367-
368-
Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) const;
369-
370-
Result<std::shared_ptr<Table>> RegisterTable(const TableIdentifier& identifier,
371-
const std::string& metadata_file_location);
372-
373-
std::unique_ptr<TableBuilder> BuildTable(const TableIdentifier& identifier,
374-
const Schema& schema) const;
375-
376-
private:
377-
std::string catalog_name_;
378-
std::unordered_map<std::string, std::string> properties_;
379-
std::shared_ptr<FileIO> file_io_;
380-
std::string warehouse_location_;
381-
std::unique_ptr<class InMemoryNamespace> root_namespace_;
382-
mutable std::recursive_mutex mutex_;
383-
};
384-
385-
InMemoryCatalogImpl::InMemoryCatalogImpl(
386-
std::string name, std::shared_ptr<FileIO> file_io, std::string warehouse_location,
387-
std::unordered_map<std::string, std::string> properties)
317+
InMemoryCatalog::InMemoryCatalog(
318+
std::string const& name, std::shared_ptr<FileIO> const& file_io,
319+
std::string const& warehouse_location,
320+
std::unordered_map<std::string, std::string> const& properties)
388321
: catalog_name_(std::move(name)),
389322
properties_(std::move(properties)),
390323
file_io_(std::move(file_io)),
391324
warehouse_location_(std::move(warehouse_location)),
392325
root_namespace_(std::make_unique<InMemoryNamespace>()) {}
393326

394-
std::string_view InMemoryCatalogImpl::name() const { return catalog_name_; }
327+
InMemoryCatalog::~InMemoryCatalog() = default;
328+
329+
std::string_view InMemoryCatalog::name() const { return catalog_name_; }
395330

396-
Status InMemoryCatalogImpl::CreateNamespace(
331+
Status InMemoryCatalog::CreateNamespace(
397332
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
398333
std::unique_lock lock(mutex_);
399334
return root_namespace_->CreateNamespace(ns, properties);
400335
}
401336

402-
Result<std::vector<Namespace>> InMemoryCatalogImpl::ListNamespaces(
337+
Result<std::unordered_map<std::string, std::string>>
338+
InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const {
339+
std::unique_lock lock(mutex_);
340+
return root_namespace_->GetProperties(ns);
341+
}
342+
343+
Result<std::vector<Namespace>> InMemoryCatalog::ListNamespaces(
403344
const Namespace& ns) const {
404345
std::unique_lock lock(mutex_);
405346
return root_namespace_->ListNamespaces(ns);
406347
}
407348

408-
Status InMemoryCatalogImpl::DropNamespace(const Namespace& ns) {
349+
Status InMemoryCatalog::DropNamespace(const Namespace& ns) {
409350
std::unique_lock lock(mutex_);
410351
return root_namespace_->DropNamespace(ns);
411352
}
412353

413-
Result<bool> InMemoryCatalogImpl::NamespaceExists(const Namespace& ns) const {
354+
Result<bool> InMemoryCatalog::NamespaceExists(const Namespace& ns) const {
414355
std::unique_lock lock(mutex_);
415356
return root_namespace_->NamespaceExists(ns);
416357
}
417358

418-
Result<std::unordered_map<std::string, std::string>>
419-
InMemoryCatalogImpl::GetNamespaceProperties(const Namespace& ns) const {
420-
std::unique_lock lock(mutex_);
421-
return root_namespace_->GetProperties(ns);
422-
}
423-
424-
Status InMemoryCatalogImpl::UpdateNamespaceProperties(
359+
Status InMemoryCatalog::UpdateNamespaceProperties(
425360
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
426361
const std::unordered_set<std::string>& removals) {
427362
std::unique_lock lock(mutex_);
428363
return root_namespace_->UpdateNamespaceProperties(ns, updates, removals);
429364
}
430365

431-
Result<std::vector<TableIdentifier>> InMemoryCatalogImpl::ListTables(
366+
Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
432367
const Namespace& ns) const {
433368
std::unique_lock lock(mutex_);
434369
const auto& table_names = root_namespace_->ListTables(ns);
@@ -441,44 +376,58 @@ Result<std::vector<TableIdentifier>> InMemoryCatalogImpl::ListTables(
441376
return table_idents;
442377
}
443378

444-
Result<std::unique_ptr<Table>> InMemoryCatalogImpl::CreateTable(
379+
Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
445380
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
446381
const std::string& location,
447382
const std::unordered_map<std::string, std::string>& properties) {
448383
return NotImplemented("create table");
449384
}
450385

451-
Result<std::unique_ptr<Table>> InMemoryCatalogImpl::UpdateTable(
386+
Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
452387
const TableIdentifier& identifier,
453388
const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
454389
const std::vector<std::unique_ptr<MetadataUpdate>>& updates) {
455390
return NotImplemented("update table");
456391
}
457392

458-
Result<std::shared_ptr<Transaction>> InMemoryCatalogImpl::StageCreateTable(
393+
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
459394
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
460395
const std::string& location,
461396
const std::unordered_map<std::string, std::string>& properties) {
462397
return NotImplemented("stage create table");
463398
}
464399

465-
Result<bool> InMemoryCatalogImpl::TableExists(const TableIdentifier& identifier) const {
400+
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
466401
std::unique_lock lock(mutex_);
467402
return root_namespace_->TableExists(identifier);
468403
}
469404

470-
Status InMemoryCatalogImpl::DropTable(const TableIdentifier& identifier, bool purge) {
405+
Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
471406
std::unique_lock lock(mutex_);
472407
// TODO(Guotao): Delete all metadata files if purge is true.
473408
return root_namespace_->UnregisterTable(identifier);
474409
}
475410

476-
Result<std::shared_ptr<Table>> InMemoryCatalogImpl::LoadTable(
411+
Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
477412
const TableIdentifier& identifier) const {
478-
return NotImplemented("load table");
413+
if (!file_io_) [[unlikely]] {
414+
return NotSupported("file_io is not set for catalog {}", catalog_name_);
415+
}
416+
417+
std::unique_lock lock(mutex_);
418+
auto metadata_location = root_namespace_->GetTableMetadataLocation(identifier);
419+
ICEBERG_RETURN_UNEXPECTED(metadata_location);
420+
421+
auto metadata = TableMetadataUtil::Read(*file_io_, metadata_location.value());
422+
ICEBERG_RETURN_UNEXPECTED(metadata);
423+
424+
return std::make_shared<Table>(
425+
identifier, std::move(metadata.value()), metadata_location.value(), file_io_,
426+
std::static_pointer_cast<Catalog>(
427+
std::const_pointer_cast<InMemoryCatalog>(shared_from_this())));
479428
}
480429

481-
Result<std::shared_ptr<Table>> InMemoryCatalogImpl::RegisterTable(
430+
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
482431
const TableIdentifier& identifier, const std::string& metadata_file_location) {
483432
std::unique_lock lock(mutex_);
484433
if (!root_namespace_->NamespaceExists(identifier.ns)) {
@@ -490,95 +439,6 @@ Result<std::shared_ptr<Table>> InMemoryCatalogImpl::RegisterTable(
490439
return LoadTable(identifier);
491440
}
492441

493-
std::unique_ptr<TableBuilder> InMemoryCatalogImpl::BuildTable(
494-
const TableIdentifier& identifier, const Schema& schema) const {
495-
throw IcebergError("not implemented");
496-
}
497-
498-
InMemoryCatalog::InMemoryCatalog(
499-
std::string const& name, std::shared_ptr<FileIO> const& file_io,
500-
std::string const& warehouse_location,
501-
std::unordered_map<std::string, std::string> const& properties)
502-
: impl_(std::make_unique<InMemoryCatalogImpl>(name, file_io, warehouse_location,
503-
properties)) {}
504-
505-
InMemoryCatalog::~InMemoryCatalog() = default;
506-
507-
std::string_view InMemoryCatalog::name() const { return impl_->name(); }
508-
509-
Status InMemoryCatalog::CreateNamespace(
510-
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
511-
return impl_->CreateNamespace(ns, properties);
512-
}
513-
514-
Result<std::unordered_map<std::string, std::string>>
515-
InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const {
516-
return impl_->GetNamespaceProperties(ns);
517-
}
518-
519-
Result<std::vector<Namespace>> InMemoryCatalog::ListNamespaces(
520-
const Namespace& ns) const {
521-
return impl_->ListNamespaces(ns);
522-
}
523-
524-
Status InMemoryCatalog::DropNamespace(const Namespace& ns) {
525-
return impl_->DropNamespace(ns);
526-
}
527-
528-
Result<bool> InMemoryCatalog::NamespaceExists(const Namespace& ns) const {
529-
return impl_->NamespaceExists(ns);
530-
}
531-
532-
Status InMemoryCatalog::UpdateNamespaceProperties(
533-
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
534-
const std::unordered_set<std::string>& removals) {
535-
return impl_->UpdateNamespaceProperties(ns, updates, removals);
536-
}
537-
538-
Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
539-
const Namespace& ns) const {
540-
return impl_->ListTables(ns);
541-
}
542-
543-
Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
544-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
545-
const std::string& location,
546-
const std::unordered_map<std::string, std::string>& properties) {
547-
return impl_->CreateTable(identifier, schema, spec, location, properties);
548-
}
549-
550-
Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
551-
const TableIdentifier& identifier,
552-
const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
553-
const std::vector<std::unique_ptr<MetadataUpdate>>& updates) {
554-
return impl_->UpdateTable(identifier, requirements, updates);
555-
}
556-
557-
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
558-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
559-
const std::string& location,
560-
const std::unordered_map<std::string, std::string>& properties) {
561-
return impl_->StageCreateTable(identifier, schema, spec, location, properties);
562-
}
563-
564-
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
565-
return impl_->TableExists(identifier);
566-
}
567-
568-
Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
569-
return impl_->DropTable(identifier, purge);
570-
}
571-
572-
Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
573-
const TableIdentifier& identifier) const {
574-
return impl_->LoadTable(identifier);
575-
}
576-
577-
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
578-
const TableIdentifier& identifier, const std::string& metadata_file_location) {
579-
return impl_->RegisterTable(identifier, metadata_file_location);
580-
}
581-
582442
std::unique_ptr<TableBuilder> InMemoryCatalog::BuildTable(
583443
const TableIdentifier& identifier, const Schema& schema) const {
584444
throw IcebergError("not implemented");

src/iceberg/catalog/in_memory_catalog.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919

2020
#pragma once
2121

22+
#include <mutex>
23+
2224
#include "iceberg/catalog.h"
2325

2426
namespace iceberg {
27+
2528
/**
2629
* @brief An in-memory implementation of the Iceberg Catalog interface.
2730
*
@@ -32,7 +35,9 @@ namespace iceberg {
3235
* @note This class is **not** suitable for production use.
3336
* All data will be lost when the process exits.
3437
*/
35-
class ICEBERG_EXPORT InMemoryCatalog : public Catalog {
38+
class ICEBERG_EXPORT InMemoryCatalog
39+
: public Catalog,
40+
public std::enable_shared_from_this<InMemoryCatalog> {
3641
public:
3742
InMemoryCatalog(std::string const& name, std::shared_ptr<FileIO> const& file_io,
3843
std::string const& warehouse_location,
@@ -90,7 +95,12 @@ class ICEBERG_EXPORT InMemoryCatalog : public Catalog {
9095
const Schema& schema) const override;
9196

9297
private:
93-
std::unique_ptr<class InMemoryCatalogImpl> impl_;
98+
std::string catalog_name_;
99+
std::unordered_map<std::string, std::string> properties_;
100+
std::shared_ptr<FileIO> file_io_;
101+
std::string warehouse_location_;
102+
std::unique_ptr<class InMemoryNamespace> root_namespace_;
103+
mutable std::recursive_mutex mutex_;
94104
};
95105

96106
} // namespace iceberg

test/CMakeLists.txt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,6 @@ target_sources(schema_test
4444
target_link_libraries(schema_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
4545
add_test(NAME schema_test COMMAND schema_test)
4646

47-
add_executable(catalog_test)
48-
target_sources(catalog_test PRIVATE in_memory_catalog_test.cc)
49-
target_link_libraries(catalog_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
50-
add_test(NAME catalog_test COMMAND catalog_test)
51-
5247
add_executable(table_test)
5348
target_include_directories(table_test PRIVATE "${CMAKE_BINARY_DIR}")
5449
target_sources(table_test PRIVATE test_common.cc json_internal_test.cc table_test.cc
@@ -90,4 +85,11 @@ if(ICEBERG_BUILD_BUNDLE)
9085
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
9186
GTest::gmock)
9287
add_test(NAME arrow_test COMMAND arrow_test)
88+
89+
add_executable(catalog_test)
90+
target_include_directories(catalog_test PRIVATE "${CMAKE_BINARY_DIR}")
91+
target_sources(catalog_test PRIVATE test_common.cc in_memory_catalog_test.cc)
92+
target_link_libraries(catalog_test PRIVATE iceberg_bundle_static GTest::gtest_main
93+
GTest::gmock)
94+
add_test(NAME catalog_test COMMAND catalog_test)
9395
endif()

0 commit comments

Comments
 (0)