Skip to content

Commit e971cc4

Browse files
committed
feat: basic table scan planning
1 parent 08da3a9 commit e971cc4

File tree

6 files changed

+254
-9
lines changed

6 files changed

+254
-9
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ set(ICEBERG_SOURCES
3838
sort_order.cc
3939
statistics_file.cc
4040
table_metadata.cc
41+
table_scan.cc
4142
transform.cc
4243
transform_function.cc
4344
type.cc

src/iceberg/result.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ namespace iceberg {
3131
enum class ErrorKind {
3232
kAlreadyExists,
3333
kCommitStateUnknown,
34+
kDataInvalid,
3435
kInvalidArgument,
3536
kInvalidExpression,
3637
kInvalidSchema,
@@ -65,14 +66,15 @@ using Status = Result<void>;
6566
/// \brief Macro to define error creation functions
6667
#define DEFINE_ERROR_FUNCTION(name) \
6768
template <typename... Args> \
68-
inline auto name(const std::format_string<Args...> fmt, Args&&... args) \
69-
-> unexpected<Error> { \
69+
inline auto name(const std::format_string<Args...> fmt, \
70+
Args&&... args) -> unexpected<Error> { \
7071
return unexpected<Error>( \
7172
{ErrorKind::k##name, std::format(fmt, std::forward<Args>(args)...)}); \
7273
}
7374

7475
DEFINE_ERROR_FUNCTION(AlreadyExists)
7576
DEFINE_ERROR_FUNCTION(CommitStateUnknown)
77+
DEFINE_ERROR_FUNCTION(DataInvalid)
7678
DEFINE_ERROR_FUNCTION(InvalidArgument)
7779
DEFINE_ERROR_FUNCTION(InvalidExpression)
7880
DEFINE_ERROR_FUNCTION(InvalidSchema)

src/iceberg/table.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,21 +88,20 @@ class ICEBERG_EXPORT Table {
8888
/// \return a vector of history entries
8989
virtual const std::vector<std::shared_ptr<HistoryEntry>>& history() const = 0;
9090

91-
/// \brief Create a new table scan for this table
91+
/// \brief Create a new table scan builder for this table
9292
///
93-
/// Once a table scan is created, it can be refined to project columns and filter data.
94-
virtual std::unique_ptr<TableScan> NewScan() const = 0;
93+
/// Once a table scan builder is created, it can be refined to project columns and
94+
/// filter data.
95+
virtual std::unique_ptr<TableScanBuilder> NewScan() const = 0;
9596

9697
/// \brief Create a new append API to add files to this table and commit
9798
virtual std::shared_ptr<AppendFiles> NewAppend() = 0;
9899

99100
/// \brief Create a new transaction API to commit multiple table operations at once
100101
virtual std::unique_ptr<Transaction> NewTransaction() = 0;
101102

102-
/// TODO(wgtmac): design of FileIO is not finalized yet. We intend to use an
103-
/// IO-less design in the core library.
104103
// /// \brief Returns a FileIO to read and write table data and metadata files
105-
// virtual std::shared_ptr<FileIO> io() const = 0;
104+
virtual std::shared_ptr<FileIO> io() const = 0;
106105

107106
/// \brief Returns a LocationProvider to provide locations for new data files
108107
virtual std::unique_ptr<LocationProvider> location_provider() const = 0;

src/iceberg/table_scan.cc

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/table_scan.h"
21+
22+
#include "iceberg/manifest_entry.h"
23+
#include "iceberg/manifest_list.h"
24+
#include "iceberg/manifest_reader.h"
25+
#include "iceberg/schema.h"
26+
#include "iceberg/schema_field.h"
27+
#include "iceberg/snapshot.h"
28+
#include "iceberg/table.h"
29+
#include "iceberg/util/macros.h"
30+
31+
namespace iceberg {
32+
33+
TableScanBuilder::TableScanBuilder(const Table& table) : table_(table) {}
34+
35+
TableScanBuilder& TableScanBuilder::WithColumnNames(
36+
const std::vector<std::string>& column_names) {
37+
column_names_ = column_names;
38+
return *this;
39+
}
40+
41+
TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) {
42+
snapshot_id_ = snapshot_id;
43+
return *this;
44+
}
45+
46+
TableScanBuilder& TableScanBuilder::WithFilter(
47+
const std::shared_ptr<Expression>& filter) {
48+
filter_ = filter;
49+
return *this;
50+
}
51+
52+
Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
53+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, snapshot_id_ ? table_.snapshot(*snapshot_id_)
54+
: Result<std::shared_ptr<Snapshot>>(
55+
table_.current_snapshot()));
56+
57+
auto ResolveSchema = [&]() -> Result<std::shared_ptr<Schema>> {
58+
if (snapshot->schema_id) {
59+
const auto& schemas = table_.schemas();
60+
if (auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) {
61+
return it->second;
62+
}
63+
return DataInvalid("Schema {} in snapshot {} is not found", *snapshot->schema_id,
64+
snapshot->snapshot_id);
65+
}
66+
return table_.schema();
67+
};
68+
69+
ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSchema());
70+
71+
std::vector<int32_t> field_ids;
72+
field_ids.reserve(column_names_.size());
73+
for (const auto& column_name : column_names_) {
74+
auto field_opt = schema->GetFieldByName(column_name);
75+
if (!field_opt) {
76+
return InvalidArgument("Column {} not found in schema", column_name);
77+
}
78+
field_ids.emplace_back(field_opt.value().get().field_id());
79+
}
80+
81+
auto context = std::make_unique<TableScan::ScanContext>(
82+
std::move(snapshot), std::move(schema), std::move(field_ids), std::move(filter_));
83+
return std::make_unique<TableScan>(std::move(context), table_.io());
84+
}
85+
86+
TableScan::TableScan(std::unique_ptr<ScanContext> context,
87+
std::shared_ptr<FileIO> file_io)
88+
: context_(std::move(context)), file_io_(std::move(file_io)) {}
89+
90+
Result<std::vector<std::unique_ptr<FileScanTask>>> TableScan::PlanFiles() const {
91+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_list_reader,
92+
CreateManifestListReader(context_->snapshot_->manifest_list));
93+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files());
94+
95+
std::vector<std::unique_ptr<FileScanTask>> tasks;
96+
for (const auto& manifest_file : manifest_files) {
97+
ICEBERG_ASSIGN_OR_RAISE(auto manifest_reader,
98+
CreateManifestReader(manifest_file->manifest_path));
99+
ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());
100+
101+
for (const auto& manifest : manifests) {
102+
const auto& data_file = manifest->data_file;
103+
tasks.emplace_back(std::make_unique<FileScanTask>(
104+
data_file.file_path, 0, data_file.file_size_in_bytes, data_file.record_count,
105+
data_file.content, data_file.file_format, context_->schema_,
106+
context_->field_ids_, context_->filter_));
107+
}
108+
}
109+
return tasks;
110+
}
111+
112+
Result<std::unique_ptr<ManifestListReader>> TableScan::CreateManifestListReader(
113+
const std::string& file_path) const {
114+
return NotImplemented("manifest list reader");
115+
}
116+
117+
Result<std::unique_ptr<ManifestReader>> TableScan::CreateManifestReader(
118+
const std::string& file_path) const {
119+
return NotImplemented("manifest reader");
120+
}
121+
122+
} // namespace iceberg

