Skip to content

Commit b07c0f9

Browse files
authored
refactor: Add SchemaById and SnapshotById to TableMetadata (#144)
1 parent 7c1ea8a commit b07c0f9

File tree

6 files changed

+52
-40
lines changed

6 files changed

+52
-40
lines changed

src/iceberg/table.cc

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,7 @@ Result<std::shared_ptr<Snapshot>> Table::current_snapshot() const {
9090
}
9191

9292
Result<std::shared_ptr<Snapshot>> Table::SnapshotById(int64_t snapshot_id) const {
93-
auto iter = std::ranges::find_if(metadata_->snapshots,
94-
[this, &snapshot_id](const auto& snapshot) {
95-
return snapshot->snapshot_id == snapshot_id;
96-
});
97-
if (iter == metadata_->snapshots.end()) {
98-
return NotFound("Snapshot with ID {} is not found", snapshot_id);
99-
}
100-
return *iter;
93+
return metadata_->SnapshotById(snapshot_id);
10194
}
10295

10396
const std::vector<std::shared_ptr<Snapshot>>& Table::snapshots() const {

src/iceberg/table_metadata.cc

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,16 @@ std::string ToString(const MetadataLogEntry& entry) {
4747
}
4848

4949
Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
50-
auto iter = std::ranges::find_if(schemas, [this](const auto& schema) {
51-
return schema->schema_id() == current_schema_id;
50+
return SchemaById(current_schema_id);
51+
}
52+
53+
Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(
54+
const std::optional<int32_t>& schema_id) const {
55+
auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) {
56+
return schema->schema_id() == schema_id;
5257
});
5358
if (iter == schemas.end()) {
54-
return NotFound("Current schema is not found");
59+
return NotFound("Schema with ID {} is not found", schema_id.value_or(-1));
5560
}
5661
return *iter;
5762
}
@@ -77,11 +82,15 @@ Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
7782
}
7883

7984
Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
80-
auto iter = std::ranges::find_if(snapshots, [this](const auto& snapshot) {
81-
return snapshot->snapshot_id == current_snapshot_id;
85+
return SnapshotById(current_snapshot_id);
86+
}
87+
88+
Result<std::shared_ptr<Snapshot>> TableMetadata::SnapshotById(int64_t snapshot_id) const {
89+
auto iter = std::ranges::find_if(snapshots, [snapshot_id](const auto& snapshot) {
90+
return snapshot->snapshot_id == snapshot_id;
8291
});
8392
if (iter == snapshots.end()) {
84-
return NotFound("Current snapshot with ID {} is not found", current_snapshot_id);
93+
return NotFound("Snapshot with ID {} is not found", snapshot_id);
8594
}
8695
return *iter;
8796
}

src/iceberg/table_metadata.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,17 @@ struct ICEBERG_EXPORT TableMetadata {
123123

124124
/// \brief Get the current schema, return NotFoundError if not found
125125
Result<std::shared_ptr<Schema>> Schema() const;
126+
/// \brief Get the current schema by ID, return NotFoundError if not found
127+
Result<std::shared_ptr<iceberg::Schema>> SchemaById(
128+
const std::optional<int32_t>& schema_id) const;
126129
/// \brief Get the current partition spec, return NotFoundError if not found
127130
Result<std::shared_ptr<PartitionSpec>> PartitionSpec() const;
128131
/// \brief Get the current sort order, return NotFoundError if not found
129132
Result<std::shared_ptr<SortOrder>> SortOrder() const;
130133
/// \brief Get the current snapshot, return NotFoundError if not found
131134
Result<std::shared_ptr<Snapshot>> Snapshot() const;
135+
/// \brief Get the snapshot of this table with the given id
136+
Result<std::shared_ptr<iceberg::Snapshot>> SnapshotById(int64_t snapshot_id) const;
132137

133138
friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs);
134139
};

src/iceberg/table_scan.cc

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
namespace iceberg {
3535

3636
// implement FileScanTask
37-
FileScanTask::FileScanTask(std::shared_ptr<DataFile> file)
38-
: data_file_(std::move(file)) {}
37+
FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file)
38+
: data_file_(std::move(data_file)) {}
3939

4040
const std::shared_ptr<DataFile>& FileScanTask::data_file() const { return data_file_; }
4141

@@ -94,32 +94,13 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
9494
return InvalidArgument("No snapshot ID specified for table {}",
9595
table_metadata->table_uuid);
9696
}
97-
auto iter = std::ranges::find_if(
98-
table_metadata->snapshots,
99-
[id = *snapshot_id](const auto& snapshot) { return snapshot->snapshot_id == id; });
100-
if (iter == table_metadata->snapshots.end()) {
101-
return NotFound("Snapshot with ID {} is not found", *snapshot_id);
102-
}
103-
context_.snapshot = *iter;
97+
ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_metadata->SnapshotById(*snapshot_id));
10498

