Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ set(ICEBERG_SOURCES
sort_field.cc
sort_order.cc
statistics_file.cc
table.cc
table_metadata.cc
transform.cc
transform_function.cc
Expand Down
16 changes: 16 additions & 0 deletions src/iceberg/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,4 +273,20 @@ struct ICEBERG_EXPORT Snapshot {
bool Equals(const Snapshot& other) const;
};

/// \brief Represents a snapshot log entry
struct ICEBERG_EXPORT SnapshotLogEntry {
/// The timestamp in milliseconds of the change
TimePointMs timestamp_ms;
/// ID of the snapshot
int64_t snapshot_id;

friend bool operator==(const SnapshotLogEntry& lhs, const SnapshotLogEntry& rhs) {
return lhs.timestamp_ms == rhs.timestamp_ms && lhs.snapshot_id == rhs.snapshot_id;
}

friend bool operator!=(const SnapshotLogEntry& lhs, const SnapshotLogEntry& rhs) {
return !(lhs == rhs);
}
};

} // namespace iceberg
130 changes: 130 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/table.h"

#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/table_metadata.h"

namespace iceberg {

const std::string& Table::uuid() const { return metadata_->table_uuid; }

const std::shared_ptr<Schema>& Table::schema() const {
if (!schema_) {
const static std::shared_ptr<Schema> kEmptySchema =
std::make_shared<Schema>(std::vector<SchemaField>{});
auto schema = metadata_->Schema();
if (schema.has_value()) {
schema_ = schema.value();
} else {
schema_ = kEmptySchema;
}
}
return schema_;
}

const std::unordered_map<int32_t, std::shared_ptr<Schema>>& Table::schemas() const {
std::call_once(init_schemas_once_, [this]() {
for (const auto& schema : metadata_->schemas) {
if (schema->schema_id()) {
schemas_map_.emplace(schema->schema_id().value(), schema);
}
}
});
return schemas_map_;
}

const std::shared_ptr<PartitionSpec>& Table::spec() const {
std::call_once(init_partition_spec_once_, [this]() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest to follow similar implementation as Table::schema(). In this function, we can cache and return PartitionSpec::Unpartitioned() when missing.

auto partition_spec = metadata_->PartitionSpec();
if (partition_spec.has_value()) {
partition_spec_ = partition_spec.value();
}
});
return partition_spec_;
}

const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& Table::specs() const {
std::call_once(init_partition_specs_once_, [this]() {
for (const auto& spec : metadata_->partition_specs) {
partition_spec_map_[spec->spec_id()] = spec;
}
});
return partition_spec_map_;
}

const std::shared_ptr<SortOrder>& Table::sort_order() const {
std::call_once(init_sort_order_once_, [this]() {
auto sort_order = metadata_->SortOrder();
if (sort_order.has_value()) {
sort_order_ = sort_order.value();
}
});
return sort_order_;
}

const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>& Table::sort_orders()
const {
std::call_once(init_sort_orders_once_, [this]() {
for (const auto& order : metadata_->sort_orders) {
sort_orders_map_[order->order_id()] = order;
}
});
return sort_orders_map_;
}

const std::unordered_map<std::string, std::string>& Table::properties() const {
return metadata_->properties;
}

const std::string& Table::location() const { return metadata_->location; }

std::shared_ptr<Snapshot> Table::current_snapshot() const {
std::call_once(init_snapshot_once_, [this]() {
auto snapshot = metadata_->Snapshot();
if (snapshot.has_value()) {
current_snapshot_ = snapshot.value();
}
});
return current_snapshot_;
}

std::shared_ptr<Snapshot> Table::SnapshotById(int64_t snapshot_id) const {
auto iter = std::ranges::find_if(metadata_->snapshots,
[this, &snapshot_id](const auto& snapshot) {
return snapshot->snapshot_id == snapshot_id;
});
if (iter == metadata_->snapshots.end()) {
return nullptr;
}
return *iter;
}

const std::vector<std::shared_ptr<Snapshot>>& Table::snapshots() const {
return metadata_->snapshots;
}

