Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ set(ICEBERG_SOURCES
schema.cc
schema_field.cc
schema_internal.cc
schema_util.cc
snapshot.cc
sort_field.cc
sort_order.cc
Expand Down
213 changes: 213 additions & 0 deletions src/iceberg/schema_util.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/schema_util.h"

#include <format>
#include <map>
#include <string_view>
#include <unordered_map>

#include "iceberg/metadata_columns.h"
#include "iceberg/schema.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/formatter_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_type) {
if (expected_type.is_nested()) {
// Nested type requires identical type ids.
if (source_type.type_id() != expected_type.type_id()) {
return NotSupported("Cannot read {} from {}", expected_type, source_type);
}
return {};
}

// Short cut for same primitive type.
if (expected_type == source_type) {
return {};
}

switch (expected_type.type_id()) {
case TypeId::kLong: {
if (source_type.type_id() == TypeId::kInt) {
return {};
}
} break;
case TypeId::kDouble: {
if (source_type.type_id() == TypeId::kFloat) {
return {};
}
} break;
case TypeId::kDecimal: {
if (source_type.type_id() == TypeId::kDecimal) {
const auto& expected_decimal =
internal::checked_cast<const DecimalType&>(expected_type);
const auto& source_decimal =
internal::checked_cast<const DecimalType&>(source_type);
if (expected_decimal.precision() >= source_decimal.precision() &&
expected_decimal.scale() == source_decimal.scale()) {
return {};
}
}
} break;
default:
break;
}
return NotSupported("Cannot read {} from {}", expected_type, source_type);
}

// Fix `from` field of `FieldProjection` to use pruned field index.
void PruneFieldProjection(FieldProjection& field_projection) {
std::map<size_t, size_t> local_index_to_pruned_index;
for (const auto& child_projection : field_projection.children) {
if (child_projection.kind == FieldProjection::Kind::kProjected) {
local_index_to_pruned_index.emplace(std::get<1>(child_projection.from), 0);
}
}
for (size_t pruned_index = 0; auto& [_, value] : local_index_to_pruned_index) {
value = pruned_index++;
}
for (auto& child_projection : field_projection.children) {
if (child_projection.kind == FieldProjection::Kind::kProjected) {
child_projection.from =
local_index_to_pruned_index.at(std::get<1>(child_projection.from));
}
}
}

Result<FieldProjection> ProjectNested(const Type& expected_type, const Type& source_type,
bool prune_source) {
if (!expected_type.is_nested()) {
return InvalidSchema("Expected a nested type, but got {}", expected_type);
}
if (expected_type.type_id() != source_type.type_id()) {
return InvalidSchema("Expected {}, but got {}", expected_type, source_type);
}

const auto& expected_fields =
internal::checked_cast<const NestedType&>(expected_type).fields();
const auto& source_fields =
internal::checked_cast<const NestedType&>(source_type).fields();

// Build a map from field id to source field info including its local offset in
// the current nesting level.
struct SourceFieldInfo {
size_t local_index;
const SchemaField* field;
};
std::unordered_map<int32_t, SourceFieldInfo> source_field_map;
source_field_map.reserve(source_fields.size());
for (size_t i = 0; i < source_fields.size(); ++i) {
const auto& field = source_fields[i];
if (const auto [iter, inserted] = source_field_map.emplace(
std::piecewise_construct, std::forward_as_tuple(field.field_id()),
std::forward_as_tuple(i, &field));
!inserted) [[unlikely]] {
return InvalidSchema("Duplicate field id found, prev: {}, curr: {}",
*iter->second.field, field);
}
}

FieldProjection result;
result.children.reserve(expected_fields.size());

for (const auto& expected_field : expected_fields) {
int32_t field_id = expected_field.field_id();
FieldProjection child_projection;

if (auto iter = source_field_map.find(field_id); iter != source_field_map.cend()) {
if (expected_field.type()->is_nested()) {
ICEBERG_ASSIGN_OR_RAISE(auto nested_projection,
ProjectNested(*expected_field.type(),
*iter->second.field->type(), prune_source));
child_projection.children.emplace_back(std::move(nested_projection));
} else {
ICEBERG_RETURN_UNEXPECTED(
ValidateSchemaEvolution(*expected_field.type(), *iter->second.field->type()));
}
// If `prune_source` is false, all fields will be read so the local index
// is exactly the position to read data. Otherwise, the local index is computed
// by pruning all non-projected fields
child_projection.from = iter->second.local_index;
child_projection.kind = FieldProjection::Kind::kProjected;
} else if (MetadataColumns::IsMetadataColumn(field_id)) {
child_projection.kind = FieldProjection::Kind::kMetadata;
} else if (expected_field.optional()) {
child_projection.kind = FieldProjection::Kind::kNull;
} else {
// TODO(gangwu): support default value for v3 and constant value
return InvalidSchema("Missing required field: {}", expected_field.ToString());
}
result.children.emplace_back(std::move(child_projection));
}

if (prune_source) {
PruneFieldProjection(result);
}

return result;
}

} // namespace