10599
if (!context_.projected_schema) {
106100
const auto& snapshot = context_.snapshot;
107101
auto schema_id =
108102
snapshot->schema_id ? snapshot->schema_id : table_metadata->current_schema_id;
109-
if (!schema_id) {
110-
return InvalidArgument("No schema ID found in snapshot {} for table {}",
111-
snapshot->snapshot_id, table_metadata->table_uuid);
112-
}
113-
114-
const auto& schemas = table_metadata->schemas;
115-
const auto it = std::ranges::find_if(schemas, [id = *schema_id](const auto& schema) {
116-
return schema->schema_id() == id;
117-
});
118-
if (it == schemas.end()) {
119-
return InvalidArgument("Schema {} in snapshot {} is not found",
120-
*snapshot->schema_id, snapshot->snapshot_id);
121-
}
122-
const auto& schema = *it;
103+
ICEBERG_ASSIGN_OR_RAISE(auto schema, table_metadata->SchemaById(schema_id));
123104

124105
if (column_names_.empty()) {
125106
context_.projected_schema = schema;
@@ -139,6 +120,9 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
139120
context_.projected_schema =
140121
std::make_shared<Schema>(std::move(projected_fields), schema->schema_id());
141122
}
123+
} else if (!column_names_.empty()) {
124+
return InvalidArgument(
125+
"Cannot specify column names when a projected schema is provided");
142126
}
143127

144128
return std::make_unique<DataTableScan>(std::move(context_), file_io_);

src/iceberg/table_scan.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class ICEBERG_EXPORT ScanTask {
4545
/// \brief Task representing a data file and its corresponding delete files.
4646
class ICEBERG_EXPORT FileScanTask : public ScanTask {
4747
public:
48-
explicit FileScanTask(std::shared_ptr<DataFile> file);
48+
explicit FileScanTask(std::shared_ptr<DataFile> data_file);
4949

5050
/// \brief The data file that should be read by this scan task.
5151
const std::shared_ptr<DataFile>& data_file() const;

test/metadata_serde_test.cc

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,23 @@ TEST_F(MetadataSerdeTest, DeserializeV2Valid) {
106106
/*optional=*/false);
107107
schema_fields.emplace_back(/*field_id=*/3, "z", iceberg::int64(),
108108
/*optional=*/false);
109-
auto expected_schema =
110-
std::make_shared<Schema>(std::move(schema_fields), /*schema_id=*/1);
109+
auto expected_schema = std::make_shared<Schema>(schema_fields, /*schema_id=*/1);
111110
auto schema = metadata->Schema();
112111
ASSERT_TRUE(schema.has_value());
113112
EXPECT_EQ(*(schema.value().get()), *expected_schema);
114113

114+
// schema with ID 1
115+
auto schema_v1 = metadata->SchemaById(1);
116+
ASSERT_TRUE(schema_v1.has_value());
117+
EXPECT_EQ(*(schema_v1.value().get()), *expected_schema);
118+
119+
// schema with ID 0
120+
auto expected_schema_v0 = std::make_shared<Schema>(
121+
std::vector<SchemaField>{schema_fields.at(0)}, /*schema_id=*/0);
122+
auto schema_v0 = metadata->SchemaById(0);
123+
ASSERT_TRUE(schema_v0.has_value());
124+
EXPECT_EQ(*(schema_v0.value().get()), *expected_schema_v0);
125+
115126
// Compare partition spec
116127
EXPECT_EQ(metadata->default_spec_id, 0);
117128
std::vector<PartitionField> partition_fields;
@@ -165,6 +176,16 @@ TEST_F(MetadataSerdeTest, DeserializeV2Valid) {
165176
EXPECT_EQ(*metadata->snapshots[i], expected_snapshots[i]);
166177
}
167178

179+
// snapshot with ID 3051729675574597004
180+
auto snapshot_v0 = metadata->SnapshotById(3051729675574597004);
181+
ASSERT_TRUE(snapshot_v0.has_value());
182+
EXPECT_EQ(*snapshot_v0.value(), expected_snapshots[0]);
183+
184+
// snapshot with ID 3055729675574597004
185+
auto snapshot_v1 = metadata->SnapshotById(3055729675574597004);
186+
ASSERT_TRUE(snapshot_v1.has_value());
187+
EXPECT_EQ(*snapshot_v1.value(), expected_snapshots[1]);
188+
168189
// Compare snapshot logs
169190
std::vector<SnapshotLogEntry> expected_snapshot_log{
170191
{

0 commit comments

Comments
 (0)