Skip to content

Commit 3d36a9e

Browse files
authored
feat(rest): implement stage-create table (#485)
1 parent a785119 commit 3d36a9e

File tree

4 files changed

+85
-17
lines changed

4 files changed

+85
-17
lines changed

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include "iceberg/table.h"
4545
#include "iceberg/table_requirement.h"
4646
#include "iceberg/table_update.h"
47+
#include "iceberg/transaction.h"
4748
#include "iceberg/util/macros.h"
4849

4950
namespace iceberg::rest {
@@ -274,11 +275,11 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns
274275
return result;
275276
}
276277

277-
Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
278+
Result<LoadTableResult> RestCatalog::CreateTableInternal(
278279
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
279280
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
280281
const std::string& location,
281-
const std::unordered_map<std::string, std::string>& properties) {
282+
const std::unordered_map<std::string, std::string>& properties, bool stage_create) {
282283
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
283284
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
284285

@@ -288,7 +289,7 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
288289
.schema = schema,
289290
.partition_spec = spec,
290291
.write_order = order,
291-
.stage_create = false,
292+
.stage_create = stage_create,
292293
.properties = properties,
293294
};
294295

@@ -298,10 +299,19 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
298299
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
299300

300301
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
301-
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
302-
return Table::Make(identifier, load_result.metadata,
303-
std::move(load_result.metadata_location), file_io_,
304-
shared_from_this());
302+
return LoadTableResultFromJson(json);
303+
}
304+
305+
Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
306+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
307+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
308+
const std::string& location,
309+
const std::unordered_map<std::string, std::string>& properties) {
310+
ICEBERG_ASSIGN_OR_RAISE(auto result,
311+
CreateTableInternal(identifier, schema, spec, order, location,
312+
properties, /*stage_create=*/false));
313+
return Table::Make(identifier, std::move(result.metadata),
314+
std::move(result.metadata_location), file_io_, shared_from_this());
305315
}
306316

307317
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
@@ -335,13 +345,19 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
335345
}
336346

337347
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
338-
[[maybe_unused]] const TableIdentifier& identifier,
339-
[[maybe_unused]] const std::shared_ptr<Schema>& schema,
340-
[[maybe_unused]] const std::shared_ptr<PartitionSpec>& spec,
341-
[[maybe_unused]] const std::shared_ptr<SortOrder>& order,
342-
[[maybe_unused]] const std::string& location,
343-
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
344-
return NotImplemented("Not implemented");
348+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
349+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
350+
const std::string& location,
351+
const std::unordered_map<std::string, std::string>& properties) {
352+
ICEBERG_ASSIGN_OR_RAISE(auto result,
353+
CreateTableInternal(identifier, schema, spec, order, location,
354+
properties, /*stage_create=*/true));
355+
ICEBERG_ASSIGN_OR_RAISE(auto staged_table,
356+
StagedTable::Make(identifier, std::move(result.metadata),
357+
std::move(result.metadata_location), file_io_,
358+
shared_from_this()));
359+
return Transaction::Make(std::move(staged_table), Transaction::Kind::kCreate,
360+
/*auto_commit=*/false);
345361
}
346362

347363
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
@@ -393,9 +409,6 @@ Result<std::string> RestCatalog::LoadTableInternal(
393409
}
394410

395411
Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& identifier) {
396-
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
397-
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
398-
399412
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
400413
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
401414
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
110110

111111
Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;
112112

113+
Result<LoadTableResult> CreateTableInternal(
114+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
115+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
116+
const std::string& location,
117+
const std::unordered_map<std::string, std::string>& properties, bool stage_create);
118+
113119
std::unique_ptr<RestCatalogProperties> config_;
114120
std::shared_ptr<FileIO> file_io_;
115121
std::unique_ptr<HttpClient> client_;

src/iceberg/catalog/rest/type_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
namespace iceberg::rest {
2626

2727
struct ErrorResponse;
28+
struct LoadTableResult;
2829

2930
class Endpoint;
3031
class ErrorHandler;

src/iceberg/test/rest_catalog_test.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#include "iceberg/test/std_io.h"
5353
#include "iceberg/test/test_resource.h"
5454
#include "iceberg/test/util/docker_compose_util.h"
55+
#include "iceberg/transaction.h"
5556

5657
namespace iceberg::rest {
5758

@@ -639,4 +640,51 @@ TEST_F(RestCatalogIntegrationTest, RegisterTable) {
639640
EXPECT_NE(table->name(), registered_table->name());
640641
}
641642

643+
TEST_F(RestCatalogIntegrationTest, StageCreateTable) {
644+
auto catalog_result = CreateCatalog();
645+
ASSERT_THAT(catalog_result, IsOk());
646+
auto& catalog = catalog_result.value();
647+
648+
// Create namespace
649+
Namespace ns{.levels = {"test_stage_create"}};
650+
auto status = catalog->CreateNamespace(ns, {});
651+
EXPECT_THAT(status, IsOk());
652+
653+
// Stage create table
654+
auto schema = CreateDefaultSchema();
655+
auto partition_spec = PartitionSpec::Unpartitioned();
656+
auto sort_order = SortOrder::Unsorted();
657+
658+
TableIdentifier table_id{.ns = ns, .name = "staged_table"};
659+
std::unordered_map<std::string, std::string> table_properties{{"key1", "value1"}};
660+
auto txn_result = catalog->StageCreateTable(table_id, schema, partition_spec,
661+
sort_order, "", table_properties);
662+
ASSERT_THAT(txn_result, IsOk());
663+
auto& txn = txn_result.value();
664+
665+
// Verify the staged table in transaction
666+
EXPECT_NE(txn->table(), nullptr);
667+
EXPECT_EQ(txn->table()->name(), table_id);
668+
669+
// Table should NOT exist in catalog yet (staged but not committed)
670+
auto exists_result = catalog->TableExists(table_id);
671+
ASSERT_THAT(exists_result, IsOk());
672+
EXPECT_FALSE(exists_result.value());
673+
674+
// Commit the transaction
675+
auto commit_result = txn->Commit();
676+
ASSERT_THAT(commit_result, IsOk());
677+
auto& committed_table = commit_result.value();
678+
679+
// Verify table now exists
680+
exists_result = catalog->TableExists(table_id);
681+
ASSERT_THAT(exists_result, IsOk());
682+
EXPECT_TRUE(exists_result.value());
683+
684+
// Verify table properties
685+
EXPECT_EQ(committed_table->name(), table_id);
686+
auto& props = committed_table->metadata()->properties.configs();
687+
EXPECT_EQ(props.at("key1"), "value1");
688+
}
689+
642690
} // namespace iceberg::rest

0 commit comments

Comments
 (0)