Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ set(ICEBERG_SOURCES
sort_order.cc
statistics_file.cc
table_metadata.cc
table_scan.cc
transform.cc
transform_function.cc
type.cc
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ enum class ErrorKind {
kCommitStateUnknown,
kDecompressError,
kInvalidArgument,
kInvalidData,
kInvalidExpression,
kInvalidSchema,
kIOError,
Expand Down Expand Up @@ -77,6 +78,7 @@ DEFINE_ERROR_FUNCTION(AlreadyExists)
DEFINE_ERROR_FUNCTION(CommitStateUnknown)
DEFINE_ERROR_FUNCTION(DecompressError)
DEFINE_ERROR_FUNCTION(InvalidArgument)
DEFINE_ERROR_FUNCTION(InvalidData)
DEFINE_ERROR_FUNCTION(InvalidExpression)
DEFINE_ERROR_FUNCTION(InvalidSchema)
DEFINE_ERROR_FUNCTION(IOError)
Expand Down
11 changes: 5 additions & 6 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,20 @@ class ICEBERG_EXPORT Table {
/// \return a vector of history entries
virtual const std::vector<std::shared_ptr<HistoryEntry>>& history() const = 0;

/// \brief Create a new table scan for this table
/// \brief Create a new table scan builder for this table
///
/// Once a table scan is created, it can be refined to project columns and filter data.
virtual std::unique_ptr<TableScan> NewScan() const = 0;
/// Once a table scan builder is created, it can be refined to project columns and
/// filter data.
virtual std::unique_ptr<TableScanBuilder> NewScan() const = 0;

/// \brief Create a new append API to add files to this table and commit
virtual std::shared_ptr<AppendFiles> NewAppend() = 0;

/// \brief Create a new transaction API to commit multiple table operations at once
virtual std::unique_ptr<Transaction> NewTransaction() = 0;

/// TODO(wgtmac): design of FileIO is not finalized yet. We intend to use an
/// IO-less design in the core library.
// /// \brief Returns a FileIO to read and write table data and metadata files
// virtual std::shared_ptr<FileIO> io() const = 0;
virtual std::shared_ptr<FileIO> io() const = 0;

/// \brief Returns a LocationProvider to provide locations for new data files
virtual std::unique_ptr<LocationProvider> location_provider() const = 0;
Expand Down
125 changes: 125 additions & 0 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/table_scan.h"

#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/manifest_reader.h"
#include "iceberg/schema.h"
#include "iceberg/schema_field.h"
#include "iceberg/snapshot.h"
#include "iceberg/table.h"
#include "iceberg/util/macros.h"

namespace iceberg {

// Binds the builder to `table`. Only the reference is stored (no copy is made),
// so the table has to stay valid for as long as the builder is in use.
TableScanBuilder::TableScanBuilder(const Table& table)
    : table_(table) {
}

// Replaces any previously configured projection with a copy of `column_names`
// and hands the builder back so that calls can be chained.
TableScanBuilder& TableScanBuilder::WithColumnNames(
    const std::vector<std::string>& column_names) {
  TableScanBuilder& self = *this;
  self.column_names_ = column_names;
  return self;
}

// Pins the scan to `snapshot_id`, overwriting any ID set earlier, and returns
// the builder for chaining.
TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) {
  TableScanBuilder& self = *this;
  self.snapshot_id_.emplace(snapshot_id);
  return self;
}

// Stores a shared reference to `filter` (replacing any earlier filter) and
// returns the builder for chaining. The expression itself is not copied.
TableScanBuilder& TableScanBuilder::WithFilter(
    const std::shared_ptr<Expression>& filter) {
  TableScanBuilder& self = *this;
  self.filter_ = filter;
  return self;
}

/// \brief Resolves the builder's settings into a ready-to-plan TableScan.
///
/// Resolution happens in three steps:
///  1. Snapshot: the snapshot selected via WithSnapshotId() is looked up on the
///     table; otherwise the table's current snapshot is used.
///  2. Schema: if the snapshot records a schema ID, that schema is looked up in
///     the table's schemas; otherwise the table's schema is used.
///  3. Projection: every configured column name is mapped to its field ID in
///     the resolved schema.
///
/// The builder is left unchanged, so Build() may be called more than once and
/// yields an equivalent scan each time.
///
/// \return The configured TableScan, or an error:
///   - whatever error the table's snapshot lookup reports,
///   - InvalidArgument if no snapshot is available or a column name is unknown,
///   - InvalidData if the snapshot's schema cannot be resolved.
Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
  std::shared_ptr<Snapshot> snapshot;
  if (snapshot_id_) {
    ICEBERG_ASSIGN_OR_RAISE(snapshot, table_.snapshot(*snapshot_id_));
  } else {
    snapshot = table_.current_snapshot();
  }

  // A table without any snapshot (or a lookup that succeeds with an empty
  // pointer) must be reported as an error instead of being dereferenced below.
  if (!snapshot) {
    if (snapshot_id_) {
      return InvalidArgument("Snapshot {} is not found", *snapshot_id_);
    }
    return InvalidArgument("Table has no current snapshot to scan");
  }

  std::shared_ptr<Schema> schema;
  if (snapshot->schema_id) {
    const auto& schemas = table_.schemas();
    if (auto it = schemas.find(*snapshot->schema_id); it != schemas.end()) {
      schema = it->second;
    } else {
      return InvalidData("Schema {} in snapshot {} is not found", *snapshot->schema_id,
                         snapshot->snapshot_id);
    }
  } else {
    schema = table_.schema();
  }

  // Guard the dereference in the projection loop against an empty schema pointer.
  if (!schema) {
    return InvalidData("No schema is available for snapshot {}", snapshot->snapshot_id);
  }

  std::vector<int32_t> field_ids;
  field_ids.reserve(column_names_.size());
  for (const auto& column_name : column_names_) {
    auto field_opt = schema->GetFieldByName(column_name);
    if (!field_opt) {
      return InvalidArgument("Column {} not found in schema", column_name);
    }
    field_ids.emplace_back(field_opt.value().get().field_id());
  }

  // The filter is copied (a shared_ptr copy), not moved: moving it out would
  // silently drop the filter from any scan built by a later Build() call.
  TableScan::ScanContext context{.snapshot = std::move(snapshot),
                                 .schema = std::move(schema),
                                 .field_ids = std::move(field_ids),
                                 .filter = filter_};
  return std::make_unique<TableScan>(std::move(context), table_.io());
}

// Takes ownership of the scan context and the FileIO handle; both arguments
// are passed by value and moved into the corresponding members.
TableScan::TableScan(ScanContext context, std::shared_ptr<FileIO> file_io)
    : context_(std::move(context)),
      file_io_(std::move(file_io)) {
}

// Expands the scan's snapshot into one FileScanTask per manifest entry.
//
// The snapshot's manifest list is read first; each manifest file it names is
// then opened and every entry in it becomes a task covering the whole data
// file (offset 0, length = file size). The scan's schema, projected field IDs
// and filter are attached to every task unchanged; no entry is skipped here.
// Any error from creating a reader or from reading is returned immediately.
Result<std::vector<std::shared_ptr<FileScanTask>>> TableScan::PlanFiles() const {
  ICEBERG_ASSIGN_OR_RAISE(auto list_reader,
                          CreateManifestListReader(context_.snapshot->manifest_list));
  ICEBERG_ASSIGN_OR_RAISE(auto manifest_list, list_reader->Files());

  std::vector<std::shared_ptr<FileScanTask>> planned;
  for (const auto& manifest : manifest_list) {
    ICEBERG_ASSIGN_OR_RAISE(auto entry_reader,
                            CreateManifestReader(manifest->manifest_path));
    ICEBERG_ASSIGN_OR_RAISE(auto entries, entry_reader->Entries());

    for (const auto& entry : entries) {
      const auto& file = entry->data_file;
      auto task = std::make_shared<FileScanTask>(
          file.file_path, 0, file.file_size_in_bytes, file.record_count, file.content,
          file.file_format, context_.schema, context_.field_ids, context_.filter);
      planned.push_back(std::move(task));
    }
  }
  return planned;
}

// Placeholder: no manifest list reader exists yet, so every call reports
// NotImplemented and `file_path` is not looked at.
Result<std::unique_ptr<ManifestListReader>> TableScan::CreateManifestListReader(
    [[maybe_unused]] const std::string& file_path) const {
  return NotImplemented("manifest list reader");
}

// Placeholder: no manifest reader exists yet, so every call reports
// NotImplemented and `file_path` is not looked at.
Result<std::unique_ptr<ManifestReader>> TableScan::CreateManifestReader(
    [[maybe_unused]] const std::string& file_path) const {
  return NotImplemented("manifest reader");
}

} // namespace iceberg
116 changes: 116 additions & 0 deletions src/iceberg/table_scan.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <string>
#include <vector>

#include "iceberg/manifest_entry.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Builder class for creating TableScan instances.
///
/// Collects an optional snapshot ID, an optional column projection and an
/// optional filter, then turns them into a TableScan via Build(). The setters
/// return the builder itself so that calls can be chained.
///
/// \note The builder keeps only a reference to the table it was created from;
/// the table must remain valid for as long as the builder is used.
class ICEBERG_EXPORT TableScanBuilder {
 public:
  /// \brief Constructs a TableScanBuilder for the given table.
  /// \param table Reference to the table to scan. It is stored by reference,
  /// not copied.
  explicit TableScanBuilder(const Table& table);

  /// \brief Sets the snapshot ID to scan.
  ///
  /// If this is never called, Build() uses the table's current snapshot.
  /// \param snapshot_id The ID of the snapshot.
  /// \return Reference to the builder.
  TableScanBuilder& WithSnapshotId(int64_t snapshot_id);

  /// \brief Selects columns to include in the scan.
  ///
  /// Defaults to none which means select all columns. Each call replaces the
  /// previously configured list.
  /// \param column_names A list of column names.
  /// \return Reference to the builder.
  TableScanBuilder& WithColumnNames(const std::vector<std::string>& column_names);

  /// \brief Applies a filter expression to the scan.
  ///
  /// Each call replaces the previously configured filter.
  /// \param filter Filter expression to use.
  /// \return Reference to the builder.
  TableScanBuilder& WithFilter(const std::shared_ptr<Expression>& filter);

  /// \brief Builds and returns a TableScan instance.
  ///
  /// Resolves the snapshot, the schema and the field IDs of the selected
  /// columns from the table.
  /// \return A Result containing the TableScan or an error.
  Result<std::unique_ptr<TableScan>> Build();

 private:
  const Table& table_;                     ///< Table to scan (not owned).
  std::vector<std::string> column_names_;  ///< Names of the columns to project.
  std::optional<int64_t> snapshot_id_;     ///< Explicit snapshot ID, if any.
  std::shared_ptr<Expression> filter_;     ///< Filter expression, may be null.
};

/// \brief Represents a configured scan operation on a table.
///
/// A TableScan owns a resolved ScanContext plus the FileIO it was given, and
/// PlanFiles() turns that context into a list of FileScanTask objects.
/// Instances are normally obtained from TableScanBuilder::Build().
class ICEBERG_EXPORT TableScan {
 public:
  /// \brief Scan context holding snapshot and scan-specific metadata.
  struct ScanContext {
    std::shared_ptr<Snapshot> snapshot;  ///< Snapshot to scan.
    std::shared_ptr<Schema> schema;      ///< Table schema.
    std::vector<int32_t> field_ids;      ///< Field IDs of selected columns.
    std::shared_ptr<Expression> filter;  ///< Filter expression to apply.
  };

  /// \brief Constructs a TableScan with the given context and file I/O.
  /// \param context Scan context including snapshot, schema, and filter.
  /// \param file_io File I/O instance for reading manifests and data files.
  TableScan(ScanContext context, std::shared_ptr<FileIO> file_io);

  /// \brief Plans the scan tasks by resolving manifests and data files.
  ///
  /// Reads the manifest list of the context's snapshot, then every manifest it
  /// references, and emits one task per manifest entry.
  ///
  /// Returns a list of file scan tasks if successful.
  /// \return A Result containing scan tasks or an error.
  Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const;

 private:
  /// \brief Creates a reader for the manifest list.
  /// \param file_path Path to the manifest list file.
  /// \return A Result containing the reader or an error.
  Result<std::unique_ptr<ManifestListReader>> CreateManifestListReader(
      const std::string& file_path) const;

  /// \brief Creates a reader for a manifest file.
  /// \param file_path Path to the manifest file.
  /// \return A Result containing the reader or an error.
  Result<std::unique_ptr<ManifestReader>> CreateManifestReader(
      const std::string& file_path) const;

  ScanContext context_;              ///< Resolved snapshot, schema, projection and filter.
  std::shared_ptr<FileIO> file_io_;  ///< File I/O handle passed to the constructor.
};

/// \brief Represents a task to scan a portion of a data file.
///
/// Plain aggregate: the byte range is given by `start` and `length`, and the
/// schema, projection and filter to apply are carried along with it.
/// TableScan::PlanFiles() constructs tasks positionally, so the order of the
/// fields below is part of the contract with that code.
struct ICEBERG_EXPORT FileScanTask {
  std::string file_path;                 ///< Path to the data file.
  uint64_t start;                        ///< Start byte offset.
  uint64_t length;                       ///< Length in bytes to scan.
  std::optional<uint64_t> record_count;  ///< Optional number of records.
  DataFile::Content file_content;        ///< Type of file content.
  FileFormatType file_format;            ///< Format of the data file.
  std::shared_ptr<Schema> schema;        ///< Table schema.
  std::vector<int32_t> field_ids;        ///< Field IDs to project.
  std::shared_ptr<Expression> filter;    ///< Filter expression to apply.
};

} // namespace iceberg
10 changes: 9 additions & 1 deletion src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class LocationProvider;
class SortField;
class SortOrder;
class Table;
class FileIO;
class Transaction;
class Transform;
class TransformFunction;
Expand All @@ -109,6 +110,12 @@ class NameMapping;
enum class SnapshotRefType;
enum class TransformType;

class Expression;

struct FileScanTask;
class TableScan;
class TableScanBuilder;

/// ----------------------------------------------------------------------------
/// TODO: Forward declarations below are not added yet.
/// ----------------------------------------------------------------------------
Expand All @@ -120,11 +127,12 @@ class MetadataUpdate;
class UpdateRequirement;

class AppendFiles;
class TableScan;

struct DataFile;
struct ManifestEntry;
struct ManifestFile;
class ManifestReader;
struct ManifestList;
class ManifestListReader;

} // namespace iceberg
Loading