Skip to content

Commit e1acefd

Browse files
authored
feat: add sort order to table metadata builder (apache#345)
1 parent 2f0955e commit e1acefd

File tree

5 files changed

+348
-18
lines changed

5 files changed

+348
-18
lines changed

src/iceberg/table_metadata.cc

Lines changed: 111 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121

2222
#include <algorithm>
2323
#include <chrono>
24+
#include <cstdint>
2425
#include <format>
26+
#include <optional>
2527
#include <ranges>
2628
#include <string>
29+
#include <unordered_map>
2730

2831
#include <nlohmann/json.hpp>
2932

@@ -39,12 +42,11 @@
3942
#include "iceberg/util/gzip_internal.h"
4043
#include "iceberg/util/macros.h"
4144
#include "iceberg/util/uuid.h"
42-
4345
namespace iceberg {
44-
4546
namespace {
4647
const TimePointMs kInvalidLastUpdatedMs = TimePointMs::min();
47-
}
48+
constexpr int32_t kLastAdded = -1;
49+
} // namespace
4850

4951
std::string ToString(const SnapshotLogEntry& entry) {
5052
return std::format("SnapshotLogEntry[timestampMillis={},snapshotId={}]",
@@ -274,11 +276,19 @@ struct TableMetadataBuilder::Impl {
274276

275277
// Change tracking
276278
std::vector<std::unique_ptr<TableUpdate>> changes;
279+
std::optional<int32_t> last_added_schema_id;
280+
std::optional<int32_t> last_added_order_id;
281+
std::optional<int32_t> last_added_spec_id;
277282

278283
// Metadata location tracking
279284
std::optional<std::string> metadata_location;
280285
std::optional<std::string> previous_metadata_location;
281286

287+
// indexes for convenience
288+
std::unordered_map<int32_t, std::shared_ptr<Schema>> schemas_by_id;
289+
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id;
290+
std::unordered_map<int32_t, std::shared_ptr<SortOrder>> sort_orders_by_id;
291+
282292
// Constructor for new table
283293
explicit Impl(int8_t format_version) : base(nullptr), metadata{} {
284294
metadata.format_version = format_version;
@@ -294,7 +304,22 @@ struct TableMetadataBuilder::Impl {
294304

295305
// Constructor from existing metadata
296306
explicit Impl(const TableMetadata* base_metadata)
297-
: base(base_metadata), metadata(*base_metadata) {}
307+
: base(base_metadata), metadata(*base_metadata) {
308+
// Initialize index maps from base metadata
309+
for (const auto& schema : metadata.schemas) {
310+
if (schema->schema_id().has_value()) {
311+
schemas_by_id.emplace(schema->schema_id().value(), schema);
312+
}
313+
}
314+
315+
for (const auto& spec : metadata.partition_specs) {
316+
specs_by_id.emplace(spec->spec_id(), spec);
317+
}
318+
319+
for (const auto& order : metadata.sort_orders) {
320+
sort_orders_by_id.emplace(order->order_id(), order);
321+
}
322+
}
298323
};
299324

300325
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
@@ -434,16 +459,95 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas(
434459

435460
TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder(
436461
std::shared_ptr<SortOrder> order) {
437-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
462+
BUILDER_ASSIGN_OR_RETURN(auto order_id, AddSortOrderInternal(*order));
463+
return SetDefaultSortOrder(order_id);
438464
}
439465

440466
TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder(int32_t order_id) {
441-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
467+
if (order_id == -1) {
468+
if (!impl_->last_added_order_id.has_value()) {
469+
return AddError(ErrorKind::kInvalidArgument,
470+
"Cannot set last added sort order: no sort order has been added");
471+
}
472+
return SetDefaultSortOrder(impl_->last_added_order_id.value());
473+
}
474+
475+
if (order_id == impl_->metadata.default_sort_order_id) {
476+
return *this;
477+
}
478+
479+
impl_->metadata.default_sort_order_id = order_id;
480+
481+
if (impl_->last_added_order_id == std::make_optional(order_id)) {
482+
impl_->changes.push_back(std::make_unique<table::SetDefaultSortOrder>(kLastAdded));
483+
} else {
484+
impl_->changes.push_back(std::make_unique<table::SetDefaultSortOrder>(order_id));
485+
}
486+
return *this;
487+
}
488+
489+
Result<int32_t> TableMetadataBuilder::AddSortOrderInternal(const SortOrder& order) {
490+
int32_t new_order_id = ReuseOrCreateNewSortOrderId(order);
491+
492+
if (impl_->sort_orders_by_id.find(new_order_id) != impl_->sort_orders_by_id.end()) {
493+
// update last_added_order_id if the order was added in this set of changes (since it
494+
// is now the last)
495+
bool is_new_order =
496+
impl_->last_added_order_id.has_value() &&
497+
std::ranges::find_if(impl_->changes, [new_order_id](const auto& change) {
498+
auto* add_sort_order = dynamic_cast<table::AddSortOrder*>(change.get());
499+
return add_sort_order &&
500+
add_sort_order->sort_order()->order_id() == new_order_id;
501+
}) != impl_->changes.cend();
502+
impl_->last_added_order_id =
503+
is_new_order ? std::make_optional(new_order_id) : std::nullopt;
504+
return new_order_id;
505+
}
506+
507+
// Get current schema and validate the sort order against it
508+
ICEBERG_ASSIGN_OR_RAISE(auto schema, impl_->metadata.Schema());
509+
ICEBERG_RETURN_UNEXPECTED(order.Validate(*schema));
510+
511+
std::shared_ptr<SortOrder> new_order;
512+
if (order.is_unsorted()) {
513+
new_order = SortOrder::Unsorted();
514+
} else {
515+
// Unlike freshSortOrder from Java impl, we don't use field name from old bound
516+
// schema to rebuild the sort order.
517+
ICEBERG_ASSIGN_OR_RAISE(
518+
new_order,
519+
SortOrder::Make(new_order_id, std::vector<SortField>(order.fields().begin(),
520+
order.fields().end())));
521+
}
522+
523+
impl_->metadata.sort_orders.push_back(new_order);
524+
impl_->sort_orders_by_id.emplace(new_order_id, new_order);
525+
526+
impl_->changes.push_back(std::make_unique<table::AddSortOrder>(new_order));
527+
impl_->last_added_order_id = new_order_id;
528+
return new_order_id;
442529
}
443530

444531
TableMetadataBuilder& TableMetadataBuilder::AddSortOrder(
445532
std::shared_ptr<SortOrder> order) {
446-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
533+
BUILDER_ASSIGN_OR_RETURN(auto order_id, AddSortOrderInternal(*order));
534+
return *this;
535+
}
536+
537+
int32_t TableMetadataBuilder::ReuseOrCreateNewSortOrderId(const SortOrder& new_order) {
538+
if (new_order.is_unsorted()) {
539+
return SortOrder::kUnsortedOrderId;
540+
}
541+
// determine the next order id
542+
int32_t new_order_id = SortOrder::kInitialSortOrderId;
543+
for (const auto& order : impl_->metadata.sort_orders) {
544+
if (order->SameOrder(new_order)) {
545+
return order->order_id();
546+
} else if (new_order_id <= order->order_id()) {
547+
new_order_id = order->order_id() + 1;
548+
}
549+
}
550+
return new_order_id;
447551
}
448552

449553
TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(

src/iceberg/table_metadata.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,17 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector {
436436
/// \brief Private constructor for building from existing metadata
437437
explicit TableMetadataBuilder(const TableMetadata* base);
438438

439+
/// \brief Internal method to add a sort order and return its ID
440+
/// \param order The sort order to add
441+
/// \return The ID of the added or reused sort order
442+
Result<int32_t> AddSortOrderInternal(const SortOrder& order);
443+
444+
/// \brief Internal method to check for existing sort order and reuse its ID or create a
445+
/// new one
446+
/// \param new_order The sort order to check
447+
/// \return The ID to use for this sort order (reused if exists, new otherwise)
448+
int32_t ReuseOrCreateNewSortOrderId(const SortOrder& new_order);
449+
439450
/// Internal state members
440451
struct Impl;
441452
std::unique_ptr<Impl> impl_;

0 commit comments

Comments
 (0)