Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ set(ICEBERG_SOURCES
sort_field.cc
sort_order.cc
statistics_file.cc
table.cc
table_metadata.cc
transform.cc
transform_function.cc
Expand Down
110 changes: 110 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/table.h"

#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/table_metadata.h"

namespace iceberg {

const std::string& Table::uuid() const { return metadata_->table_uuid; }

Result<std::shared_ptr<Schema>> Table::schema() const { return metadata_->Schema(); }

const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>&
Table::schemas() const {
if (!schemas_map_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess there can be race conditions here and the same pattern below, no?

schemas_map_ =
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<Schema>>>();
for (const auto& schema : metadata_->schemas) {
if (schema->schema_id()) {
schemas_map_->emplace(schema->schema_id().value(), schema);
}
}
}
return schemas_map_;
}

Result<std::shared_ptr<PartitionSpec>> Table::spec() const {
return metadata_->PartitionSpec();
}

const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>&
Table::specs() const {
if (!partition_spec_map_) {
partition_spec_map_ =
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>();
for (const auto& spec : metadata_->partition_specs) {
partition_spec_map_->emplace(spec->spec_id(), spec);
}
}
return partition_spec_map_;
}

Result<std::shared_ptr<SortOrder>> Table::sort_order() const {
return metadata_->SortOrder();
}

const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>&
Table::sort_orders() const {
if (!sort_orders_map_) {
sort_orders_map_ =
std::make_shared<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>();
for (const auto& order : metadata_->sort_orders) {
sort_orders_map_->emplace(order->order_id(), order);
}
}
return sort_orders_map_;
}

const std::unordered_map<std::string, std::string>& Table::properties() const {
return metadata_->properties;
}

const std::string& Table::location() const { return metadata_->location; }

Result<std::shared_ptr<Snapshot>> Table::current_snapshot() const {
return metadata_->Snapshot();
}

Result<std::shared_ptr<Snapshot>> Table::SnapshotById(int64_t snapshot_id) const {
auto iter = std::ranges::find_if(metadata_->snapshots,
[this, &snapshot_id](const auto& snapshot) {
return snapshot->snapshot_id == snapshot_id;
});
if (iter == metadata_->snapshots.end()) {
return NotFound("Snapshot with ID {} is not found", snapshot_id);
}
return *iter;
}

const std::vector<std::shared_ptr<Snapshot>>& Table::snapshots() const {
return metadata_->snapshots;
}

const std::vector<SnapshotLogEntry>& Table::history() const {
return metadata_->snapshot_log;
}

const std::shared_ptr<FileIO>& Table::io() const { return io_; }

} // namespace iceberg
111 changes: 63 additions & 48 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

#pragma once

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_identifier.h"
#include "iceberg/type_fwd.h"

namespace iceberg {
Expand All @@ -35,77 +35,92 @@ class ICEBERG_EXPORT Table {
public:
virtual ~Table() = default;

/// \brief Return the full name for this table
virtual const std::string& name() const = 0;
/// \brief Construct a table.
/// \param[in] identifier The identifier of the table.
/// \param[in] metadata The metadata for the table.
/// \param[in] metadata_location The location of the table metadata file.
/// \param[in] io The FileIO to read and write table data and metadata files.
/// \param[in] catalog The catalog that this table belongs to. If null, the table will
/// be read-only.
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
std::shared_ptr<Catalog> catalog)
: identifier_(std::move(identifier)),
metadata_(std::move(metadata)),
metadata_location_(std::move(metadata_location)),
io_(std::move(io)),
catalog_(std::move(catalog)) {};

/// \brief Return the identifier of this table
const TableIdentifier& name() const { return identifier_; }

/// \brief Returns the UUID of the table
virtual const std::string& uuid() const = 0;
const std::string& uuid() const;

/// \brief Refresh the current table metadata
virtual Status Refresh() = 0;

/// \brief Return the schema for this table
virtual const std::shared_ptr<Schema>& schema() const = 0;
/// \brief Return the schema for this table, return NotFoundError if not found
Result<std::shared_ptr<Schema>> schema() const;

/// \brief Return a map of schema for this table
virtual const std::unordered_map<int32_t, std::shared_ptr<Schema>>& schemas() const = 0;
/// \note This method is **not** thread-safe in the current implementation.
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>& schemas()
const;

/// \brief Return the partition spec for this table
virtual const std::shared_ptr<PartitionSpec>& spec() const = 0;
/// \brief Return the partition spec for this table, return NotFoundError if not found
Result<std::shared_ptr<PartitionSpec>> spec() const;

/// \brief Return a map of partition specs for this table
virtual const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& specs()
const = 0;
/// \note This method is **not** thread-safe in the current implementation.
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>&
specs() const;

/// \brief Return the sort order for this table
virtual const std::shared_ptr<SortOrder>& sort_order() const = 0;
/// \brief Return the sort order for this table, return NotFoundError if not found
Result<std::shared_ptr<SortOrder>> sort_order() const;

/// \brief Return a map of sort order IDs to sort orders for this table
virtual const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>& sort_orders()
const = 0;
/// \note This method is **not** thread-safe in the current implementation.
const std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>&
sort_orders() const;

/// \brief Return a map of string properties for this table
virtual const std::unordered_map<std::string, std::string>& properties() const = 0;
const std::unordered_map<std::string, std::string>& properties() const;

/// \brief Return the table's base location
virtual const std::string& location() const = 0;
const std::string& location() const;

/// \brief Return the table's current snapshot
virtual const std::shared_ptr<Snapshot>& current_snapshot() const = 0;
/// \brief Return the table's current snapshot, return NotFoundError if not found
Result<std::shared_ptr<Snapshot>> current_snapshot() const;

/// \brief Get the snapshot of this table with the given id, or null if there is no
/// matching snapshot
/// \brief Get the snapshot of this table with the given id
///
/// \param snapshot_id the ID of the snapshot to get
/// \return the Snapshot with the given id
virtual Result<std::shared_ptr<Snapshot>> snapshot(int64_t snapshot_id) const = 0;
/// \return the Snapshot with the given id, return NotFoundError if not found
Result<std::shared_ptr<Snapshot>> SnapshotById(int64_t snapshot_id) const;

/// \brief Get the snapshots of this table
virtual const std::vector<std::shared_ptr<Snapshot>>& snapshots() const = 0;
const std::vector<std::shared_ptr<Snapshot>>& snapshots() const;

/// \brief Get the snapshot history of this table
///
/// \return a vector of history entries
virtual const std::vector<std::shared_ptr<HistoryEntry>>& history() const = 0;

/// \brief Create a new table scan for this table
///
/// Once a table scan is created, it can be refined to project columns and filter data.
virtual std::unique_ptr<TableScan> NewScan() const = 0;

/// \brief Create a new append API to add files to this table and commit
virtual std::shared_ptr<AppendFiles> NewAppend() = 0;

/// \brief Create a new transaction API to commit multiple table operations at once
virtual std::unique_ptr<Transaction> NewTransaction() = 0;

/// TODO(wgtmac): design of FileIO is not finalized yet. We intend to use an
/// IO-less design in the core library.
// /// \brief Returns a FileIO to read and write table data and metadata files
// virtual std::shared_ptr<FileIO> io() const = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to keep io()?


/// \brief Returns a LocationProvider to provide locations for new data files
virtual std::unique_ptr<LocationProvider> location_provider() const = 0;
const std::vector<SnapshotLogEntry>& history() const;

/// \brief Returns a FileIO to read and write table data and metadata files
const std::shared_ptr<FileIO>& io() const;

private:
const TableIdentifier identifier_;
std::shared_ptr<TableMetadata> metadata_;
const std::string metadata_location_;
std::shared_ptr<FileIO> io_;
std::shared_ptr<Catalog> catalog_;

// Cache lazy-initialized maps.
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<Schema>>>
schemas_map_;
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>
partition_spec_map_;
mutable std::shared_ptr<std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>
sort_orders_map_;
};

} // namespace iceberg
10 changes: 10 additions & 0 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
return *iter;
}

Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
auto iter = std::ranges::find_if(snapshots, [this](const auto& snapshot) {
return snapshot->snapshot_id == current_snapshot_id;
});
if (iter == snapshots.end()) {
return NotFound("Current snapshot with ID {} is not found", current_snapshot_id);
}
return *iter;
}

