Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions src/iceberg/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <format>
#include <functional>

#include "iceberg/schema_internal.h"
#include "iceberg/type.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
#include "iceberg/util/macros.h"
Expand Down Expand Up @@ -260,4 +261,148 @@ void NameToIdVisitor::Finish() {
}
}

/// \brief Type visitor that prunes a type tree down to a set of selected field IDs.
///
/// Every `Visit` overload returns the pruned type for its input, or a null pointer
/// when nothing beneath the input was selected. When `select_full_types` is true, a
/// selected field keeps its entire type, sub-fields included; otherwise a selected
/// non-primitive field is still descended into and survives only through the
/// sub-fields that are themselves selected.
///
/// \note Subtrees that come out unchanged are returned as the original shared pointer
/// instead of being rebuilt. An error is returned when the projection cannot be made.
/// \note Only a reference to `selected_ids` is stored, so the set must outlive the
/// visitor.
class PruneColumnVisitor {
 public:
  PruneColumnVisitor(const std::unordered_set<int32_t>& selected_ids,
                     bool select_full_types)
      : selected_ids_(selected_ids), select_full_types_(select_full_types) {}

  /// \brief Dispatch on the type id; anything that is not a struct, list or map
  /// yields a null pointer.
  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<Type>& type) const {
    const auto id = type->type_id();
    if (id == TypeId::kStruct) {
      return Visit(internal::checked_pointer_cast<StructType>(type));
    }
    if (id == TypeId::kList) {
      return Visit(internal::checked_pointer_cast<ListType>(type));
    }
    if (id == TypeId::kMap) {
      return Visit(internal::checked_pointer_cast<MapType>(type));
    }
    return nullptr;
  }

  /// \brief Prune a single field and return the type it should carry afterwards.
  Result<std::shared_ptr<Type>> Visit(const SchemaField& field) const {
    const bool is_selected = selected_ids_.contains(field.field_id());
    // A selected field is taken verbatim when whole types are requested or when
    // there is nothing to descend into; every other case recurses into the type.
    if (is_selected && (select_full_types_ || field.type()->is_primitive())) {
      return field.type();
    }
    return Visit(field.type());
  }

  /// \brief Clone `field` (id, name, optionality, doc) with `type` as its new type.
  static SchemaField MakeField(const SchemaField& field, std::shared_ptr<Type> type) {
    return SchemaField{field.field_id(), std::string(field.name()), std::move(type),
                       field.optional(), std::string(field.doc())};
  }

  /// \brief Prune a struct, keeping only the children that yield a non-null type.
  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<StructType>& type) const {
    const auto& children = type->fields();
    std::vector<SchemaField> kept;
    bool unchanged = true;
    for (const auto& child : children) {
      ICEBERG_ASSIGN_OR_RAISE(auto pruned, Visit(child));
      if (!pruned) {
        continue;
      }
      if (pruned != child.type()) {
        unchanged = false;
      }
      kept.emplace_back(MakeField(child, std::move(pruned)));
    }

    if (kept.empty()) {
      return nullptr;
    }
    // Every child survived with its original type: hand back the input itself.
    if (unchanged && kept.size() == children.size()) {
      return type;
    }
    return std::make_shared<StructType>(std::move(kept));
  }

  /// \brief Prune a list through its element field.
  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<ListType>& type) const {
    const auto& element = type->fields()[0];
    ICEBERG_ASSIGN_OR_RAISE(auto pruned_element, Visit(element));
    if (!pruned_element) {
      return nullptr;
    }
    if (pruned_element == element.type()) {
      return type;
    }
    return std::make_shared<ListType>(MakeField(element, std::move(pruned_element)));
  }

  /// \brief Prune a map through its key and value fields.
  ///
  /// Returns an InvalidArgument error when the key side is selected but the value
  /// side is not.
  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<MapType>& type) const {
    const auto& key = type->fields()[0];
    const auto& value = type->fields()[1];
    ICEBERG_ASSIGN_OR_RAISE(auto pruned_key, Visit(key));
    ICEBERG_ASSIGN_OR_RAISE(auto pruned_value, Visit(value));

    const bool key_dropped = (pruned_key == nullptr);
    const bool value_dropped = (pruned_value == nullptr);
    if (key_dropped && value_dropped) {
      return nullptr;
    }

    const bool value_unchanged = (pruned_value == value.type());
    const bool key_unchanged_or_dropped = key_dropped || (pruned_key == key.type());
    if (value_unchanged && key_unchanged_or_dropped) {
      return type;
    }

    if (value_dropped) {
      return InvalidArgument("Cannot project Map without value field");
    }

    // A key that was not selected is carried over as-is; the map cannot lose it.
    SchemaField new_key = key_dropped ? key : MakeField(key, std::move(pruned_key));
    return std::make_shared<MapType>(std::move(new_key),
                                     MakeField(value, std::move(pruned_value)));
  }

 private:
  const std::unordered_set<int32_t>& selected_ids_;
  const bool select_full_types_;
};