const std::vector<SnapshotLogEntry>& Table::history() const {
return metadata_->snapshot_log;
}

} // namespace iceberg
100 changes: 60 additions & 40 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

#pragma once

#include <memory>
#include <mutex>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any usage of mutex, what's the intent here?

#include <string>
#include <unordered_map>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_identifier.h"
#include "iceberg/type_fwd.h"

namespace iceberg {
Expand All @@ -35,77 +36,96 @@ class ICEBERG_EXPORT Table {
public:
virtual ~Table() = default;

/// \brief Return the full name for this table
virtual const std::string& name() const = 0;
/// \brief Construct a table.
/// \param[in] identifier The identifier of the table.
/// \param[in] metadata The metadata for the table.
/// \param[in] metadata_location The location of the table metadata file.
/// \param[in] io The FileIO to read and write table data and metadata files.
/// \param[in] catalog The catalog that this table belongs to.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a comment to say that the table is read-only if catalog is nullptr?

Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
std::shared_ptr<Catalog> catalog)
: identifier_(std::move(identifier)),
metadata_(std::move(metadata)),
metadata_location_(std::move(metadata_location)),
io_(std::move(io)),
catalog_(std::move(catalog)) {};

/// \brief Return the identifier of this table
const TableIdentifier& name() const { return identifier_; }

/// \brief Returns the UUID of the table
virtual const std::string& uuid() const = 0;
const std::string& uuid() const;

/// \brief Refresh the current table metadata
virtual Status Refresh() = 0;

/// \brief Return the schema for this table
virtual const std::shared_ptr<Schema>& schema() const = 0;
/// \brief Return the schema for this table, return empty schema if not found
const std::shared_ptr<Schema>& schema() const;

/// \brief Return a map of schema for this table
virtual const std::unordered_map<int32_t, std::shared_ptr<Schema>>& schemas() const = 0;
const std::unordered_map<int32_t, std::shared_ptr<Schema>>& schemas() const;

/// \brief Return the partition spec for this table
virtual const std::shared_ptr<PartitionSpec>& spec() const = 0;
/// \brief Return the partition spec for this table, return null if default spec is not
/// found
const std::shared_ptr<PartitionSpec>& spec() const;

/// \brief Return a map of partition specs for this table
virtual const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& specs()
const = 0;
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& specs() const;

/// \brief Return the sort order for this table
virtual const std::shared_ptr<SortOrder>& sort_order() const = 0;
/// \brief Return the sort order for this table, return null if default sort order is
/// not found
const std::shared_ptr<SortOrder>& sort_order() const;

/// \brief Return a map of sort order IDs to sort orders for this table
virtual const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>& sort_orders()
const = 0;
const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>& sort_orders() const;

/// \brief Return a map of string properties for this table
virtual const std::unordered_map<std::string, std::string>& properties() const = 0;
const std::unordered_map<std::string, std::string>& properties() const;

/// \brief Return the table's base location
virtual const std::string& location() const = 0;
const std::string& location() const;

/// \brief Return the table's current snapshot
virtual const std::shared_ptr<Snapshot>& current_snapshot() const = 0;
/// \brief Return the table's current snapshot, return null if not found
std::shared_ptr<Snapshot> current_snapshot() const;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::shared_ptr<Snapshot> current_snapshot() const;
const std::shared_ptr<Snapshot>& current_snapshot() const;

Every table should have at least one snapshot. I think we can safely return a reference.


/// \brief Get the snapshot of this table with the given id, or null if there is no
/// matching snapshot
///
/// \param snapshot_id the ID of the snapshot to get
/// \return the Snapshot with the given id
virtual Result<std::shared_ptr<Snapshot>> snapshot(int64_t snapshot_id) const = 0;
std::shared_ptr<Snapshot> SnapshotById(int64_t snapshot_id) const;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What not returning Result<std::shared_ptr<Snapshot>>? It might return NotFound IMO.


/// \brief Get the snapshots of this table
virtual const std::vector<std::shared_ptr<Snapshot>>& snapshots() const = 0;
const std::vector<std::shared_ptr<Snapshot>>& snapshots() const;

/// \brief Get the snapshot history of this table
///
/// \return a vector of history entries
virtual const std::vector<std::shared_ptr<HistoryEntry>>& history() const = 0;
const std::vector<SnapshotLogEntry>& history() const;

/// \brief Create a new table scan for this table
///
/// Once a table scan is created, it can be refined to project columns and filter data.
virtual std::unique_ptr<TableScan> NewScan() const = 0;
private:
const TableIdentifier identifier_;
const std::shared_ptr<TableMetadata> metadata_;
const std::string metadata_location_;
std::shared_ptr<FileIO> io_;
std::shared_ptr<Catalog> catalog_;

mutable std::shared_ptr<Schema> schema_;
mutable std::unordered_map<int32_t, std::shared_ptr<Schema>> schemas_map_;

/// \brief Create a new append API to add files to this table and commit
virtual std::shared_ptr<AppendFiles> NewAppend() = 0;
mutable std::shared_ptr<PartitionSpec> partition_spec_;
mutable std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> partition_spec_map_;

/// \brief Create a new transaction API to commit multiple table operations at once
virtual std::unique_ptr<Transaction> NewTransaction() = 0;
mutable std::shared_ptr<SortOrder> sort_order_;
mutable std::unordered_map<int32_t, std::shared_ptr<SortOrder>> sort_orders_map_;

/// TODO(wgtmac): design of FileIO is not finalized yet. We intend to use an
/// IO-less design in the core library.
// /// \brief Returns a FileIO to read and write table data and metadata files
// virtual std::shared_ptr<FileIO> io() const = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to keep io()?

mutable std::shared_ptr<Snapshot> current_snapshot_;

/// \brief Returns a LocationProvider to provide locations for new data files
virtual std::unique_ptr<LocationProvider> location_provider() const = 0;
// once_flags
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a main shortcoming of these once flags is that if later we add a new Table::Refresh() function to be in sync with the latest metadata, these flags should also be cleared. I would suggest to remove these flags and solely rely on whether the shared_ptr is null to decide if it is initialized.

mutable std::once_flag init_schemas_once_;
mutable std::once_flag init_partition_spec_once_;
mutable std::once_flag init_partition_specs_once_;
mutable std::once_flag init_sort_order_once_;
mutable std::once_flag init_sort_orders_once_;
mutable std::once_flag init_snapshot_once_;
};

} // namespace iceberg
34 changes: 29 additions & 5 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,24 @@ std::string ToString(const MetadataLogEntry& entry) {
}

Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
auto iter = std::ranges::find_if(schemas, [this](const auto& schema) {
return schema->schema_id() == current_schema_id;
std::call_once(init_schema_once, [this]() {
auto iter = std::ranges::find_if(schemas, [this](const auto& schema) {
return schema->schema_id() == current_schema_id;
});
if (iter != schemas.end()) {
schema = *iter;
}

// compatible with V1 table schema
if (!schema && schemas.size() == 1UL) {
schema = schemas.front();
}
});
if (iter == schemas.end()) {
return NotFound("Current schema is not found");

if (!schema) {
return NotFound("Current schema is not defined for this table");
}
return *iter;
return schema;
}

Result<std::shared_ptr<PartitionSpec>> TableMetadata::PartitionSpec() const {
Expand All @@ -76,6 +87,19 @@ Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
return *iter;
}

Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
if (current_snapshot_id == Snapshot::kInvalidSnapshotId) {
return NotFound("Current snapshot is not defined for this table");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (current_snapshot_id == Snapshot::kInvalidSnapshotId) {
return NotFound("Current snapshot is not defined for this table");
}

We don't need this special check because kInvalidSnapshotId can result in NotFound too.

auto iter = std::ranges::find_if(snapshots, [this](const auto& snapshot) {
return snapshot->snapshot_id == current_snapshot_id;
});
if (iter == snapshots.end()) {
return NotFound("Current snapshot with ID {} is not found", current_snapshot_id);
}
return *iter;
}

namespace {

template <typename T>
Expand Down
Loading
Loading