Skip to content

Commit 85802e9

Browse files
committed
Abstract TableScan and ScanTask
1 parent 0f79c7c commit 85802e9

File tree

4 files changed

+282
-55
lines changed

4 files changed

+282
-55
lines changed

src/iceberg/snapshot.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ struct ICEBERG_EXPORT DataOperation {
222222
/// Snapshots are created by table operations.
223223
struct ICEBERG_EXPORT Snapshot {
224224
static constexpr int64_t kInvalidSnapshotId = -1;
225+
static constexpr int64_t kInitialSequenceNumber = 0;
225226

226227
/// A unique long ID.
227228
int64_t snapshot_id;

src/iceberg/table_scan.cc

Lines changed: 166 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,86 @@
3030

3131
namespace iceberg {
3232

33-
TableScanBuilder::TableScanBuilder(const Table& table) : table_(table) {}
33+
// implement FileScanTask
34+
FileScanTask::FileScanTask(std::shared_ptr<DataFile> file,
35+
std::vector<std::shared_ptr<DataFile>> delete_files,
36+
int64_t start, int64_t length,
37+
std::shared_ptr<Expression> residual)
38+
: data_file_(std::move(file)),
39+
delete_files_(std::move(delete_files)),
40+
start_(start),
41+
length_(length),
42+
residual_(std::move(residual)) {}
43+
44+
/// Returns a reference to the data file held by this task.
const std::shared_ptr<DataFile>& FileScanTask::data_file() const {
  return data_file_;
}
45+
46+
const std::vector<std::shared_ptr<DataFile>>& FileScanTask::delete_files() const {
47+
return delete_files_;
48+
}
49+
50+
/// Returns the start value this task was constructed with.
int64_t FileScanTask::start() const {
  return start_;
}
51+
52+
/// Returns the length value this task was constructed with.
int64_t FileScanTask::length() const {
  return length_;
}
53+
54+
int64_t FileScanTask::size_bytes() const {
55+
int64_t sizeInBytes = length_;
56+
std::ranges::for_each(delete_files_, [&sizeInBytes](const auto& delete_file) {
57+
sizeInBytes += delete_file->file_size_in_bytes;
58+
});
59+
return sizeInBytes;
60+
}
61+
62+
int32_t FileScanTask::files_count() const {
63+
return static_cast<int32_t>(delete_files_.size() + 1);
64+
}
65+
66+
int64_t FileScanTask::estimated_row_count() const {
67+
const double scannedFileFraction =
68+
static_cast<double>(length_) / data_file_->file_size_in_bytes;
69+
return static_cast<int64_t>(scannedFileFraction * data_file_->record_count);
70+
}
71+
72+
/// Returns a reference to the residual expression held by this task.
const std::shared_ptr<Expression>& FileScanTask::residual() const {
  return residual_;
}
73+
74+
TableScanBuilder::TableScanBuilder(const Table& table,
75+
std::shared_ptr<TableMetadata> table_metadata)
76+
: table_(table) {
77+
context_.table_metadata = std::move(table_metadata);
78+
}
3479

3580
/// Stores the column names on the builder (moved in) and returns the builder
/// so calls can be chained.
TableScanBuilder& TableScanBuilder::WithColumnNames(
    std::vector<std::string> column_names) {
  auto& self = *this;
  self.column_names_ = std::move(column_names);
  return self;
}
4085

86+
/// Sets the projected schema in the scan context (moved in) and returns the
/// builder so calls can be chained.
TableScanBuilder& TableScanBuilder::WithSchema(std::shared_ptr<Schema> schema) {
  auto& self = *this;
  self.context_.projected_schema = std::move(schema);
  return self;
}
90+
4191
/// Records the snapshot id on the builder and returns the builder so calls can
/// be chained.
TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) {
  auto& self = *this;
  self.snapshot_id_ = snapshot_id;
  return self;
}
4595

4696
/// Sets the filter expression in the scan context (moved in) and returns the
/// builder so calls can be chained.
TableScanBuilder& TableScanBuilder::WithFilter(std::shared_ptr<Expression> filter) {
  auto& self = *this;
  self.context_.filter = std::move(filter);
  return self;
}
100+
101+
/// Sets the case-sensitivity flag in the scan context and returns the builder
/// so calls can be chained.
TableScanBuilder& TableScanBuilder::WithCaseSensitive(bool case_sensitive) {
  auto& self = *this;
  self.context_.case_sensitive = case_sensitive;
  return self;
}
105+
106+
/// Stores `value` under `property` in the scan context's options, replacing
/// any value already stored for that property, and returns the builder so
/// calls can be chained.
TableScanBuilder& TableScanBuilder::WithOption(std::string property, std::string value) {
  auto& option_slot = context_.options[std::move(property)];
  option_slot = std::move(value);
  return *this;
}
110+
111+
/// Sets the optional limit in the scan context and returns the builder so
/// calls can be chained.
TableScanBuilder& TableScanBuilder::WithLimit(std::optional<int64_t> limit) {
  auto& self = *this;
  self.context_.limit = limit;
  return self;
}
50115

@@ -58,59 +123,127 @@ Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
58123
if (snapshot == nullptr) {
59124
return InvalidArgument("No snapshot found for table {}", table_.name());
60125
}
61-
62-
std::shared_ptr<Schema> schema;
63-
if (snapshot->schema_id) {
64-
const auto& schemas = table_.schemas();
65-
if (auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) {
66-
schema = it->second;
126+
context_.snapshot = std::move(snapshot);
127+
128+
if (!context_.projected_schema) {
129+
std::shared_ptr<Schema> schema;
130+
if (snapshot->schema_id) {
131+
const auto& schemas = table_.schemas();
132+
if (const auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) {
133+
schema = it->second;
134+
} else {
135+
return InvalidArgument("Schema {} in snapshot {} is not found",
136+
*snapshot->schema_id, snapshot->snapshot_id);
137+
}
67138
} else {
68-
return InvalidArgument("Schema {} in snapshot {} is not found",
69-
*snapshot->schema_id, snapshot->snapshot_id);
139+
schema = table_.schema();
70140
}
71-
} else {
72-
schema = table_.schema();
73-
}
74141

75-
std::vector<SchemaField> projected_fields;
76-
projected_fields.reserve(column_names_.size());
77-
for (const auto& column_name : column_names_) {
78-
auto field_opt = schema->GetFieldByName(column_name);
79-
if (!field_opt) {
80-
return InvalidArgument("Column {} not found in schema", column_name);
142+
// TODO(gty404): collect touched columns from filter expression
143+
std::vector<SchemaField> projected_fields;
144+
projected_fields.reserve(column_names_.size());
145+
for (const auto& column_name : column_names_) {
146+
// TODO(gty404): support case-insensitive column names
147+
auto field_opt = schema->GetFieldByName(column_name);
148+
if (!field_opt) {
149+
return InvalidArgument("Column {} not found in schema", column_name);
150+
}
151+
projected_fields.emplace_back(field_opt.value().get());
81152
}
82-
projected_fields.emplace_back(field_opt.value().get());
153+
154+
context_.projected_schema =
155+
std::make_shared<Schema>(std::move(projected_fields), schema->schema_id());
83156
}
84157

85-
auto projected_schema =
86-
std::make_shared<Schema>(std::move(projected_fields), schema->schema_id());
87-
TableScan::ScanContext context{.snapshot = std::move(snapshot),
88-
.projected_schema = std::move(projected_schema),
89-
.filter = std::move(filter_)};
90-
return std::make_unique<TableScan>(std::move(context), table_.io());
158+
return std::make_unique<DataScan>(std::move(context_), table_.io());
91159
}
92160

93-
TableScan::TableScan(ScanContext context, std::shared_ptr<FileIO> file_io)
161+
TableScan::TableScan(TableScanContext context, std::shared_ptr<FileIO> file_io)
94162
: context_(std::move(context)), file_io_(std::move(file_io)) {}
95163

96-
Result<std::vector<std::shared_ptr<FileScanTask>>> TableScan::PlanFiles() const {
164+
/// Returns a reference to the snapshot stored in the scan context.
const std::shared_ptr<Snapshot>& TableScan::snapshot() const {
  return context_.snapshot;
}
165+
166+
const std::shared_ptr<Schema>& TableScan::projection() const {
167+
return context_.projected_schema;
168+
}
169+
170+
/// Returns a reference to the scan context this scan was constructed with.
const TableScanContext& TableScan::context() const {
  return context_;
}
171+
172+
/// Returns a reference to the file IO handle this scan was constructed with.
const std::shared_ptr<FileIO>& TableScan::io() const {
  return file_io_;
}
173+
174+
DataScan::DataScan(TableScanContext context, std::shared_ptr<FileIO> file_io)
175+
: TableScan(std::move(context), std::move(file_io)) {}
176+
177+
/// Plans one FileScanTask per data file reachable from the context's snapshot.
///
/// Reads the snapshot's manifest list, then the entries of every manifest it
/// lists, and buckets the entries by content: data files and position-delete
/// files are collected; an equality-delete file aborts planning with
/// NotSupported. Each data entry is then paired with the delete files selected
/// by GetMatchedDeletes() and wrapped in a task that starts at 0 and spans the
/// data file's full size in bytes.
///
/// Returns the planned tasks, or the first error raised while creating or
/// reading the manifest list / manifest readers.
Result<std::vector<std::shared_ptr<FileScanTask>>> DataScan::PlanFiles() const {
  ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader,
                          CreateManifestListReader(context_.snapshot->manifest_list));
  ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files());

  std::vector<std::unique_ptr<ManifestEntry>> data_entries;
  std::vector<std::unique_ptr<ManifestEntry>> positional_delete_entries;
  for (const auto& manifest_file : manifest_files) {
    ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader,
                            CreateManifestReader(manifest_file->manifest_path));
    ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());

    // TODO(gty404): filter manifests using partition spec and filter expression

    for (auto& manifest_entry : manifests) {
      // The reference targets the pointed-to entry, which stays alive after the
      // owning pointer is moved into one of the buckets below.
      const auto& data_file = manifest_entry->data_file;
      switch (data_file->content) {
        case DataFile::Content::kData:
          data_entries.push_back(std::move(manifest_entry));
          break;
        case DataFile::Content::kPositionDeletes:
          // TODO(gty404): check if the sequence number is greater than or equal to the
          // minimum sequence number of all manifest entries
          positional_delete_entries.push_back(std::move(manifest_entry));
          break;
        case DataFile::Content::kEqualityDeletes:
          return NotSupported("Equality deletes are not supported in data scan");
      }
    }
  }

  // TODO(gty404): build residual expression from filter
  std::shared_ptr<Expression> residual;

  std::vector<std::shared_ptr<FileScanTask>> tasks;
  tasks.reserve(data_entries.size());  // exactly one task per data entry
  for (const auto& data_entry : data_entries) {
    auto matched_deletes = GetMatchedDeletes(*data_entry, positional_delete_entries);
    const auto& data_file = data_entry->data_file;
    // The delete list is not used again in this iteration, so hand it to the
    // task instead of copying every shared_ptr in it.
    tasks.emplace_back(std::make_shared<FileScanTask>(
        data_file, std::move(matched_deletes), 0, data_file->file_size_in_bytes,
        residual));
  }
  return tasks;
}
115220

221+
std::vector<std::shared_ptr<DataFile>> DataScan::GetMatchedDeletes(
222+
const ManifestEntry& data_entry,
223+
const std::vector<std::unique_ptr<ManifestEntry>>& positional_delete_entries) const {
224+
auto data_sequence_number =
225+
data_entry.sequence_number.value_or(Snapshot::kInitialSequenceNumber);
226+
std::vector<const ManifestEntry*> relevant_entries;
227+
// TODO(gty404): consider using a more efficient data structure
228+
for (const auto& delete_entry : positional_delete_entries) {
229+
const int64_t delete_sequence_number =
230+
delete_entry->sequence_number.value_or(Snapshot::kInitialSequenceNumber);
231+
if (delete_sequence_number >= data_sequence_number) {
232+
relevant_entries.push_back(delete_entry.get());
233+
}
234+
}
235+
236+
std::vector<std::shared_ptr<DataFile>> matched_deletes;
237+
if (relevant_entries.empty()) {
238+
return matched_deletes;
239+
}
240+
241+
matched_deletes.reserve(relevant_entries.size());
242+
for (const auto& delete_entry : relevant_entries) {
243+
// TODO(gty404): check if the delete entry contains the data entry's file path
244+
matched_deletes.emplace_back(delete_entry->data_file);
245+
}
246+
return matched_deletes;
247+
}
248+
116249
} // namespace iceberg

0 commit comments

Comments
 (0)