src/iceberg/table_scan.h

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <string>
23+
#include <vector>
24+
25+
#include "iceberg/expression/expression.h"
26+
#include "iceberg/file_io.h"
27+
#include "iceberg/manifest_entry.h"
28+
#include "iceberg/type_fwd.h"
29+
30+
namespace iceberg {
31+
32+
/// \brief Builder class for creating TableScan instances.
33+
class ICEBERG_EXPORT TableScanBuilder {
34+
public:
35+
/// \brief Constructs a TableScanBuilder for the given table.
36+
/// \param table Reference to the table to scan.
37+
explicit TableScanBuilder(const Table& table);
38+
39+
/// \brief Sets the snapshot ID to scan.
40+
/// \param snapshot_id The ID of the snapshot.
41+
/// \return Reference to the builder.
42+
TableScanBuilder& WithSnapshotId(int64_t snapshot_id);
43+
44+
/// \brief Selects columns to include in the scan.
45+
/// \param column_names A list of column names.
46+
/// \return Reference to the builder.
47+
TableScanBuilder& WithColumnNames(const std::vector<std::string>& column_names);
48+
49+
/// \brief Applies a filter expression to the scan.
50+
/// \param filter Filter expression to use.
51+
/// \return Reference to the builder.
52+
TableScanBuilder& WithFilter(const std::shared_ptr<Expression>& filter);
53+
54+
/// \brief Builds and returns a TableScan instance.
55+
/// \return A Result containing the TableScan or an error.
56+
Result<std::unique_ptr<TableScan>> Build();
57+
58+
private:
59+
const Table& table_;
60+
std::vector<std::string> column_names_;
61+
std::optional<int64_t> snapshot_id_;
62+
std::shared_ptr<Expression> filter_;
63+
};
64+
65+
/// \brief Represents a configured scan operation on a table.
66+
class ICEBERG_EXPORT TableScan {
67+
public:
68+
/// \brief Scan context holding snapshot and scan-specific metadata.
69+
struct ScanContext {
70+
std::shared_ptr<Snapshot> snapshot_; ///< Snapshot to scan.
71+
std::shared_ptr<Schema> schema_; ///< Projected schema.
72+
std::vector<int32_t> field_ids_; ///< Field IDs of selected columns.
73+
std::shared_ptr<Expression> filter_; ///< Filter expression to apply.
74+
};
75+
76+
/// \brief Constructs a TableScan with the given context and file I/O.
77+
/// \param context Scan context including snapshot, schema, and filter.
78+
/// \param file_io File I/O instance for reading manifests and data files.
79+
TableScan(std::unique_ptr<ScanContext> context, std::shared_ptr<FileIO> file_io);
80+
81+
/// \brief Plans the scan tasks by resolving manifests and data files.
82+
///
83+
/// Returns a list of file scan tasks if successful.
84+
/// \return A Result containing scan tasks or an error.
85+
Result<std::vector<std::unique_ptr<FileScanTask>>> PlanFiles() const;
86+
87+
private:
88+
/// \brief Creates a reader for the manifest list.
89+
/// \param file_path Path to the manifest list file.
90+
/// \return A Result containing the reader or an error.
91+
Result<std::unique_ptr<ManifestListReader>> CreateManifestListReader(
92+
const std::string& file_path) const;
93+
94+
/// \brief Creates a reader for a manifest file.
95+
/// \param file_path Path to the manifest file.
96+
/// \return A Result containing the reader or an error.
97+
Result<std::unique_ptr<ManifestReader>> CreateManifestReader(
98+
const std::string& file_path) const;
99+
100+
std::unique_ptr<ScanContext> context_;
101+
std::shared_ptr<FileIO> file_io_;
102+
};
103+
104+
/// \brief Represents a task to scan a portion of a data file.
105+
struct ICEBERG_EXPORT FileScanTask {
106+
std::string file_path_; ///< Path to the data file.
107+
uint64_t start_; ///< Start byte offset.
108+
uint64_t length_; ///< Length in bytes to scan.
109+
std::optional<uint64_t> record_count_; ///< Optional number of records.
110+
DataFile::Content file_content_; ///< Type of file content.
111+
FileFormatType file_format_; ///< Format of the data file.
112+
std::shared_ptr<Schema> schema_; ///< Projected schema.
113+
std::vector<int32_t> field_ids_; ///< Field IDs to project.
114+
std::shared_ptr<Expression> filter_; ///< Filter expression to apply.
115+
};
116+
117+
} // namespace iceberg

src/iceberg/type_fwd.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ class LocationProvider;
9191
class SortField;
9292
class SortOrder;
9393
class Table;
94+
class TableScan;
95+
struct FileScanTask;
96+
class TableScanBuilder;
9497
class Transaction;
9598
class Transform;
9699
class TransformFunction;
@@ -119,11 +122,12 @@ class MetadataUpdate;
119122
class UpdateRequirement;
120123

121124
class AppendFiles;
122-
class TableScan;
123125

124126
struct DataFile;
125127
struct ManifestEntry;
126128
struct ManifestFile;
129+
class ManifestReader;
127130
struct ManifestList;
131+
class ManifestListReader;
128132

129133
} // namespace iceberg

0 commit comments

Comments
 (0)