From b0fd1ea8851ce7d6f68a67b92acc9e51cd1d740e Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 20 May 2025 14:53:06 +0800 Subject: [PATCH 1/3] feat: add schema projection support --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/schema_util.cc | 213 ++++++++++++++++++++++++ src/iceberg/schema_util.h | 98 +++++++++++ test/CMakeLists.txt | 3 +- test/schema_util_test.cc | 329 +++++++++++++++++++++++++++++++++++++ 5 files changed, 643 insertions(+), 1 deletion(-) create mode 100644 src/iceberg/schema_util.cc create mode 100644 src/iceberg/schema_util.h create mode 100644 test/schema_util_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0dfb57790..d7f3fc8c0 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -30,6 +30,7 @@ set(ICEBERG_SOURCES schema.cc schema_field.cc schema_internal.cc + schema_util.cc snapshot.cc sort_field.cc sort_order.cc diff --git a/src/iceberg/schema_util.cc b/src/iceberg/schema_util.cc new file mode 100644 index 000000000..0ac9ef315 --- /dev/null +++ b/src/iceberg/schema_util.cc @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/schema_util.h" + +#include +#include +#include +#include + +#include "iceberg/metadata_columns.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/formatter_internal.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace { +// Checks that `source_type` can be read as `expected_type` under type promotion rules. +Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_type) { + if (expected_type.is_nested()) { + // Nested types are compatible only when their type ids are identical. + if (source_type.type_id() != expected_type.type_id()) { + return NotSupported("Cannot read {} from {}", expected_type, source_type); + } + return {}; + } + + // Shortcut: identical primitive types need no promotion. + if (expected_type == source_type) { + return {}; + } + + switch (expected_type.type_id()) { + case TypeId::kLong: { // int -> long promotion + if (source_type.type_id() == TypeId::kInt) { + return {}; + } + } break; + case TypeId::kDouble: { // float -> double promotion + if (source_type.type_id() == TypeId::kFloat) { + return {}; + } + } break; + case TypeId::kDecimal: { // precision may widen, scale must match + if (source_type.type_id() == TypeId::kDecimal) { + const auto& expected_decimal = + internal::checked_cast(expected_type); + const auto& source_decimal = + internal::checked_cast(source_type); + if (expected_decimal.precision() >= source_decimal.precision() && + expected_decimal.scale() == source_decimal.scale()) { + return {}; + } + } + } break; + default: + break; + } + return NotSupported("Cannot read {} from {}", expected_type, source_type); +} + +// Fix `from` field of `FieldProjection` to use pruned field index. 
+void PruneFieldProjection(FieldProjection& field_projection) { + std::map local_index_to_pruned_index; + for (const auto& child_projection : field_projection.children) { + if (child_projection.kind == FieldProjection::Kind::kProjected) { + local_index_to_pruned_index.emplace(std::get<1>(child_projection.from), 0); + } + } + for (size_t pruned_index = 0; auto& [_, value] : local_index_to_pruned_index) { + value = pruned_index++; + } + for (auto& child_projection : field_projection.children) { + if (child_projection.kind == FieldProjection::Kind::kProjected) { + child_projection.from = + local_index_to_pruned_index.at(std::get<1>(child_projection.from)); + } + } +} + +Result ProjectNested(const Type& expected_type, const Type& source_type, + bool prune_source) { + if (!expected_type.is_nested()) { + return InvalidSchema("Expected a nested type, but got {}", expected_type); + } + if (expected_type.type_id() != source_type.type_id()) { + return InvalidSchema("Expected {}, but got {}", expected_type, source_type); + } + + const auto& expected_fields = + internal::checked_cast(expected_type).fields(); + const auto& source_fields = + internal::checked_cast(source_type).fields(); + + // Build a map from field id to source field info including its local offset in + // the current nesting level. 
+ struct SourceFieldInfo { + size_t local_index; + const SchemaField* field; + }; + std::unordered_map source_field_map; + source_field_map.reserve(source_fields.size()); + for (size_t i = 0; i < source_fields.size(); ++i) { + const auto& field = source_fields[i]; + if (const auto [iter, inserted] = source_field_map.emplace( + std::piecewise_construct, std::forward_as_tuple(field.field_id()), + std::forward_as_tuple(i, &field)); + !inserted) [[unlikely]] { + return InvalidSchema("Duplicate field id found, prev: {}, curr: {}", + *iter->second.field, field); + } + } + + FieldProjection result; + result.children.reserve(expected_fields.size()); + + for (const auto& expected_field : expected_fields) { + int32_t field_id = expected_field.field_id(); + FieldProjection child_projection; + + if (auto iter = source_field_map.find(field_id); iter != source_field_map.cend()) { + if (expected_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(auto nested_projection, + ProjectNested(*expected_field.type(), + *iter->second.field->type(), prune_source)); + child_projection.children.emplace_back(std::move(nested_projection)); + } else { + ICEBERG_RETURN_UNEXPECTED( + ValidateSchemaEvolution(*expected_field.type(), *iter->second.field->type())); + } + // If `prune_source` is false, all fields will be read so the local index + // is exactly the position to read data. 
Otherwise, the local index is computed + // by pruning all non-projected fields + child_projection.from = iter->second.local_index; + child_projection.kind = FieldProjection::Kind::kProjected; + } else if (MetadataColumns::IsMetadataColumn(field_id)) { + child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (expected_field.optional()) { + child_projection.kind = FieldProjection::Kind::kNull; + } else { + // TODO(gangwu): support default value for v3 and constant value + return InvalidSchema("Missing required field: {}", expected_field.ToString()); + } + result.children.emplace_back(std::move(child_projection)); + } + + if (prune_source) { + PruneFieldProjection(result); + } + + return result; +} + +} // namespace + +Result Project(const Schema& expected_schema, + const Schema& source_schema, bool prune_source) { + ICEBERG_ASSIGN_OR_RAISE(auto field_projection, + ProjectNested(expected_schema, source_schema, prune_source)); + return SchemaProjection{std::move(field_projection.children)}; +} + +std::string_view ToString(FieldProjection::Kind kind) { + switch (kind) { + case FieldProjection::Kind::kProjected: + return "projected"; + case FieldProjection::Kind::kMetadata: + return "metadata"; + case FieldProjection::Kind::kConstant: + return "constant"; + case FieldProjection::Kind::kDefault: + return "default"; + case FieldProjection::Kind::kNull: + return "null"; + } +} + +std::string ToString(const FieldProjection& projection) { + std::string repr = std::format("FieldProjection(kind={}", projection.kind); + if (projection.kind == FieldProjection::Kind::kProjected) { + std::format_to(std::back_inserter(repr), ", from={}", std::get<1>(projection.from)); + } + if (!projection.children.empty()) { + std::format_to(std::back_inserter(repr), ", children={}", + FormatRange(projection.children, ", ", "[", "]")); + } + std::format_to(std::back_inserter(repr), ")"); + return repr; +} + +std::string ToString(const SchemaProjection& projection) { + return 
std::format("{}", FormatRange(projection.fields, "\n", "", "")); +} + +} // namespace iceberg diff --git a/src/iceberg/schema_util.h b/src/iceberg/schema_util.h new file mode 100644 index 000000000..c12837991 --- /dev/null +++ b/src/iceberg/schema_util.h @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief A field schema partner to carry projection information. +struct ICEBERG_EXPORT FieldProjection { + /// \brief How the field is projected. + enum class Kind { + /// \brief The field is projected from the source with possible conversion for + /// supported + /// schema evolution. + kProjected, + /// \brief Metadata column whose value is generated on demand. + kMetadata, + /// \brief The field is a constant value (e.g. partition field value) + kConstant, + /// \brief The field is missing in the source and should be filled with default value. + kDefault, + /// \brief An optional field that is not present in the source. 
+ kNull, + }; + + /// \brief The field index in the source schema on the same nesting level when + /// `kind` is `kProjected`. + using SourceFieldIndex = size_t; + /// \brief A literal value used when `kind` is `kConstant` or `kDefault`. + /// TODO(gangwu): replace it with a specifically defined literal type + using Literal = std::any; + /// \brief A variant to indicate how to set the value of the field. + using From = std::variant; + + /// \brief Format-specific attributes for the field. + /// For example, for Parquet it might store column id and level info of the projected + /// leaf field. + struct ExtraAttributes { + virtual ~ExtraAttributes() = default; + }; + + /// \brief The kind of projection of the field it partners with. + Kind kind; + /// \brief The source to set the value of the field. + From from; + /// \brief The children of the field if it is a nested field. + std::vector children; + /// \brief Format-specific attributes for the field. + std::shared_ptr extra; +}; + +/// \brief A schema partner to carry projection information. +struct ICEBERG_EXPORT SchemaProjection { + std::vector fields; +}; + +/// \brief Project the expected schema on top of the source schema. +/// +/// \param expected_schema The expected schema. +/// \param source_schema The source schema. +/// \param prune_source Whether to prune the source schema. If true, the source +/// schema will be pruned to match the expected schema. +/// \return The projection result. 
+ICEBERG_EXPORT Result Project(const Schema& expected_schema, + const Schema& source_schema, + bool prune_source); + +ICEBERG_EXPORT std::string_view ToString(FieldProjection::Kind kind); +ICEBERG_EXPORT std::string ToString(const FieldProjection& projection); +ICEBERG_EXPORT std::string ToString(const SchemaProjection& projection); + +} // namespace iceberg diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0c3776bea..c252a1748 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -39,7 +39,8 @@ target_sources(schema_test partition_spec_test.cc sort_field_test.cc sort_order_test.cc - snapshot_test.cc) + snapshot_test.cc + schema_util_test.cc) target_link_libraries(schema_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock) add_test(NAME schema_test COMMAND schema_test) diff --git a/test/schema_util_test.cc b/test/schema_util_test.cc new file mode 100644 index 000000000..c64a92d8c --- /dev/null +++ b/test/schema_util_test.cc @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/schema_util.h" + +#include + +#include +#include + +#include "iceberg/metadata_columns.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/type.h" +#include "matchers.h" + +namespace iceberg { + +namespace { + +// Helper function to check if a field projection is of the expected kind +void AssertProjectedField(const FieldProjection& projection, size_t expected_index) { + ASSERT_EQ(projection.kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.from), expected_index); +} + +// Helper function to create a standard source schema for testing +Schema CreateSourceSchema() { + return Schema({ + SchemaField(/*field_id=*/1, "id", std::make_shared(), + /*optional=*/false), + SchemaField(/*field_id=*/2, "name", std::make_shared(), + /*optional=*/true), + SchemaField(/*field_id=*/3, "age", std::make_shared(), + /*optional=*/true), + SchemaField(/*field_id=*/4, "data", std::make_shared(), + /*optional=*/false), + }); +} + +} // namespace + +TEST(SchemaUtilTest, ProjectIdenticalSchemas) { + Schema schema = CreateSourceSchema(); + + auto projection_result = Project(schema, schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 4); + + for (size_t i = 0; i < projection.fields.size(); ++i) { + AssertProjectedField(projection.fields[i], i); + } +} + +TEST(SchemaUtilTest, ProjectSubsetSchema) { + Schema source_schema = CreateSourceSchema(); + Schema expected_schema({ + SchemaField(/*field_id=*/1, "id", std::make_shared(), + /*optional=*/false), + SchemaField(/*field_id=*/3, "age", std::make_shared(), + /*optional=*/true), + }); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + + 
AssertProjectedField(projection.fields[0], 0); + AssertProjectedField(projection.fields[1], 2); +} + +TEST(SchemaUtilTest, ProjectWithPruning) { + Schema source_schema = CreateSourceSchema(); + Schema expected_schema({ + SchemaField(/*field_id=*/1, "id", std::make_shared(), + /*optional=*/false), + SchemaField(/*field_id=*/3, "age", std::make_shared(), + /*optional=*/true), + }); + + auto projection_result = Project(expected_schema, source_schema, /*prune_source=*/true); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + + AssertProjectedField(projection.fields[0], 0); + AssertProjectedField(projection.fields[1], 1); +} + +TEST(SchemaUtilTest, ProjectMissingOptionalField) { + Schema source_schema = CreateSourceSchema(); + Schema expected_schema({ + SchemaField(/*field_id=*/1, "id", std::make_shared(), + /*optional=*/false), + SchemaField(/*field_id=*/2, "name", std::make_shared(), + /*optional=*/true), + SchemaField(/*field_id=*/10, "extra", std::make_shared(), + /*optional=*/true), + }); + + auto projection_result = Project(expected_schema, source_schema, false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + + AssertProjectedField(projection.fields[0], 0); + AssertProjectedField(projection.fields[1], 1); + ASSERT_EQ(projection.fields[2].kind, FieldProjection::Kind::kNull); +} + +TEST(SchemaUtilTest, ProjectMissingRequiredField) { + Schema source_schema = CreateSourceSchema(); + Schema expected_schema({ + SchemaField(/*field_id=*/1, "id", std::make_shared(), + /*optional=*/false), + SchemaField(/*field_id=*/2, "name", std::make_shared(), + /*optional=*/true), + SchemaField(/*field_id=*/10, "extra", std::make_shared(), + /*optional=*/false), + }); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, 
IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Missing required field")); +} + +TEST(SchemaUtilTest, ProjectMetadataColumn) { + Schema source_schema = CreateSourceSchema(); + Schema expected_schema({ + SchemaField(/*field_id=*/1, "id", std::make_shared(), + /*optional=*/false), + MetadataColumns::kFilePath, + }); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + + AssertProjectedField(projection.fields[0], 0); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kMetadata); +} + +TEST(SchemaUtilTest, ProjectSchemaEvolutionIntToLong) { + Schema source_schema({SchemaField(/*field_id=*/1, "id", std::make_shared(), + /*optional=*/false)}); + Schema expected_schema({SchemaField(/*field_id=*/1, "id", std::make_shared(), + /*optional=*/false)}); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + AssertProjectedField(projection.fields[0], 0); +} + +TEST(SchemaUtilTest, ProjectSchemaEvolutionFloatToDouble) { + Schema source_schema( + {SchemaField(/*field_id=*/2, "value", std::make_shared(), + /*optional=*/true)}); + Schema expected_schema( + {SchemaField(/*field_id=*/2, "value", std::make_shared(), + /*optional=*/true)}); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + AssertProjectedField(projection.fields[0], 0); +} + +TEST(SchemaUtilTest, ProjectSchemaEvolutionDecimalCompatible) { + Schema source_schema( + {SchemaField(/*field_id=*/2, "value", std::make_shared(9, 2), + 
/*optional=*/true)}); + Schema expected_schema( + {SchemaField(/*field_id=*/2, "value", std::make_shared(18, 2), + /*optional=*/true)}); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + AssertProjectedField(projection.fields[0], 0); +} + +TEST(SchemaUtilTest, ProjectSchemaEvolutionDecimalIncompatible) { + Schema source_schema( + {SchemaField(/*field_id=*/2, "value", std::make_shared(9, 2), + /*optional=*/true)}); + Schema expected_schema( + {SchemaField(/*field_id=*/2, "value", std::make_shared(18, 3), + /*optional=*/true)}); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kNotSupported)); + ASSERT_THAT(projection_result, HasErrorMessage("Cannot read")); +} + +TEST(SchemaUtilTest, ProjectSchemaEvolutionIncompatibleTypes) { + Schema source_schema( + {SchemaField(/*field_id=*/1, "value", std::make_shared(), + /*optional=*/true)}); + Schema expected_schema( + {SchemaField(/*field_id=*/1, "value", std::make_shared(), + /*optional=*/true)}); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kNotSupported)); + ASSERT_THAT(projection_result, HasErrorMessage("Cannot read")); +} + +TEST(SchemaUtilTest, ProjectNestedStructures) { + Schema schema( + {SchemaField(/*field_id=*/1, "id", std::make_shared(), + /*optional=*/false), + SchemaField(/*field_id=*/2, "name", std::make_shared(), + /*optional=*/true), + SchemaField( + /*field_id=*/3, "address", + std::make_shared(std::vector{ + SchemaField(/*field_id=*/101, "street", std::make_shared(), + /*optional=*/true), + SchemaField(/*field_id=*/102, "city", std::make_shared(), + /*optional=*/true), + }), + /*optional=*/true)}); + + auto projection_result = 
Project(schema, schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + + AssertProjectedField(projection.fields[0], 0); + AssertProjectedField(projection.fields[1], 1); + AssertProjectedField(projection.fields[2], 2); + + ASSERT_EQ(projection.fields[2].children.size(), 1); + ASSERT_EQ(projection.fields[2].children[0].children.size(), 2); + + const auto& struct_projection = projection.fields[2].children[0]; + AssertProjectedField(struct_projection.children[0], 0); + AssertProjectedField(struct_projection.children[1], 1); +} + +TEST(SchemaUtilTest, ProjectSubsetNestedFields) { + Schema source_schema( + {SchemaField(/*field_id=*/1, "id", std::make_shared(), + /*optional=*/false), + SchemaField(/*field_id=*/2, "name", std::make_shared(), + /*optional=*/true), + SchemaField( + /*field_id=*/3, "address", + std::make_shared(std::vector{ + SchemaField(/*field_id=*/101, "street", std::make_shared(), + /*optional=*/true), + SchemaField(/*field_id=*/102, "city", std::make_shared(), + /*optional=*/true), + SchemaField(/*field_id=*/103, "zip", std::make_shared(), + /*optional=*/true), + }), + /*optional=*/true)}); + + Schema expected_schema( + {SchemaField(/*field_id=*/1, "id", std::make_shared(), + /*optional=*/false), + SchemaField( + /*field_id=*/3, "address", + std::make_shared(std::vector{ + SchemaField(/*field_id=*/102, "city", std::make_shared(), + /*optional=*/true), + SchemaField(/*field_id=*/101, "street", std::make_shared(), + /*optional=*/true), + }), + /*optional=*/true)}); + + auto projection_result = Project(expected_schema, source_schema, /*prune_source=*/true); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + + AssertProjectedField(projection.fields[0], 0); + AssertProjectedField(projection.fields[1], 1); + + ASSERT_EQ(projection.fields[1].children.size(), 1); + 
ASSERT_EQ(projection.fields[1].children[0].children.size(), 2); + + const auto& struct_projection = projection.fields[1].children[0]; + AssertProjectedField(struct_projection.children[0], 1); + AssertProjectedField(struct_projection.children[1], 0); +} + +} // namespace iceberg From 35c0f8794778786c36d0e97255371fa76a4160c0 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 22 May 2025 12:17:45 +0800 Subject: [PATCH 2/3] refine comment and add more tests --- src/iceberg/schema_util.cc | 3 +- src/iceberg/schema_util.h | 10 +- test/schema_util_test.cc | 393 +++++++++++++++++++++++++------------ 3 files changed, 273 insertions(+), 133 deletions(-) diff --git a/src/iceberg/schema_util.cc b/src/iceberg/schema_util.cc index 0ac9ef315..7b82dc626 100644 --- a/src/iceberg/schema_util.cc +++ b/src/iceberg/schema_util.cc @@ -138,10 +138,9 @@ Result ProjectNested(const Type& expected_type, const Type& sou if (auto iter = source_field_map.find(field_id); iter != source_field_map.cend()) { if (expected_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(auto nested_projection, + ICEBERG_ASSIGN_OR_RAISE(child_projection, ProjectNested(*expected_field.type(), *iter->second.field->type(), prune_source)); - child_projection.children.emplace_back(std::move(nested_projection)); } else { ICEBERG_RETURN_UNEXPECTED( ValidateSchemaEvolution(*expected_field.type(), *iter->second.field->type())); diff --git a/src/iceberg/schema_util.h b/src/iceberg/schema_util.h index c12837991..72b6b55d7 100644 --- a/src/iceberg/schema_util.h +++ b/src/iceberg/schema_util.h @@ -36,8 +36,7 @@ struct ICEBERG_EXPORT FieldProjection { /// \brief How the field is projected. enum class Kind { /// \brief The field is projected from the source with possible conversion for - /// supported - /// schema evolution. + /// supported schema evolution. kProjected, /// \brief Metadata column whose value is generated on demand. 
kMetadata, @@ -84,8 +83,11 @@ struct ICEBERG_EXPORT SchemaProjection { /// /// \param expected_schema The expected schema. /// \param source_schema The source schema. -/// \param prune_source Whether to prune the source schema. If true, the source -/// schema will be pruned to match the expected schema. +/// \param prune_source Whether the source schema can be pruned to project the expected +/// schema on it. For example, a Parquet reader implementation is capable of column +/// pruning, so `prune_source` should be set to true in that case so that the `from` +/// field in `FieldProjection` exactly reflects the position (relative to its nesting +/// level) at which the column value is read from the reader. /// \return The projection result. ICEBERG_EXPORT Result Project(const Schema& expected_schema, const Schema& source_schema, diff --git a/test/schema_util_test.cc b/test/schema_util_test.cc index c64a92d8c..3d8e0acb5 100644 --- a/test/schema_util_test.cc +++ b/test/schema_util_test.cc @@ -40,43 +40,89 @@ void AssertProjectedField(const FieldProjection& projection, size_t expected_ind ASSERT_EQ(std::get<1>(projection.from), expected_index); } -// Helper function to create a standard source schema for testing -Schema CreateSourceSchema() { +Schema CreateFlatSchema() { return Schema({ - SchemaField(/*field_id=*/1, "id", std::make_shared(), - /*optional=*/false), - SchemaField(/*field_id=*/2, "name", std::make_shared(), - /*optional=*/true), - SchemaField(/*field_id=*/3, "age", std::make_shared(), - /*optional=*/true), - SchemaField(/*field_id=*/4, "data", std::make_shared(), - /*optional=*/false), + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared()), + SchemaField::MakeRequired(/*field_id=*/4, "data", std::make_shared()), }); } +std::shared_ptr CreateListOfStruct() { + return 
std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/102, "x", std::make_shared()), + SchemaField::MakeRequired(/*field_id=*/103, "y", + std::make_shared()), + }))); +} + +std::shared_ptr CreateMapWithStructValue() { + return std::make_shared( + SchemaField::MakeRequired(/*field_id=*/201, "key", std::make_shared()), + SchemaField::MakeRequired( + /*field_id=*/202, "value", + std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/203, "id", + std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/204, "name", + std::make_shared()), + }))); +} + +std::shared_ptr CreateNestedStruct() { + return std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/301, "outer_id", + std::make_shared()), + SchemaField::MakeRequired( + /*field_id=*/302, "nested", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/303, "inner_id", + std::make_shared()), + SchemaField::MakeRequired(/*field_id=*/304, "inner_name", + std::make_shared()), + })), + }); +} + +std::shared_ptr CreateListOfList() { + return std::make_shared(SchemaField::MakeRequired( + /*field_id=*/401, "element", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/402, "element", std::make_shared())))); +} + +std::shared_ptr CreateMapOfList() { + return std::make_shared( + SchemaField::MakeRequired(/*field_id=*/501, "key", std::make_shared()), + SchemaField::MakeRequired( + /*field_id=*/502, "value", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/503, "element", std::make_shared())))); +} + } // namespace TEST(SchemaUtilTest, ProjectIdenticalSchemas) { - Schema schema = CreateSourceSchema(); + Schema schema = CreateFlatSchema(); auto projection_result = Project(schema, schema, /*prune_source=*/false); ASSERT_THAT(projection_result, IsOk()); const auto& projection = *projection_result; ASSERT_EQ(projection.fields.size(), 4); - for (size_t i = 0; i < 
projection.fields.size(); ++i) { AssertProjectedField(projection.fields[i], i); } } TEST(SchemaUtilTest, ProjectSubsetSchema) { - Schema source_schema = CreateSourceSchema(); + Schema source_schema = CreateFlatSchema(); Schema expected_schema({ - SchemaField(/*field_id=*/1, "id", std::make_shared(), - /*optional=*/false), - SchemaField(/*field_id=*/3, "age", std::make_shared(), - /*optional=*/true), + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared()), }); auto projection_result = @@ -85,18 +131,15 @@ TEST(SchemaUtilTest, ProjectSubsetSchema) { const auto& projection = *projection_result; ASSERT_EQ(projection.fields.size(), 2); - AssertProjectedField(projection.fields[0], 0); AssertProjectedField(projection.fields[1], 2); } TEST(SchemaUtilTest, ProjectWithPruning) { - Schema source_schema = CreateSourceSchema(); + Schema source_schema = CreateFlatSchema(); Schema expected_schema({ - SchemaField(/*field_id=*/1, "id", std::make_shared(), - /*optional=*/false), - SchemaField(/*field_id=*/3, "age", std::make_shared(), - /*optional=*/true), + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared()), }); auto projection_result = Project(expected_schema, source_schema, /*prune_source=*/true); @@ -104,20 +147,16 @@ TEST(SchemaUtilTest, ProjectWithPruning) { const auto& projection = *projection_result; ASSERT_EQ(projection.fields.size(), 2); - AssertProjectedField(projection.fields[0], 0); AssertProjectedField(projection.fields[1], 1); } TEST(SchemaUtilTest, ProjectMissingOptionalField) { - Schema source_schema = CreateSourceSchema(); + Schema source_schema = CreateFlatSchema(); Schema expected_schema({ - SchemaField(/*field_id=*/1, "id", std::make_shared(), - /*optional=*/false), - SchemaField(/*field_id=*/2, "name", std::make_shared(), - /*optional=*/true), - SchemaField(/*field_id=*/10, "extra", 
std::make_shared(), - /*optional=*/true), + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/10, "extra", std::make_shared()), }); auto projection_result = Project(expected_schema, source_schema, false); @@ -132,14 +171,11 @@ TEST(SchemaUtilTest, ProjectMissingOptionalField) { } TEST(SchemaUtilTest, ProjectMissingRequiredField) { - Schema source_schema = CreateSourceSchema(); + Schema source_schema = CreateFlatSchema(); Schema expected_schema({ - SchemaField(/*field_id=*/1, "id", std::make_shared(), - /*optional=*/false), - SchemaField(/*field_id=*/2, "name", std::make_shared(), - /*optional=*/true), - SchemaField(/*field_id=*/10, "extra", std::make_shared(), - /*optional=*/false), + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared()), + SchemaField::MakeRequired(/*field_id=*/10, "extra", std::make_shared()), }); auto projection_result = @@ -149,10 +185,9 @@ TEST(SchemaUtilTest, ProjectMissingRequiredField) { } TEST(SchemaUtilTest, ProjectMetadataColumn) { - Schema source_schema = CreateSourceSchema(); + Schema source_schema = CreateFlatSchema(); Schema expected_schema({ - SchemaField(/*field_id=*/1, "id", std::make_shared(), - /*optional=*/false), + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), MetadataColumns::kFilePath, }); @@ -162,16 +197,15 @@ TEST(SchemaUtilTest, ProjectMetadataColumn) { const auto& projection = *projection_result; ASSERT_EQ(projection.fields.size(), 2); - AssertProjectedField(projection.fields[0], 0); ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kMetadata); } TEST(SchemaUtilTest, ProjectSchemaEvolutionIntToLong) { - Schema source_schema({SchemaField(/*field_id=*/1, "id", std::make_shared(), - /*optional=*/false)}); - Schema expected_schema({SchemaField(/*field_id=*/1, "id", 
std::make_shared(), - /*optional=*/false)}); + Schema source_schema( + {SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared())}); + Schema expected_schema( + {SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared())}); auto projection_result = Project(expected_schema, source_schema, /*prune_source=*/false); @@ -183,12 +217,10 @@ TEST(SchemaUtilTest, ProjectSchemaEvolutionIntToLong) { } TEST(SchemaUtilTest, ProjectSchemaEvolutionFloatToDouble) { - Schema source_schema( - {SchemaField(/*field_id=*/2, "value", std::make_shared(), - /*optional=*/true)}); - Schema expected_schema( - {SchemaField(/*field_id=*/2, "value", std::make_shared(), - /*optional=*/true)}); + Schema source_schema({SchemaField::MakeOptional(/*field_id=*/2, "value", + std::make_shared())}); + Schema expected_schema({SchemaField::MakeOptional(/*field_id=*/2, "value", + std::make_shared())}); auto projection_result = Project(expected_schema, source_schema, /*prune_source=*/false); @@ -200,12 +232,10 @@ TEST(SchemaUtilTest, ProjectSchemaEvolutionFloatToDouble) { } TEST(SchemaUtilTest, ProjectSchemaEvolutionDecimalCompatible) { - Schema source_schema( - {SchemaField(/*field_id=*/2, "value", std::make_shared(9, 2), - /*optional=*/true)}); - Schema expected_schema( - {SchemaField(/*field_id=*/2, "value", std::make_shared(18, 2), - /*optional=*/true)}); + Schema source_schema({SchemaField::MakeOptional(/*field_id=*/2, "value", + std::make_shared(9, 2))}); + Schema expected_schema({SchemaField::MakeOptional( + /*field_id=*/2, "value", std::make_shared(18, 2))}); auto projection_result = Project(expected_schema, source_schema, /*prune_source=*/false); @@ -217,12 +247,10 @@ TEST(SchemaUtilTest, ProjectSchemaEvolutionDecimalCompatible) { } TEST(SchemaUtilTest, ProjectSchemaEvolutionDecimalIncompatible) { - Schema source_schema( - {SchemaField(/*field_id=*/2, "value", std::make_shared(9, 2), - /*optional=*/true)}); - Schema expected_schema( - {SchemaField(/*field_id=*/2, "value", 
std::make_shared(18, 3), - /*optional=*/true)}); + Schema source_schema({SchemaField::MakeOptional(/*field_id=*/2, "value", + std::make_shared(9, 2))}); + Schema expected_schema({SchemaField::MakeOptional( + /*field_id=*/2, "value", std::make_shared(18, 3))}); auto projection_result = Project(expected_schema, source_schema, /*prune_source=*/false); @@ -231,12 +259,10 @@ TEST(SchemaUtilTest, ProjectSchemaEvolutionDecimalIncompatible) { } TEST(SchemaUtilTest, ProjectSchemaEvolutionIncompatibleTypes) { - Schema source_schema( - {SchemaField(/*field_id=*/1, "value", std::make_shared(), - /*optional=*/true)}); + Schema source_schema({SchemaField::MakeOptional(/*field_id=*/1, "value", + std::make_shared())}); Schema expected_schema( - {SchemaField(/*field_id=*/1, "value", std::make_shared(), - /*optional=*/true)}); + {SchemaField::MakeOptional(/*field_id=*/1, "value", std::make_shared())}); auto projection_result = Project(expected_schema, source_schema, /*prune_source=*/false); @@ -245,69 +271,63 @@ TEST(SchemaUtilTest, ProjectSchemaEvolutionIncompatibleTypes) { } TEST(SchemaUtilTest, ProjectNestedStructures) { - Schema schema( - {SchemaField(/*field_id=*/1, "id", std::make_shared(), - /*optional=*/false), - SchemaField(/*field_id=*/2, "name", std::make_shared(), - /*optional=*/true), - SchemaField( - /*field_id=*/3, "address", - std::make_shared(std::vector{ - SchemaField(/*field_id=*/101, "street", std::make_shared(), - /*optional=*/true), - SchemaField(/*field_id=*/102, "city", std::make_shared(), - /*optional=*/true), - }), - /*optional=*/true)}); + Schema schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared()), + SchemaField::MakeOptional( + /*field_id=*/3, "address", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/101, "street", + std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/102, "city", + std::make_shared()), + })), + }); - 
auto projection_result = Project(schema, schema, /*prune_source=*/false); + auto projection_result = Project(schema, schema, /*prune_source=*/true); ASSERT_THAT(projection_result, IsOk()); const auto& projection = *projection_result; ASSERT_EQ(projection.fields.size(), 3); - AssertProjectedField(projection.fields[0], 0); - AssertProjectedField(projection.fields[1], 1); - AssertProjectedField(projection.fields[2], 2); - - ASSERT_EQ(projection.fields[2].children.size(), 1); - ASSERT_EQ(projection.fields[2].children[0].children.size(), 2); + // Verify top-level fields are projected correctly + AssertProjectedField(projection.fields[0], 0); // id + AssertProjectedField(projection.fields[1], 1); // name + AssertProjectedField(projection.fields[2], 2); // address - const auto& struct_projection = projection.fields[2].children[0]; - AssertProjectedField(struct_projection.children[0], 0); - AssertProjectedField(struct_projection.children[1], 1); + // Verify struct field has children correctly mapped + ASSERT_EQ(projection.fields[2].children.size(), 2); + AssertProjectedField(projection.fields[2].children[0], 0); // address.street + AssertProjectedField(projection.fields[2].children[1], 1); // address.city } TEST(SchemaUtilTest, ProjectSubsetNestedFields) { - Schema source_schema( - {SchemaField(/*field_id=*/1, "id", std::make_shared(), - /*optional=*/false), - SchemaField(/*field_id=*/2, "name", std::make_shared(), - /*optional=*/true), - SchemaField( - /*field_id=*/3, "address", - std::make_shared(std::vector{ - SchemaField(/*field_id=*/101, "street", std::make_shared(), - /*optional=*/true), - SchemaField(/*field_id=*/102, "city", std::make_shared(), - /*optional=*/true), - SchemaField(/*field_id=*/103, "zip", std::make_shared(), - /*optional=*/true), - }), - /*optional=*/true)}); + Schema source_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared()), + 
SchemaField::MakeOptional( + /*field_id=*/3, "address", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/101, "street", + std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/102, "city", + std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/103, "zip", + std::make_shared()), + })), + }); - Schema expected_schema( - {SchemaField(/*field_id=*/1, "id", std::make_shared(), - /*optional=*/false), - SchemaField( - /*field_id=*/3, "address", - std::make_shared(std::vector{ - SchemaField(/*field_id=*/102, "city", std::make_shared(), - /*optional=*/true), - SchemaField(/*field_id=*/101, "street", std::make_shared(), - /*optional=*/true), - }), - /*optional=*/true)}); + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional( + /*field_id=*/3, "address", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/102, "city", + std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/101, "street", + std::make_shared()), + })), + }); auto projection_result = Project(expected_schema, source_schema, /*prune_source=*/true); ASSERT_THAT(projection_result, IsOk()); @@ -315,15 +335,134 @@ TEST(SchemaUtilTest, ProjectSubsetNestedFields) { const auto& projection = *projection_result; ASSERT_EQ(projection.fields.size(), 2); + // Verify top-level fields are projected correctly + AssertProjectedField(projection.fields[0], 0); // id + AssertProjectedField(projection.fields[1], 1); // address + + // Verify struct field has children correctly mapped + ASSERT_EQ(projection.fields[1].children.size(), 2); + AssertProjectedField(projection.fields[1].children[0], 1); // address.city + AssertProjectedField(projection.fields[1].children[1], 0); // address.street +} + +TEST(SchemaUtilTest, ProjectListOfStruct) { + Schema source_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/2, "items", 
CreateListOfStruct()), + }); + + // Identity projection + for (const auto& prune_source : {true, false}) { + auto projection_result = Project(source_schema, source_schema, prune_source); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + AssertProjectedField(projection.fields[0], 0); // id + AssertProjectedField(projection.fields[1], 1); // items + + // Verify list field has children correctly mapped + ASSERT_EQ(projection.fields[1].children.size(), 1); + const auto& struct_projection = projection.fields[1].children[0]; + ASSERT_EQ(struct_projection.children.size(), 2); + AssertProjectedField(struct_projection.children[0], 0); // item.element.x + AssertProjectedField(struct_projection.children[1], 1); // item.element.y + } + + // Subset of field selection with list of struct + { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/2, "items", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", + std::make_shared( + std::vector{SchemaField::MakeRequired( + /*field_id=*/103, "y", std::make_shared())})))), + }); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/true); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + AssertProjectedField(projection.fields[0], 0); // items + + // Verify list field has children correctly mapped + ASSERT_EQ(projection.fields[0].children.size(), 1); + const auto& struct_projection = projection.fields[0].children[0]; + ASSERT_EQ(struct_projection.children.size(), 1); + AssertProjectedField(struct_projection.children[0], 0); // item.element.y + } +} + +TEST(SchemaUtilTest, ProjectMapWithStructValue) { + Schema source_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/2, "attributes", CreateMapWithStructValue()), + }); + Schema 
expected_schema({ + SchemaField::MakeOptional(/*field_id=*/2, "attributes", CreateMapWithStructValue()), + }); + + auto projection_result = Project(expected_schema, source_schema, /*prune_source=*/true); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); AssertProjectedField(projection.fields[0], 0); - AssertProjectedField(projection.fields[1], 1); - ASSERT_EQ(projection.fields[1].children.size(), 1); - ASSERT_EQ(projection.fields[1].children[0].children.size(), 2); + ASSERT_EQ(projection.fields[0].children.size(), 2); + AssertProjectedField(projection.fields[0].children[0], 0); // attributes.value.id + AssertProjectedField(projection.fields[0].children[1], 1); // attributes.value.name +} + +TEST(SchemaUtilTest, ProjectComplexMixedTypes) { + Schema source_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeOptional(/*field_id=*/2, "lists", CreateListOfStruct()), + SchemaField::MakeRequired(/*field_id=*/3, "mappings", CreateMapWithStructValue()), + SchemaField::MakeOptional(/*field_id=*/4, "nested", CreateNestedStruct()), + }); + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared()), + SchemaField::MakeRequired(/*field_id=*/3, "mappings", CreateMapWithStructValue()), + }); + + // Test with prune_source = true + auto projection_result = Project(expected_schema, source_schema, /*prune_source=*/true); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + AssertProjectedField(projection.fields[0], 0); // id + AssertProjectedField(projection.fields[1], 1); // mappings + + // Test with prune_source = false + auto unpruned_result = Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(unpruned_result, IsOk()); - const auto& struct_projection = projection.fields[1].children[0]; - 
AssertProjectedField(struct_projection.children[0], 1); - AssertProjectedField(struct_projection.children[1], 0); + const auto& unpruned_projection = *unpruned_result; + ASSERT_EQ(unpruned_projection.fields.size(), 2); + AssertProjectedField(unpruned_projection.fields[0], 0); // id + AssertProjectedField(unpruned_projection.fields[1], 2); // mappings +} + +TEST(SchemaUtilTest, ProjectIncompatibleNestedTypes) { + Schema source_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "data", CreateListOfStruct()), + }); + Schema incompatible_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "data", CreateMapWithStructValue()), + }); + + auto projection_result = + Project(incompatible_schema, source_schema, /*prune_source=*/false); + ASSERT_FALSE(projection_result.has_value()); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, HasErrorMessage("Expected map")); + ASSERT_THAT(projection_result, HasErrorMessage("but got list")); } } // namespace iceberg From 530918e2c97362a0dfc0c40a5abf53a30bb25e33 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 22 May 2025 14:28:03 +0800 Subject: [PATCH 3/3] add more comments --- src/iceberg/schema_util.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/iceberg/schema_util.cc b/src/iceberg/schema_util.cc index 7b82dc626..d8a5dcf31 100644 --- a/src/iceberg/schema_util.cc +++ b/src/iceberg/schema_util.cc @@ -36,7 +36,8 @@ namespace { Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_type) { if (expected_type.is_nested()) { - // Nested type requires identical type ids. + // Nested type requires identical type ids but their sub-fields are checked + // recursively and individually. if (source_type.type_id() != expected_type.type_id()) { return NotSupported("Cannot read {} from {}", expected_type, source_type); }