2828#include " iceberg/schema.h"
2929#include " iceberg/schema_field.h"
3030#include " iceberg/snapshot.h"
31- #include " iceberg/table.h"
3231#include " iceberg/table_metadata.h"
3332#include " iceberg/util/macros.h"
3433
3534namespace iceberg {
3635
3736namespace {
3837// / \brief Use indexed data structures for efficient lookups
39- struct DeleteFileIndex {
40- // / \brief Index by sequence number for quick filtering
41- std::multimap<int64_t , ManifestEntry*> sequence_index;
42-
38+ class DeleteFileIndex {
39+ public:
4340 // / \brief Build the index from a list of manifest entries.
44- void BuildIndex (const std::vector<std::unique_ptr<ManifestEntry>>& entries) {
45- sequence_index.clear ();
46-
41+ explicit DeleteFileIndex (const std::vector<std::unique_ptr<ManifestEntry>>& entries) {
4742 for (const auto & entry : entries) {
4843 const int64_t seq_num =
4944 entry->sequence_number .value_or (TableMetadata::kInitialSequenceNumber );
@@ -66,6 +61,10 @@ struct DeleteFileIndex {
6661
6762 return relevant_deletes;
6863 }
64+
65+ private:
66+ // / \brief Index by sequence number for quick filtering
67+ std::multimap<int64_t , ManifestEntry*> sequence_index;
6968};
7069
7170// / \brief Get matched delete files for a given data entry.
@@ -107,19 +106,19 @@ int64_t FileScanTask::start() const { return start_; }
107106
108107int64_t FileScanTask::length () const { return length_; }
109108
110- int64_t FileScanTask::size_bytes () const {
109+ int64_t FileScanTask::SizeBytes () const {
111110 int64_t sizeInBytes = length_;
112111 std::ranges::for_each (delete_files_, [&sizeInBytes](const auto & delete_file) {
113112 sizeInBytes += delete_file->file_size_in_bytes ;
114113 });
115114 return sizeInBytes;
116115}
117116
118- int32_t FileScanTask::files_count () const {
117+ int32_t FileScanTask::FilesCount () const {
119118 return static_cast <int32_t >(delete_files_.size () + 1 );
120119}
121120
122- int64_t FileScanTask::estimated_row_count () const {
121+ int64_t FileScanTask::EstimatedRowCount () const {
123122 if (data_file_->file_size_in_bytes == 0 ) {
124123 return 0 ;
125124 }
@@ -130,9 +129,9 @@ int64_t FileScanTask::estimated_row_count() const {
130129
131130const std::shared_ptr<Expression>& FileScanTask::residual () const { return residual_; }
132131
133- TableScanBuilder::TableScanBuilder (const Table& table ,
134- std::shared_ptr<TableMetadata> table_metadata )
135- : table_(table ) {
132+ TableScanBuilder::TableScanBuilder (std::shared_ptr<TableMetadata> table_metadata ,
133+ std::shared_ptr<FileIO> file_io )
134+ : file_io_(std::move(file_io) ) {
136135 context_.table_metadata = std::move (table_metadata);
137136}
138137
@@ -143,7 +142,7 @@ TableScanBuilder& TableScanBuilder::WithColumnNames(
143142 return *this ;
144143}
145144
146- TableScanBuilder& TableScanBuilder::WithSchema (std::shared_ptr<Schema> schema) {
145+ TableScanBuilder& TableScanBuilder::WithProjectedSchema (std::shared_ptr<Schema> schema) {
147146 context_.projected_schema = std::move (schema);
148147 return *this ;
149148}
@@ -174,29 +173,39 @@ TableScanBuilder& TableScanBuilder::WithLimit(std::optional<int64_t> limit) {
174173}
175174
176175Result<std::unique_ptr<TableScan>> TableScanBuilder::Build () {
177- if (snapshot_id_) {
178- ICEBERG_ASSIGN_OR_RAISE (context_.snapshot , table_.SnapshotById (*snapshot_id_));
179- } else {
180- ICEBERG_ASSIGN_OR_RAISE (context_.snapshot , table_.current_snapshot ());
176+ const auto & table_metadata = context_.table_metadata ;
177+ auto snapshot_id = snapshot_id_ ? snapshot_id_ : table_metadata->current_snapshot_id ;
178+ if (!snapshot_id) {
179+ return InvalidArgument (" No snapshot ID specified for table {}" ,
180+ table_metadata->table_uuid );
181181 }
182- if (context_.snapshot == nullptr ) {
183- return InvalidArgument (" No snapshot found for table {}" , table_.name ().name );
182+ auto iter = std::ranges::find_if (table_metadata->snapshots ,
183+ [&snapshot_id](const auto & snapshot) {
184+ return snapshot->snapshot_id == *snapshot_id;
185+ });
186+ if (iter == table_metadata->snapshots .end () || *iter == nullptr ) {
187+ return NotFound (" Snapshot with ID {} is not found" , *snapshot_id);
184188 }
189+ context_.snapshot = *iter;
185190
186191 if (!context_.projected_schema ) {
187- std::shared_ptr<Schema> schema;
188192 const auto & snapshot = context_.snapshot ;
189- if (snapshot->schema_id ) {
190- const auto & schemas = *table_.schemas ();
191- if (const auto it = schemas.find (*snapshot->schema_id ); it != schemas.end ()) {
192- schema = it->second ;
193- } else {
194- return InvalidArgument (" Schema {} in snapshot {} is not found" ,
195- *snapshot->schema_id , snapshot->snapshot_id );
196- }
197- } else {
198- ICEBERG_ASSIGN_OR_RAISE (schema, table_.schema ());
193+ auto schema_id =
194+ snapshot->schema_id ? snapshot->schema_id : table_metadata->current_schema_id ;
195+ if (!schema_id) {
196+ return InvalidArgument (" No schema ID found in snapshot {} for table {}" ,
197+ snapshot->snapshot_id , table_metadata->table_uuid );
198+ }
199+
200+ const auto & schemas = table_metadata->schemas ;
201+ const auto it = std::ranges::find_if (schemas, [&schema_id](const auto & schema) {
202+ return schema->schema_id () == *schema_id;
203+ });
204+ if (it == schemas.end ()) {
205+ return InvalidArgument (" Schema {} in snapshot {} is not found" ,
206+ *snapshot->schema_id , snapshot->snapshot_id );
199207 }
208+ auto schema = *it;
200209
201210 if (column_names_.empty ()) {
202211 context_.projected_schema = schema;
@@ -217,7 +226,7 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
217226 }
218227 }
219228
220- return std::make_unique<DataScan>(std::move (context_), table_. io () );
229+ return std::make_unique<DataScan>(std::move (context_), file_io_ );
221230}
222231
223232TableScan::TableScan (TableScanContext context, std::shared_ptr<FileIO> file_io)
@@ -267,8 +276,7 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataScan::PlanFiles() const {
267276 }
268277 }
269278
270- DeleteFileIndex delete_file_index;
271- delete_file_index.BuildIndex (positional_delete_entries);
279+ DeleteFileIndex delete_file_index (positional_delete_entries);
272280
273281 // TODO(gty404): build residual expression from filter
274282 std::shared_ptr<Expression> residual;
0 commit comments