Result<SchemaProjection> Project(const Schema& expected_schema,
const Schema& source_schema, bool prune_source) {
ICEBERG_ASSIGN_OR_RAISE(auto field_projection,
ProjectNested(expected_schema, source_schema, prune_source));
return SchemaProjection{std::move(field_projection.children)};
}

std::string_view ToString(FieldProjection::Kind kind) {
switch (kind) {
case FieldProjection::Kind::kProjected:
return "projected";
case FieldProjection::Kind::kMetadata:
return "metadata";
case FieldProjection::Kind::kConstant:
return "constant";
case FieldProjection::Kind::kDefault:
return "default";
case FieldProjection::Kind::kNull:
return "null";
}
}

std::string ToString(const FieldProjection& projection) {
std::string repr = std::format("FieldProjection(kind={}", projection.kind);
if (projection.kind == FieldProjection::Kind::kProjected) {
std::format_to(std::back_inserter(repr), ", from={}", std::get<1>(projection.from));
}
if (!projection.children.empty()) {
std::format_to(std::back_inserter(repr), ", children={}",
FormatRange(projection.children, ", ", "[", "]"));
}
std::format_to(std::back_inserter(repr), ")");
return repr;
}

std::string ToString(const SchemaProjection& projection) {
return std::format("{}", FormatRange(projection.fields, "\n", "", ""));
}

} // namespace iceberg
98 changes: 98 additions & 0 deletions src/iceberg/schema_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <any>
#include <memory>
#include <string>
#include <variant>
#include <vector>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief A field schema partner to carry projection information.
struct ICEBERG_EXPORT FieldProjection {
/// \brief How the field is projected.
enum class Kind {
/// \brief The field is projected from the source with possible conversion for
/// supported
/// schema evolution.
kProjected,
/// \brief Metadata column whose value is generated on demand.
kMetadata,
/// \brief The field is a constant value (e.g. partition field value)
kConstant,
/// \brief The field is missing in the source and should be filled with default value.
kDefault,
/// \brief An optional field that is not present in the source.
kNull,
};

/// \brief The field index in the source schema on the same nesting level when
/// `kind` is `kProjected`.
using SourceFieldIndex = size_t;
/// \brief A literal value used when `kind` is `kConstant` or `kDefault`.
/// TODO(gangwu): replace it with a specifically defined literal type
using Literal = std::any;
/// \brief A variant to indicate how to set the value of the field.
using From = std::variant<std::monostate, SourceFieldIndex, Literal>;

/// \brief Format-specific attributes for the field.
/// For example, for Parquet it might store column id and level info of the projected
/// leaf field.
struct ExtraAttributes {
virtual ~ExtraAttributes() = default;
};

/// \brief The kind of projection of the field it partners with.
Kind kind;
/// \brief The source to set the value of the field.
From from;
/// \brief The children of the field if it is a nested field.
std::vector<FieldProjection> children;
/// \brief Format-specific attributes for the field.
std::shared_ptr<ExtraAttributes> extra;
};

/// \brief A schema partner to carry projection information.
struct ICEBERG_EXPORT SchemaProjection {
std::vector<FieldProjection> fields;
};

/// \brief Project the expected schema on top of the source schema.
///
/// \param expected_schema The expected schema.
/// \param source_schema The source schema.
/// \param prune_source Whether to prune the source schema. If true, the source
/// schema will be pruned to match the expected schema.
/// \return The projection result.
ICEBERG_EXPORT Result<SchemaProjection> Project(const Schema& expected_schema,
const Schema& source_schema,
bool prune_source);

ICEBERG_EXPORT std::string_view ToString(FieldProjection::Kind kind);
ICEBERG_EXPORT std::string ToString(const FieldProjection& projection);
ICEBERG_EXPORT std::string ToString(const SchemaProjection& projection);

} // namespace iceberg
3 changes: 2 additions & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ target_sources(schema_test
partition_spec_test.cc
sort_field_test.cc
sort_order_test.cc
snapshot_test.cc)
snapshot_test.cc
schema_util_test.cc)
target_link_libraries(schema_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
add_test(NAME schema_test COMMAND schema_test)

Expand Down
Loading
Loading