Skip to content

Commit 828ad81

Browse files
committed
refactor: Add SchemaById and SnapshotById to TableMetadata
1 parent be514fc commit 828ad81

File tree

5 files changed

+29
-38
lines changed

5 files changed

+29
-38
lines changed

src/iceberg/table.cc

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,7 @@ Result<std::shared_ptr<Snapshot>> Table::current_snapshot() const {
9090
}
9191

9292
Result<std::shared_ptr<Snapshot>> Table::SnapshotById(int64_t snapshot_id) const {
93-
auto iter = std::ranges::find_if(metadata_->snapshots,
94-
[this, &snapshot_id](const auto& snapshot) {
95-
return snapshot->snapshot_id == snapshot_id;
96-
});
97-
if (iter == metadata_->snapshots.end()) {
98-
return NotFound("Snapshot with ID {} is not found", snapshot_id);
99-
}
100-
return *iter;
93+
return metadata_->SnapshotById(snapshot_id);
10194
}
10295

10396
const std::vector<std::shared_ptr<Snapshot>>& Table::snapshots() const {

src/iceberg/table_metadata.cc

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,16 @@ std::string ToString(const MetadataLogEntry& entry) {
4747
}
4848

4949
Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
50-
auto iter = std::ranges::find_if(schemas, [this](const auto& schema) {
51-
return schema->schema_id() == current_schema_id;
50+
return SchemaById(current_schema_id);
51+
}
52+
53+
Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(
54+
const std::optional<int32_t>& schema_id) const {
55+
auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) {
56+
return schema->schema_id() == schema_id;
5257
});
5358
if (iter == schemas.end()) {
54-
return NotFound("Current schema is not found");
59+
return NotFound("Schema with ID {} is not found", schema_id.value_or(-1));
5560
}
5661
return *iter;
5762
}
@@ -77,11 +82,15 @@ Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
7782
}
7883

7984
Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
80-
auto iter = std::ranges::find_if(snapshots, [this](const auto& snapshot) {
81-
return snapshot->snapshot_id == current_snapshot_id;
85+
return SnapshotById(current_snapshot_id);
86+
}
87+
88+
Result<std::shared_ptr<Snapshot>> TableMetadata::SnapshotById(int64_t snapshot_id) const {
89+
auto iter = std::ranges::find_if(snapshots, [snapshot_id](const auto& snapshot) {
90+
return snapshot->snapshot_id == snapshot_id;
8291
});
8392
if (iter == snapshots.end()) {
84-
return NotFound("Current snapshot with ID {} is not found", current_snapshot_id);
93+
return NotFound("Snapshot with ID {} is not found", snapshot_id);
8594
}
8695
return *iter;
8796
}

src/iceberg/table_metadata.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,17 @@ struct ICEBERG_EXPORT TableMetadata {
123123

124124
/// \brief Get the current schema, return NotFoundError if not found
125125
Result<std::shared_ptr<Schema>> Schema() const;
126+
/// \brief Get the current schema by ID, return NotFoundError if not found
127+
Result<std::shared_ptr<iceberg::Schema>> SchemaById(
128+
const std::optional<int32_t>& schema_id) const;
126129
/// \brief Get the current partition spec, return NotFoundError if not found
127130
Result<std::shared_ptr<PartitionSpec>> PartitionSpec() const;
128131
/// \brief Get the current sort order, return NotFoundError if not found
129132
Result<std::shared_ptr<SortOrder>> SortOrder() const;
130133
/// \brief Get the current snapshot, return NotFoundError if not found
131134
Result<std::shared_ptr<Snapshot>> Snapshot() const;
135+
/// \brief Get the snapshot of this table with the given id
136+
Result<std::shared_ptr<iceberg::Snapshot>> SnapshotById(int64_t snapshot_id) const;
132137

133138
friend bool operator==(const TableMetadata& lhs, const TableMetadata& rhs);
134139
};

src/iceberg/table_scan.cc

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
namespace iceberg {
3535

3636
// implement FileScanTask
37-
FileScanTask::FileScanTask(std::shared_ptr<DataFile> file)
38-
: data_file_(std::move(file)) {}
37+
FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file)
38+
: data_file_(std::move(data_file)) {}
3939

4040
const std::shared_ptr<DataFile>& FileScanTask::data_file() const { return data_file_; }
4141

@@ -94,32 +94,13 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
9494
return InvalidArgument("No snapshot ID specified for table {}",
9595
table_metadata->table_uuid);
9696
}
97-
auto iter = std::ranges::find_if(
98-
table_metadata->snapshots,
99-
[id = *snapshot_id](const auto& snapshot) { return snapshot->snapshot_id == id; });
100-
if (iter == table_metadata->snapshots.end()) {
101-
return NotFound("Snapshot with ID {} is not found", *snapshot_id);
102-
}
103-
context_.snapshot = *iter;
97+
ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_metadata->SnapshotById(*snapshot_id));
10498

10599
if (!context_.projected_schema) {
106100
const auto& snapshot = context_.snapshot;
107101
auto schema_id =
108102
snapshot->schema_id ? snapshot->schema_id : table_metadata->current_schema_id;
109-
if (!schema_id) {
110-
return InvalidArgument("No schema ID found in snapshot {} for table {}",
111-
snapshot->snapshot_id, table_metadata->table_uuid);
112-
}
113-
114-
const auto& schemas = table_metadata->schemas;
115-
const auto it = std::ranges::find_if(schemas, [id = *schema_id](const auto& schema) {
116-
return schema->schema_id() == id;
117-
});
118-
if (it == schemas.end()) {
119-
return InvalidArgument("Schema {} in snapshot {} is not found",
120-
*snapshot->schema_id, snapshot->snapshot_id);
121-
}
122-
const auto& schema = *it;
103+
ICEBERG_ASSIGN_OR_RAISE(auto schema, table_metadata->SchemaById(schema_id));
123104

124105
if (column_names_.empty()) {
125106
context_.projected_schema = schema;
@@ -139,6 +120,9 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
139120
context_.projected_schema =
140121
std::make_shared<Schema>(std::move(projected_fields), schema->schema_id());
141122
}
123+
} else if (!column_names_.empty()) {
124+
return InvalidArgument(
125+
"Cannot specify column names when a projected schema is provided");
142126
}
143127

144128
return std::make_unique<DataTableScan>(std::move(context_), file_io_);

src/iceberg/table_scan.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class ICEBERG_EXPORT ScanTask {
4545
/// \brief Task representing a data file and its corresponding delete files.
4646
class ICEBERG_EXPORT FileScanTask : public ScanTask {
4747
public:
48-
explicit FileScanTask(std::shared_ptr<DataFile> file);
48+
explicit FileScanTask(std::shared_ptr<DataFile> data_file);
4949

5050
/// \brief The data file that should be read by this scan task.
5151
const std::shared_ptr<DataFile>& data_file() const;

0 commit comments

Comments
 (0)