|
33 | 33 |
|
34 | 34 | namespace iceberg { |
35 | 35 |
|
| 36 | +namespace { |
| 37 | +/// \brief Index of manifest entries keyed by sequence number; it stores raw, non-owning ManifestEntry pointers. |
| 38 | +struct DeleteFileIndex { |
| 39 | + /// \brief Sequence number -> entry pointer; a multimap, so several entries can share one sequence number. |
| 40 | + std::multimap<int64_t, ManifestEntry*> sequence_index; |
| 41 | + |
| 42 | + /// \brief Rebuild the index from `entries`, discarding any previous contents. Entries without a sequence number are keyed by Snapshot::kInitialSequenceNumber. Only get() pointers are stored, so `entries` must outlive this index. |
| 43 | + void BuildIndex(const std::vector<std::unique_ptr<ManifestEntry>>& entries) { |
| 44 | + sequence_index.clear(); |
| 45 | + |
| 46 | + for (const auto& entry : entries) { |
| 47 | + const int64_t seq_num = |
| 48 | + entry->sequence_number.value_or(Snapshot::kInitialSequenceNumber); |
| 49 | + sequence_index.emplace(seq_num, entry.get()); |
| 50 | + } |
| 51 | + } |
| 52 | + |
| 53 | + /// \brief Return every indexed entry whose sequence number is >= that of `data_entry` (a missing number counts as Snapshot::kInitialSequenceNumber), in ascending sequence-number order. |
| 54 | + std::vector<ManifestEntry*> FindRelevantEntries(const ManifestEntry& data_entry) const { |
| 55 | + std::vector<ManifestEntry*> relevant_deletes; |
| 56 | + |
| 57 | + // Resolve the data entry's sequence number, then walk the index from lower_bound() (first key >= that number) to end(). |
| 58 | + auto data_sequence_number = |
| 59 | + data_entry.sequence_number.value_or(Snapshot::kInitialSequenceNumber); |
| 60 | + for (auto it = sequence_index.lower_bound(data_sequence_number); |
| 61 | + it != sequence_index.end(); ++it) { |
| 62 | + // Placeholder: no additional filtering is applied yet, so every entry in the range is returned. |
| 63 | + relevant_deletes.push_back(it->second); |
| 64 | + } |
| 65 | + |
| 66 | + return relevant_deletes; |
| 67 | + } |
| 68 | +}; |
| 69 | + |
| 70 | +/// \brief Collect the `data_file` of every entry that `delete_file_index.FindRelevantEntries(data_entry)` returns; the result is empty when no entry is relevant. |
| 71 | +std::vector<std::shared_ptr<DataFile>> GetMatchedDeletes( |
| 72 | + const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index) { |
| 73 | + const auto relevant_entries = delete_file_index.FindRelevantEntries(data_entry); |
| 74 | + std::vector<std::shared_ptr<DataFile>> matched_deletes; |
| 75 | + if (relevant_entries.empty()) { |
| 76 | + return matched_deletes; |
| 77 | + } |
| 78 | + |
| 79 | + matched_deletes.reserve(relevant_entries.size()); |
| 80 | + for (const auto& delete_entry : relevant_entries) { |
| 81 | + // TODO(gty404): check if the delete entry contains the data entry's file path; until then every relevant entry is treated as a match. |
| 82 | + matched_deletes.emplace_back(delete_entry->data_file); |
| 83 | + } |
| 84 | + return matched_deletes; |
| 85 | +} |
| 86 | +} // namespace |
| 87 | + |
36 | 88 | // implement FileScanTask |
37 | 89 | FileScanTask::FileScanTask(std::shared_ptr<DataFile> file, |
38 | 90 | std::vector<std::shared_ptr<DataFile>> delete_files, |
@@ -122,43 +174,46 @@ TableScanBuilder& TableScanBuilder::WithLimit(std::optional<int64_t> limit) { |
122 | 174 |
|
123 | 175 | Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() { |
124 | 176 | if (snapshot_id_) { |
125 | | - ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_.snapshot(*snapshot_id_)); |
| 177 | + ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_.SnapshotById(*snapshot_id_)); |
126 | 178 | } else { |
127 | | - context_.snapshot = table_.current_snapshot(); |
| 179 | + ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, table_.current_snapshot()); |
128 | 180 | } |
129 | 181 | if (context_.snapshot == nullptr) { |
130 | | - return InvalidArgument("No snapshot found for table {}", table_.name()); |
| 182 | + return InvalidArgument("No snapshot found for table {}", table_.name().name); |
131 | 183 | } |
132 | 184 |
|
133 | 185 | if (!context_.projected_schema) { |
134 | 186 | std::shared_ptr<Schema> schema; |
135 | 187 | const auto& snapshot = context_.snapshot; |
136 | 188 | if (snapshot->schema_id) { |
137 | | - const auto& schemas = table_.schemas(); |
| 189 | + const auto& schemas = *table_.schemas(); |
138 | 190 | if (const auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) { |
139 | 191 | schema = it->second; |
140 | 192 | } else { |
141 | 193 | return InvalidArgument("Schema {} in snapshot {} is not found", |
142 | 194 | *snapshot->schema_id, snapshot->snapshot_id); |
143 | 195 | } |
144 | 196 | } else { |
145 | | - schema = table_.schema(); |
| 197 | + ICEBERG_ASSIGN_OR_RAISE(schema, table_.schema()); |
146 | 198 | } |
147 | 199 |
|
148 | | - // TODO(gty404): collect touched columns from filter expression |
149 | | - std::vector<SchemaField> projected_fields; |
150 | | - projected_fields.reserve(column_names_.size()); |
151 | | - for (const auto& column_name : column_names_) { |
152 | | - // TODO(gty404): support case-insensitive column names |
153 | | - auto field_opt = schema->GetFieldByName(column_name); |
154 | | - if (!field_opt) { |
155 | | - return InvalidArgument("Column {} not found in schema", column_name); |
| 200 | + if (column_names_.empty()) { |
| 201 | + context_.projected_schema = schema; |
| 202 | + } else { |
| 203 | + // TODO(gty404): collect touched columns from filter expression |
| 204 | + std::vector<SchemaField> projected_fields; |
| 205 | + projected_fields.reserve(column_names_.size()); |
| 206 | + for (const auto& column_name : column_names_) { |
| 207 | + // TODO(gty404): support case-insensitive column names |
| 208 | + auto field_opt = schema->GetFieldByName(column_name); |
| 209 | + if (!field_opt) { |
| 210 | + return InvalidArgument("Column {} not found in schema", column_name); |
| 211 | + } |
| 212 | + projected_fields.emplace_back(field_opt.value().get()); |
156 | 213 | } |
157 | | - projected_fields.emplace_back(field_opt.value().get()); |
| 214 | + context_.projected_schema = |
| 215 | + std::make_shared<Schema>(std::move(projected_fields), schema->schema_id()); |
158 | 216 | } |
159 | | - |
160 | | - context_.projected_schema = |
161 | | - std::make_shared<Schema>(std::move(projected_fields), schema->schema_id()); |
162 | 217 | } |
163 | 218 |
|
164 | 219 | return std::make_unique<DataScan>(std::move(context_), table_.io()); |
@@ -227,47 +282,4 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataScan::PlanFiles() const { |
227 | 282 | return tasks; |
228 | 283 | } |
229 | 284 |
|
230 | | -void DataScan::DeleteFileIndex::BuildIndex( |
231 | | - const std::vector<std::unique_ptr<ManifestEntry>>& entries) { |
232 | | - sequence_index.clear(); |
233 | | - |
234 | | - for (const auto& entry : entries) { |
235 | | - const int64_t seq_num = |
236 | | - entry->sequence_number.value_or(Snapshot::kInitialSequenceNumber); |
237 | | - sequence_index.emplace(seq_num, entry.get()); |
238 | | - } |
239 | | -} |
240 | | - |
241 | | -std::vector<ManifestEntry*> DataScan::DeleteFileIndex::FindRelevantEntries( |
242 | | - const ManifestEntry& data_entry) const { |
243 | | - std::vector<ManifestEntry*> relevant_deletes; |
244 | | - |
245 | | - // Use lower_bound for efficient range search |
246 | | - auto data_sequence_number = |
247 | | - data_entry.sequence_number.value_or(Snapshot::kInitialSequenceNumber); |
248 | | - for (auto it = sequence_index.lower_bound(data_sequence_number); |
249 | | - it != sequence_index.end(); ++it) { |
250 | | - // Additional filtering logic here |
251 | | - relevant_deletes.push_back(it->second); |
252 | | - } |
253 | | - |
254 | | - return relevant_deletes; |
255 | | -} |
256 | | - |
257 | | -std::vector<std::shared_ptr<DataFile>> DataScan::GetMatchedDeletes( |
258 | | - const ManifestEntry& data_entry, const DeleteFileIndex& delete_file_index) { |
259 | | - const auto relevant_entries = delete_file_index.FindRelevantEntries(data_entry); |
260 | | - std::vector<std::shared_ptr<DataFile>> matched_deletes; |
261 | | - if (relevant_entries.empty()) { |
262 | | - return matched_deletes; |
263 | | - } |
264 | | - |
265 | | - matched_deletes.reserve(relevant_entries.size()); |
266 | | - for (const auto& delete_entry : relevant_entries) { |
267 | | - // TODO(gty404): check if the delete entry contains the data entry's file path |
268 | | - matched_deletes.emplace_back(delete_entry->data_file); |
269 | | - } |
270 | | - return matched_deletes; |
271 | | -} |
272 | | - |
273 | 285 | } // namespace iceberg |
0 commit comments