// Builds a projected schema from field names. A "*" anywhere in `names` selects
// every column. Names that do not resolve to a field are skipped, while lookup
// errors are propagated. Selected nested fields keep all of their sub-fields
// (the visitor runs with select_full_types=true). The result has no schema id.
Result<std::unique_ptr<Schema>> Schema::Select(std::span<const std::string> names,
                                               bool case_sensitive) const {
  const std::string wildcard = "*";
  const bool select_everything = std::ranges::find(names, wildcard) != names.end();
  if (select_everything) {
    auto all_fields = ToStructType(*this);
    return FromStructType(std::move(*all_fields), std::nullopt);
  }

  // Resolve each requested name to its field id.
  std::unordered_set<int32_t> ids;
  for (const auto& name : names) {
    ICEBERG_ASSIGN_OR_RAISE(auto match, FindFieldByName(name, case_sensitive));
    if (!match) {
      continue;
    }
    ids.insert(match->get().field_id());
  }

  const PruneColumnVisitor pruner(ids, /*select_full_types=*/true);
  std::shared_ptr<StructType> root = ToStructType(*this);
  ICEBERG_ASSIGN_OR_RAISE(auto pruned, pruner.Visit(root));

  // Nothing was selected: the projection is an empty schema.
  if (pruned == nullptr) {
    return std::make_unique<Schema>(std::vector<SchemaField>{}, std::nullopt);
  }

  if (pruned->type_id() != TypeId::kStruct) {
    return InvalidSchema("Projected type must be a struct type");
  }

  auto& pruned_struct = internal::checked_cast<StructType&>(*pruned);
  return FromStructType(std::move(pruned_struct), std::nullopt);
}

// Builds a projected schema from field ids. The visitor runs with
// select_full_types=false, so a selected nested field is kept only through the
// sub-fields that are themselves selected. The result has no schema id.
Result<std::unique_ptr<Schema>> Schema::Project(
    const std::unordered_set<int32_t>& field_ids) const {
  const PruneColumnVisitor pruner(field_ids, /*select_full_types=*/false);
  std::shared_ptr<StructType> root = ToStructType(*this);
  ICEBERG_ASSIGN_OR_RAISE(auto projected, pruner.Visit(root));

  // No field survived pruning: return an empty schema.
  if (projected == nullptr) {
    return std::make_unique<Schema>(std::vector<SchemaField>{}, std::nullopt);
  }

  const bool is_struct = projected->type_id() == TypeId::kStruct;
  if (!is_struct) {
    return InvalidSchema("Projected type must be a struct type");
  }

  auto& projected_struct = internal::checked_cast<StructType&>(*projected);
  return FromStructType(std::move(projected_struct), std::nullopt);
}

} // namespace iceberg
34 changes: 27 additions & 7 deletions src/iceberg/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <mutex>
#include <optional>
#include <string>
#include <unordered_set>
#include <vector>

#include "iceberg/iceberg_export.h"
Expand All @@ -53,9 +54,9 @@ class ICEBERG_EXPORT Schema : public StructType {
///
/// A schema is identified by a unique ID for the purposes of schema
/// evolution.
[[nodiscard]] std::optional<int32_t> schema_id() const;
std::optional<int32_t> schema_id() const;

[[nodiscard]] std::string ToString() const override;
std::string ToString() const override;

/// \brief Find the SchemaField by field name.
///
Expand All @@ -66,18 +67,37 @@ class ICEBERG_EXPORT Schema : public StructType {
/// canonical name 'm.value.x'
/// FIXME: Currently only handles ASCII lowercase conversion; extend to support
/// non-ASCII characters (e.g., using std::towlower or ICU)
[[nodiscard]] Result<std::optional<std::reference_wrapper<const SchemaField>>>
FindFieldByName(std::string_view name, bool case_sensitive = true) const;
Result<std::optional<std::reference_wrapper<const SchemaField>>> FindFieldByName(
std::string_view name, bool case_sensitive = true) const;

/// \brief Find the SchemaField by field id.
[[nodiscard]] Result<std::optional<std::reference_wrapper<const SchemaField>>>
FindFieldById(int32_t field_id) const;
Result<std::optional<std::reference_wrapper<const SchemaField>>> FindFieldById(
int32_t field_id) const;

/// \brief Creates a projected schema from selected field names.
///
/// \param names Field names to select; names of nested fields are dot-concatenated.
/// \param case_sensitive Whether name matching is case-sensitive (default: true).
/// \return Projected schema containing only selected fields.
/// \note If the field name of a nested type has been selected, all of its
/// sub-fields will be selected.
Result<std::unique_ptr<Schema>> Select(std::span<const std::string> names,
bool case_sensitive = true) const;

/// \brief Creates a projected schema from selected field IDs.
///
/// \param field_ids Set of field IDs to select
/// \return Projected schema containing only the specified fields.
/// \note Field ID of a nested field may not be projected unless at least
/// one of its sub-fields has been projected.
Result<std::unique_ptr<Schema>> Project(
const std::unordered_set<int32_t>& field_ids) const;

friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); }

private:
/// \brief Compare two schemas for equality.
[[nodiscard]] bool Equals(const Schema& other) const;
bool Equals(const Schema& other) const;

Status InitIdToFieldMap() const;
Status InitNameToIdMap() const;
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/schema_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,9 @@ Result<std::unique_ptr<Schema>> FromArrowSchema(const ArrowSchema& schema,
return FromStructType(std::move(struct_type), schema_id);
}

// Copies the schema's fields into a freshly allocated StructType.
std::unique_ptr<StructType> ToStructType(const Schema& schema) {
  const auto& source = schema.fields();
  return std::make_unique<StructType>(
      std::vector<SchemaField>(source.begin(), source.end()));
}

} // namespace iceberg
2 changes: 2 additions & 0 deletions src/iceberg/schema_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ Result<std::unique_ptr<Schema>> FromArrowSchema(const ArrowSchema& schema,
std::unique_ptr<Schema> FromStructType(StructType&& struct_type,
std::optional<int32_t> schema_id);

/// \brief Create a new StructType holding copies of the fields of `schema`.
///
/// \param schema The schema whose fields are copied.
/// \return A newly allocated StructType built from those fields.
std::unique_ptr<StructType> ToStructType(const Schema& schema);

} // namespace iceberg
Loading
Loading