Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ set(ICEBERG_SOURCES
partition_spec.cc
row/arrow_array_wrapper.cc
row/manifest_wrapper.cc
row/struct_like.cc
schema.cc
schema_field.cc
schema_internal.cc
Expand Down
30 changes: 21 additions & 9 deletions src/iceberg/expression/term.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

#include <format>

#include "iceberg/exception.h"
#include "iceberg/result.h"
#include "iceberg/row/struct_like.h"
#include "iceberg/schema.h"
#include "iceberg/transform.h"
#include "iceberg/util/checked_cast.h"
Expand Down Expand Up @@ -64,25 +64,37 @@ Result<std::shared_ptr<BoundReference>> NamedReference::Bind(const Schema& schem
return InvalidExpression("Cannot find field '{}' in struct: {}", field_name_,
schema.ToString());
}
return BoundReference::Make(field_opt.value().get());

int32_t field_id = field_opt.value().get().field_id();
ICEBERG_ASSIGN_OR_RAISE(auto accessor, schema.GetAccessorById(field_id));

return BoundReference::Make(field_opt.value().get(), std::move(accessor));
}

std::string NamedReference::ToString() const {
return std::format("ref(name=\"{}\")", field_name_);
}

// BoundReference implementation
Result<std::unique_ptr<BoundReference>> BoundReference::Make(SchemaField field) {
Result<std::unique_ptr<BoundReference>> BoundReference::Make(
SchemaField field, std::unique_ptr<StructLikeAccessor> accessor) {
if (auto status = field.Validate(); !status.has_value()) [[unlikely]] {
return InvalidExpression("Cannot create BoundReference with invalid field: {}",
status.error().message);
}
return std::unique_ptr<BoundReference>(new BoundReference(std::move(field)));
if (!accessor) [[unlikely]] {
return InvalidExpression("Cannot create BoundReference without accessor");
}
return std::unique_ptr<BoundReference>(
new BoundReference(std::move(field), std::move(accessor)));
}

BoundReference::BoundReference(SchemaField field) : field_(std::move(field)) {
BoundReference::BoundReference(SchemaField field,
std::unique_ptr<StructLikeAccessor> accessor)
: field_(std::move(field)), accessor_(std::move(accessor)) {
ICEBERG_DCHECK(field_.Validate().has_value(),
"Cannot create BoundReference with invalid field");
ICEBERG_DCHECK(accessor_ != nullptr, "Cannot create BoundReference without accessor");
}

BoundReference::~BoundReference() = default;
Expand All @@ -92,7 +104,7 @@ std::string BoundReference::ToString() const {
}

Result<Literal> BoundReference::Evaluate(const StructLike& data) const {
return NotImplemented("BoundReference::Evaluate(StructLike) not implemented");
return accessor_->GetLiteral(data);
}

bool BoundReference::Equals(const BoundTerm& other) const {
Expand Down Expand Up @@ -167,14 +179,14 @@ std::string BoundTransform::ToString() const {
}

Result<Literal> BoundTransform::Evaluate(const StructLike& data) const {
throw IcebergError("BoundTransform::Evaluate(StructLike) not implemented");
ICEBERG_ASSIGN_OR_RAISE(auto literal, ref_->Evaluate(data));
return transform_func_->Transform(literal);
}

bool BoundTransform::MayProduceNull() const {
// transforms must produce null for null input values
// transforms may produce null for non-null inputs when not order-preserving
// FIXME: add Transform::is_order_preserving()
return ref_->MayProduceNull(); // || !transform_->is_order_preserving();
return ref_->MayProduceNull() || !transform_->PreservesOrder();
}

std::shared_ptr<Type> BoundTransform::type() const {
Expand Down
6 changes: 4 additions & 2 deletions src/iceberg/expression/term.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ class ICEBERG_EXPORT BoundReference
/// \brief Create a bound reference.
///
/// \param field The schema field
static Result<std::unique_ptr<BoundReference>> Make(SchemaField field);
static Result<std::unique_ptr<BoundReference>> Make(
SchemaField field, std::unique_ptr<StructLikeAccessor> accessor);

~BoundReference() override;

Expand All @@ -186,9 +187,10 @@ class ICEBERG_EXPORT BoundReference
Kind kind() const override { return Kind::kReference; }

private:
explicit BoundReference(SchemaField field);
BoundReference(SchemaField field, std::unique_ptr<StructLikeAccessor> accessor);

SchemaField field_;
std::unique_ptr<StructLikeAccessor> accessor_;
};

/// \brief An unbound transform expression.
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ iceberg_sources = files(
'partition_spec.cc',
'row/arrow_array_wrapper.cc',
'row/manifest_wrapper.cc',
'row/struct_like.cc',
'schema.cc',
'schema_field.cc',
'schema_internal.cc',
Expand Down
129 changes: 129 additions & 0 deletions src/iceberg/row/struct_like.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/row/struct_like.h"

#include <utility>

#include "iceberg/result.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/formatter_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg {

StructLikeAccessor::StructLikeAccessor(std::shared_ptr<Type> type,
std::span<const size_t> position_path)
: type_(std::move(type)) {
if (position_path.size() == 1) {
accessor_ = [pos =
position_path[0]](const StructLike& struct_like) -> Result<Scalar> {
return struct_like.GetField(pos);
};
} else if (position_path.size() == 2) {
accessor_ = [pos0 = position_path[0], pos1 = position_path[1]](
const StructLike& struct_like) -> Result<Scalar> {
ICEBERG_ASSIGN_OR_RAISE(auto first_level_field, struct_like.GetField(pos0));
if (!std::holds_alternative<std::shared_ptr<StructLike>>(first_level_field)) {
return InvalidSchema("Encountered non-struct in the position path [{},{}]", pos0,
pos1);
}
return std::get<std::shared_ptr<StructLike>>(first_level_field)->GetField(pos1);
};
} else if (!position_path.empty()) {
accessor_ = [position_path](const StructLike& struct_like) -> Result<Scalar> {
std::vector<std::shared_ptr<StructLike>> backups;
const StructLike* current_struct_like = &struct_like;
for (size_t i = 0; i < position_path.size() - 1; ++i) {
ICEBERG_ASSIGN_OR_RAISE(auto field,
current_struct_like->GetField(position_path[i]));
if (!std::holds_alternative<std::shared_ptr<StructLike>>(field)) {
return InvalidSchema("Encountered non-struct in the position path [{}]",
position_path);
}
backups.push_back(std::get<std::shared_ptr<StructLike>>(field));
current_struct_like = backups.back().get();
}
return current_struct_like->GetField(position_path.back());
};
} else {
accessor_ = [](const StructLike&) -> Result<Scalar> {
return Invalid("Cannot read StructLike with empty position path");
};
}
}

Result<Literal> StructLikeAccessor::GetLiteral(const StructLike& struct_like) const {
if (!type_->is_primitive()) {
return NotSupported("Cannot get literal value for non-primitive type {}",
type_->ToString());
}

ICEBERG_ASSIGN_OR_RAISE(auto scalar, Get(struct_like));

if (std::holds_alternative<std::monostate>(scalar)) {
return Literal::Null(internal::checked_pointer_cast<PrimitiveType>(type_));
}

switch (type_->type_id()) {
case TypeId::kBoolean:
return Literal::Boolean(std::get<bool>(scalar));
case TypeId::kInt:
return Literal::Int(std::get<int32_t>(scalar));
case TypeId::kLong:
return Literal::Long(std::get<int64_t>(scalar));
case TypeId::kFloat:
return Literal::Float(std::get<float>(scalar));
case TypeId::kDouble:
return Literal::Double(std::get<double>(scalar));
case TypeId::kString:
return Literal::String(std::string(std::get<std::string_view>(scalar)));
case TypeId::kBinary: {
auto binary_data = std::get<std::string_view>(scalar);
return Literal::Binary(
std::vector<uint8_t>(binary_data.cbegin(), binary_data.cend()));
}
case TypeId::kDecimal: {
const auto& decimal_type = internal::checked_cast<const DecimalType&>(*type_);
return Literal::Decimal(std::get<Decimal>(scalar).value(), decimal_type.precision(),
decimal_type.scale());
}
case TypeId::kDate:
return Literal::Date(std::get<int32_t>(scalar));
case TypeId::kTime:
return Literal::Time(std::get<int64_t>(scalar));
case TypeId::kTimestamp:
return Literal::Timestamp(std::get<int64_t>(scalar));
case TypeId::kTimestampTz:
return Literal::TimestampTz(std::get<int64_t>(scalar));
case TypeId::kFixed: {
const auto& fixed_data = std::get<std::string_view>(scalar);
return Literal::Fixed(std::vector<uint8_t>(fixed_data.cbegin(), fixed_data.cend()));
}
case TypeId::kUuid:
// TODO(gangwu): Implement UUID type
default:
return NotSupported("Cannot convert scalar to literal of type {}",
type_->ToString());
}

std::unreachable();
}

} // namespace iceberg
29 changes: 28 additions & 1 deletion src/iceberg/row/struct_like.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
/// ManifestEntry. Note that they do not carry type information and should be
/// used in conjunction with the schema to get the type information.

#include <functional>
#include <memory>
#include <span>
#include <string_view>
#include <variant>
#include <vector>

#include "iceberg/expression/literal.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/decimal.h"
Expand Down Expand Up @@ -96,4 +98,29 @@ class ICEBERG_EXPORT MapLike {
virtual size_t size() const = 0;
};

/// \brief An accessor for a struct-like object.
class ICEBERG_EXPORT StructLikeAccessor {
public:
explicit StructLikeAccessor(std::shared_ptr<Type> type,
std::span<const size_t> position_path);

/// \brief Get the scalar value at the given position.
Result<Scalar> Get(const StructLike& struct_like) const {
return accessor_(struct_like);
}

/// \brief Get the literal value at the given position.
///
/// \return The literal value at the given position, or an error if it is
/// not a primitive type.
Result<Literal> GetLiteral(const StructLike& struct_like) const;

/// \brief Get the type of the value that this accessor is bound to.
const Type& type() const { return *type_; }

private:
std::shared_ptr<Type> type_;
std::function<Result<Scalar>(const StructLike&)> accessor_;
};

} // namespace iceberg
66 changes: 66 additions & 0 deletions src/iceberg/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
#include <format>
#include <functional>

#include "iceberg/result.h"
#include "iceberg/row/struct_like.h"
#include "iceberg/schema_internal.h"
#include "iceberg/type.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
#include "iceberg/util/formatter_internal.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/visit_type.h"

Expand Down Expand Up @@ -69,6 +72,48 @@ class NameToIdVisitor {
std::function<std::string(std::string_view)> quoting_func_;
};

class PositionPathVisitor {
public:
Status Visit(const PrimitiveType& type) {
if (current_field_id_ == kUnassignedFieldId) {
return InvalidSchema("Current field id is not assigned, type: {}", type.ToString());
}

if (auto ret = position_path_.try_emplace(current_field_id_, current_path_);
!ret.second) {
return InvalidSchema("Duplicate field id found: {}, prev path: {}, curr path: {}",
current_field_id_, ret.first->second, current_path_);
}

return {};
}

Status Visit(const StructType& type) {
for (size_t i = 0; i < type.fields().size(); ++i) {
const auto& field = type.fields()[i];
current_field_id_ = field.field_id();
current_path_.push_back(i);
ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*field.type(), this));
current_path_.pop_back();
}
return {};
}

// Non-struct types are not supported yet, but it is not an error.
Status Visit(const ListType& type) { return {}; }
Status Visit(const MapType& type) { return {}; }

std::unordered_map<int32_t, std::vector<size_t>> Finish() {
return std::move(position_path_);
}

private:
constexpr static int32_t kUnassignedFieldId = -1;
int32_t current_field_id_ = kUnassignedFieldId;
std::vector<size_t> current_path_;
std::unordered_map<int32_t, std::vector<size_t>> position_path_;
};

Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id)
: StructType(std::move(fields)), schema_id_(schema_id) {}

Expand Down Expand Up @@ -144,6 +189,27 @@ Result<std::optional<std::reference_wrapper<const SchemaField>>> Schema::FindFie
return it->second;
}

Result<std::unordered_map<int32_t, std::vector<size_t>>> Schema::InitIdToPositionPath(
const Schema& self) {
PositionPathVisitor visitor;
ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(self, &visitor));
return visitor.Finish();
}

Result<std::unique_ptr<StructLikeAccessor>> Schema::GetAccessorById(
int32_t field_id) const {
ICEBERG_ASSIGN_OR_RAISE(auto id_to_position_path, id_to_position_path_.Get(*this));
if (auto it = id_to_position_path.get().find(field_id);
it != id_to_position_path.get().cend()) {
ICEBERG_ASSIGN_OR_RAISE(auto field, FindFieldById(field_id));
if (!field.has_value()) {
return NotFound("Cannot get accessor for field id: {}", field_id);
}
return std::make_unique<StructLikeAccessor>(field.value().get().type(), it->second);
}
return NotFound("Cannot get accessor for field id: {}", field_id);
}

IdToFieldVisitor::IdToFieldVisitor(
std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>& id_to_field)
: id_to_field_(id_to_field) {}
Expand Down
Loading
Loading