namespace {

template <typename T>
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ struct ICEBERG_EXPORT TableMetadata {
Result<std::shared_ptr<PartitionSpec>> PartitionSpec() const;
/// \brief Get the current sort order, return NotFoundError if not found
Result<std::shared_ptr<SortOrder>> SortOrder() const;
/// \brief Get the current snapshot, return NotFoundError if not found
Result<std::shared_ptr<Snapshot>> Snapshot() const;

friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs);

Expand Down
5 changes: 4 additions & 1 deletion src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ class TransformFunction;
struct PartitionStatisticsFile;
struct Snapshot;
struct SnapshotRef;

struct MetadataLogEntry;
struct SnapshotLogEntry;

struct StatisticsFile;
struct TableMetadata;

Expand All @@ -113,7 +117,6 @@ enum class TransformType;
/// TODO: Forward declarations below are not added yet.
/// ----------------------------------------------------------------------------

class HistoryEntry;
class StructLike;

class MetadataUpdate;
Expand Down
11 changes: 9 additions & 2 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ target_sources(catalog_test PRIVATE in_memory_catalog_test.cc)
target_link_libraries(catalog_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
add_test(NAME catalog_test COMMAND catalog_test)

add_executable(table_test)
target_include_directories(table_test PRIVATE "${CMAKE_BINARY_DIR}")
target_sources(table_test PRIVATE test_common.cc json_internal_test.cc table_test.cc
schema_json_test.cc)
target_link_libraries(table_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
add_test(NAME table_test COMMAND table_test)

add_executable(expression_test)
target_sources(expression_test PRIVATE expression_test.cc)
target_link_libraries(expression_test PRIVATE iceberg_static GTest::gtest_main
Expand All @@ -57,8 +64,8 @@ add_test(NAME expression_test COMMAND expression_test)

add_executable(json_serde_test)
target_include_directories(json_serde_test PRIVATE "${CMAKE_BINARY_DIR}")
target_sources(json_serde_test PRIVATE json_internal_test.cc metadata_serde_test.cc
schema_json_test.cc)
target_sources(json_serde_test PRIVATE test_common.cc json_internal_test.cc
metadata_serde_test.cc schema_json_test.cc)
target_link_libraries(json_serde_test PRIVATE iceberg_static GTest::gtest_main
GTest::gmock)
add_test(NAME json_serde_test COMMAND json_serde_test)
Expand Down
40 changes: 19 additions & 21 deletions test/metadata_io_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,25 @@ class MetadataIOTest : public TempFileTestBase {
/*optional=*/false);
auto schema = std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);

TableMetadata metadata{
.format_version = 1,
.table_uuid = "1234567890",
.location = "s3://bucket/path",
.last_sequence_number = 0,
.schemas = {schema},
.current_schema_id = 1,
.default_spec_id = 0,
.last_partition_id = 0,
.properties = {{"key", "value"}},
.current_snapshot_id = 3051729675574597004,
.snapshots = {std::make_shared<Snapshot>(Snapshot{
.snapshot_id = 3051729675574597004,
.sequence_number = 0,
.timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
.manifest_list = "s3://a/b/1.avro",
.summary = {{"operation", "append"}},
})},
.default_sort_order_id = 0,
.next_row_id = 0};
return metadata;
return TableMetadata{.format_version = 1,
.table_uuid = "1234567890",
.location = "s3://bucket/path",
.last_sequence_number = 0,
.schemas = {schema},
.current_schema_id = 1,
.default_spec_id = 0,
.last_partition_id = 0,
.properties = {{"key", "value"}},
.current_snapshot_id = 3051729675574597004,
.snapshots = {std::make_shared<Snapshot>(Snapshot{
.snapshot_id = 3051729675574597004,
.sequence_number = 0,
.timestamp_ms = TimePointMsFromUnixMs(1515100955770).value(),
.manifest_list = "s3://a/b/1.avro",
.summary = {{"operation", "append"}},
})},
.default_sort_order_id = 0,
.next_row_id = 0};
}

std::shared_ptr<iceberg::FileIO> io_;
Expand Down
Loading
Loading