Skip to content

Commit ef4c124

Browse files
gty404wgtmac
andauthored
feat: basic table scan planning (#112)
Introducing basic scan table data interface --------- Co-authored-by: Gang Wu <[email protected]>
1 parent 0779a52 commit ef4c124

File tree

8 files changed

+420
-2
lines changed

8 files changed

+420
-2
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ set(ICEBERG_SOURCES
4040
statistics_file.cc
4141
table.cc
4242
table_metadata.cc
43+
table_scan.cc
4344
transform.cc
4445
transform_function.cc
4546
type.cc

src/iceberg/manifest_entry.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ struct ICEBERG_EXPORT ManifestEntry {
286286
std::optional<int64_t> file_sequence_number;
287287
/// Field id: 2
288288
/// File path, partition tuple, metrics, ...
289-
DataFile data_file;
289+
std::shared_ptr<DataFile> data_file;
290290

291291
inline static const SchemaField kStatus =
292292
SchemaField::MakeRequired(0, "status", iceberg::int32());

src/iceberg/manifest_reader.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ namespace iceberg {
3434
/// \brief Read manifest entries from a manifest file.
3535
class ICEBERG_EXPORT ManifestReader {
3636
public:
37+
virtual ~ManifestReader() = default;
3738
virtual Result<std::span<std::unique_ptr<ManifestEntry>>> Entries() const = 0;
3839

3940
private:
@@ -43,10 +44,26 @@ class ICEBERG_EXPORT ManifestReader {
4344
/// \brief Read manifest files from a manifest list file.
4445
class ICEBERG_EXPORT ManifestListReader {
4546
public:
47+
virtual ~ManifestListReader() = default;
4648
virtual Result<std::span<std::unique_ptr<ManifestFile>>> Files() const = 0;
4749

4850
private:
4951
std::unique_ptr<Reader> reader_;
5052
};
5153

54+
/// \brief Creates a reader for the manifest list.
55+
/// \param file_path Path to the manifest list file.
56+
/// \return A Result containing the reader or an error.
57+
Result<std::unique_ptr<ManifestListReader>> CreateManifestListReader(
58+
std::string_view file_path) {
59+
return NotImplemented("CreateManifestListReader is not implemented yet.");
60+
}
61+
62+
/// \brief Creates a reader for a manifest file.
63+
/// \param file_path Path to the manifest file.
64+
/// \return A Result containing the reader or an error.
65+
Result<std::unique_ptr<ManifestReader>> CreateManifestReader(std::string_view file_path) {
66+
return NotImplemented("CreateManifestReader is not implemented yet.");
67+
}
68+
5269
} // namespace iceberg

src/iceberg/table.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "iceberg/schema.h"
2424
#include "iceberg/sort_order.h"
2525
#include "iceberg/table_metadata.h"
26+
#include "iceberg/table_scan.h"
2627

2728
namespace iceberg {
2829

@@ -107,4 +108,8 @@ const std::vector<SnapshotLogEntry>& Table::history() const {
107108

108109
const std::shared_ptr<FileIO>& Table::io() const { return io_; }
109110

111+
std::unique_ptr<TableScanBuilder> Table::NewScan() const {
112+
return std::make_unique<TableScanBuilder>(metadata_, io_);
113+
}
114+
110115
} // namespace iceberg

src/iceberg/table.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ class ICEBERG_EXPORT Table {
104104
/// \return a vector of history entries
105105
const std::vector<SnapshotLogEntry>& history() const;
106106

107+
/// \brief Create a new table scan builder for this table
108+
///
109+
/// Once a table scan builder is created, it can be refined to project columns and
110+
/// filter data.
111+
virtual std::unique_ptr<TableScanBuilder> NewScan() const;
112+
107113
/// \brief Returns a FileIO to read and write table data and metadata files
108114
const std::shared_ptr<FileIO>& io() const;
109115

src/iceberg/table_scan.cc

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/table_scan.h"
21+
22+
#include <algorithm>
23+
#include <ranges>
24+
25+
#include "iceberg/manifest_entry.h"
26+
#include "iceberg/manifest_list.h"
27+
#include "iceberg/manifest_reader.h"
28+
#include "iceberg/schema.h"
29+
#include "iceberg/schema_field.h"
30+
#include "iceberg/snapshot.h"
31+
#include "iceberg/table_metadata.h"
32+
#include "iceberg/util/macros.h"
33+
34+
namespace iceberg {
35+
36+
// implement FileScanTask
37+
FileScanTask::FileScanTask(std::shared_ptr<DataFile> file)
38+
: data_file_(std::move(file)) {}
39+
40+
const std::shared_ptr<DataFile>& FileScanTask::data_file() const { return data_file_; }
41+
42+
int64_t FileScanTask::size_bytes() const { return data_file_->file_size_in_bytes; }
43+
44+
int32_t FileScanTask::files_count() const { return 1; }
45+
46+
int64_t FileScanTask::estimated_row_count() const { return data_file_->record_count; }
47+
48+
TableScanBuilder::TableScanBuilder(std::shared_ptr<TableMetadata> table_metadata,
49+
std::shared_ptr<FileIO> file_io)
50+
: file_io_(std::move(file_io)) {
51+
context_.table_metadata = std::move(table_metadata);
52+
}
53+
54+
TableScanBuilder& TableScanBuilder::WithColumnNames(
55+
std::vector<std::string> column_names) {
56+
column_names_ = std::move(column_names);
57+
return *this;
58+
}
59+
60+
TableScanBuilder& TableScanBuilder::WithProjectedSchema(std::shared_ptr<Schema> schema) {
61+
context_.projected_schema = std::move(schema);
62+
return *this;
63+
}
64+
65+
TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) {
66+
snapshot_id_ = snapshot_id;
67+
return *this;
68+
}
69+
70+
TableScanBuilder& TableScanBuilder::WithFilter(std::shared_ptr<Expression> filter) {
71+
context_.filter = std::move(filter);
72+
return *this;
73+
}
74+
75+
TableScanBuilder& TableScanBuilder::WithCaseSensitive(bool case_sensitive) {
76+
context_.case_sensitive = case_sensitive;
77+
return *this;
78+
}
79+
80+
TableScanBuilder& TableScanBuilder::WithOption(std::string property, std::string value) {
81+
context_.options[std::move(property)] = std::move(value);
82+
return *this;
83+
}
84+
85+
TableScanBuilder& TableScanBuilder::WithLimit(std::optional<int64_t> limit) {
86+
context_.limit = limit;
87+
return *this;
88+
}
89+
90+
Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
91+
const auto& table_metadata = context_.table_metadata;
92+
auto snapshot_id = snapshot_id_ ? snapshot_id_ : table_metadata->current_snapshot_id;
93+
if (!snapshot_id) {
94+
return InvalidArgument("No snapshot ID specified for table {}",
95+
table_metadata->table_uuid);
96+
}
97+
auto iter = std::ranges::find_if(
98+
table_metadata->snapshots,
99+
[id = *snapshot_id](const auto& snapshot) { return snapshot->snapshot_id == id; });
100+
if (iter == table_metadata->snapshots.end()) {
101+
return NotFound("Snapshot with ID {} is not found", *snapshot_id);
102+
}
103+
context_.snapshot = *iter;
104+
105+
if (!context_.projected_schema) {
106+
const auto& snapshot = context_.snapshot;
107+
auto schema_id =
108+
snapshot->schema_id ? snapshot->schema_id : table_metadata->current_schema_id;
109+
if (!schema_id) {
110+
return InvalidArgument("No schema ID found in snapshot {} for table {}",
111+
snapshot->snapshot_id, table_metadata->table_uuid);
112+
}
113+
114+
const auto& schemas = table_metadata->schemas;
115+
const auto it = std::ranges::find_if(schemas, [id = *schema_id](const auto& schema) {
116+
return schema->schema_id() == id;
117+
});
118+
if (it == schemas.end()) {
119+
return InvalidArgument("Schema {} in snapshot {} is not found",
120+
*snapshot->schema_id, snapshot->snapshot_id);
121+
}
122+
const auto& schema = *it;
123+
124+
if (column_names_.empty()) {
125+
context_.projected_schema = schema;
126+
} else {
127+
// TODO(gty404): collect touched columns from filter expression
128+
std::vector<SchemaField> projected_fields;
129+
projected_fields.reserve(column_names_.size());
130+
for (const auto& column_name : column_names_) {
131+
// TODO(gty404): support case-insensitive column names
132+
auto field_opt = schema->GetFieldByName(column_name);
133+
if (!field_opt) {
134+
return InvalidArgument("Column {} not found in schema '{}'", column_name,
135+
*schema_id);
136+
}
137+
projected_fields.emplace_back(field_opt.value().get());
138+
}
139+
context_.projected_schema =
140+
std::make_shared<Schema>(std::move(projected_fields), schema->schema_id());
141+
}
142+
}
143+
144+
return std::make_unique<DataTableScan>(std::move(context_), file_io_);
145+
}
146+
147+
TableScan::TableScan(TableScanContext context, std::shared_ptr<FileIO> file_io)
148+
: context_(std::move(context)), file_io_(std::move(file_io)) {}
149+
150+
const std::shared_ptr<Snapshot>& TableScan::snapshot() const { return context_.snapshot; }
151+
152+
const std::shared_ptr<Schema>& TableScan::projection() const {
153+
return context_.projected_schema;
154+
}
155+
156+
const TableScanContext& TableScan::context() const { return context_; }
157+
158+
const std::shared_ptr<FileIO>& TableScan::io() const { return file_io_; }
159+
160+
DataTableScan::DataTableScan(TableScanContext context, std::shared_ptr<FileIO> file_io)
161+
: TableScan(std::move(context), std::move(file_io)) {}
162+
163+
Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
164+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader,
165+
CreateManifestListReader(context_.snapshot->manifest_list));
166+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files());
167+
168+
std::vector<std::shared_ptr<FileScanTask>> tasks;
169+
for (const auto& manifest_file : manifest_files) {
170+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader,
171+
CreateManifestReader(manifest_file->manifest_path));
172+
ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());
173+
174+
// TODO(gty404): filter manifests using partition spec and filter expression
175+
176+
for (auto& manifest_entry : manifests) {
177+
const auto& data_file = manifest_entry->data_file;
178+
switch (data_file->content) {
179+
case DataFile::Content::kData:
180+
tasks.emplace_back(std::make_shared<FileScanTask>(manifest_entry->data_file));
181+
break;
182+
case DataFile::Content::kPositionDeletes:
183+
case DataFile::Content::kEqualityDeletes:
184+
return NotSupported("Equality/Position deletes are not supported in data scan");
185+
}
186+
}
187+
}
188+
189+
return tasks;
190+
}
191+
192+
} // namespace iceberg

0 commit comments

Comments
 (0)