Skip to content

Commit b0fd1ea

Browse files
committed
feat: add schema projection support
1 parent 1c4c047 commit b0fd1ea

File tree

5 files changed

+643
-1
lines changed

5 files changed

+643
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ set(ICEBERG_SOURCES
3030
schema.cc
3131
schema_field.cc
3232
schema_internal.cc
33+
schema_util.cc
3334
snapshot.cc
3435
sort_field.cc
3536
sort_order.cc

src/iceberg/schema_util.cc

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/schema_util.h"
21+
22+
#include <format>
23+
#include <map>
24+
#include <string_view>
25+
#include <unordered_map>
26+
27+
#include "iceberg/metadata_columns.h"
28+
#include "iceberg/schema.h"
29+
#include "iceberg/util/checked_cast.h"
30+
#include "iceberg/util/formatter_internal.h"
31+
#include "iceberg/util/macros.h"
32+
33+
namespace iceberg {
34+
35+
namespace {
36+
37+
// Decide whether data stored as `source_type` may be read as `expected_type`.
//
// Nested types only need matching type ids here. Primitive types must be equal
// or be one of the supported promotions: int -> long, float -> double, or
// decimal(P, S) -> decimal(P', S) with P' >= P.
// Returns an OK status when the read is allowed, NotSupported otherwise.
Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_type) {
  const auto wanted_id = expected_type.type_id();
  const auto stored_id = source_type.type_id();

  bool readable = false;
  if (expected_type.is_nested()) {
    // Nested type requires identical type ids.
    readable = (stored_id == wanted_id);
  } else if (expected_type == source_type) {
    // Same primitive type, nothing to convert.
    readable = true;
  } else if (wanted_id == TypeId::kLong) {
    readable = (stored_id == TypeId::kInt);
  } else if (wanted_id == TypeId::kDouble) {
    readable = (stored_id == TypeId::kFloat);
  } else if (wanted_id == TypeId::kDecimal && stored_id == TypeId::kDecimal) {
    const auto& wanted_decimal =
        internal::checked_cast<const DecimalType&>(expected_type);
    const auto& stored_decimal = internal::checked_cast<const DecimalType&>(source_type);
    // Precision may only grow; scale must stay the same.
    readable = wanted_decimal.precision() >= stored_decimal.precision() &&
               wanted_decimal.scale() == stored_decimal.scale();
  }

  if (!readable) {
    return NotSupported("Cannot read {} from {}", expected_type, source_type);
  }
  return {};
}
79+
80+
// Fix `from` field of `FieldProjection` to use pruned field index.
81+
void PruneFieldProjection(FieldProjection& field_projection) {
82+
std::map<size_t, size_t> local_index_to_pruned_index;
83+
for (const auto& child_projection : field_projection.children) {
84+
if (child_projection.kind == FieldProjection::Kind::kProjected) {
85+
local_index_to_pruned_index.emplace(std::get<1>(child_projection.from), 0);
86+
}
87+
}
88+
for (size_t pruned_index = 0; auto& [_, value] : local_index_to_pruned_index) {
89+
value = pruned_index++;
90+
}
91+
for (auto& child_projection : field_projection.children) {
92+
if (child_projection.kind == FieldProjection::Kind::kProjected) {
93+
child_projection.from =
94+
local_index_to_pruned_index.at(std::get<1>(child_projection.from));
95+
}
96+
}
97+
}
98+
99+
// Project `expected_type` onto `source_type` for one nesting level and recurse into
// nested children. Fields are matched by field id, not by name or position.
//
// Both types must be nested and share the same type id. Only `children` of the
// returned projection is meaningful: it holds one entry per field of `expected_type`,
// in the same order. When `prune_source` is true, the `from` index of projected
// children is renumbered by PruneFieldProjection.
//
// Returns InvalidSchema for a non-nested or mismatched type, duplicate source field
// ids or a missing required field, and propagates the error of
// ValidateSchemaEvolution for an unsupported primitive conversion.
Result<FieldProjection> ProjectNested(const Type& expected_type, const Type& source_type,
                                      bool prune_source) {
  if (!expected_type.is_nested()) {
    return InvalidSchema("Expected a nested type, but got {}", expected_type);
  }
  if (expected_type.type_id() != source_type.type_id()) {
    return InvalidSchema("Expected {}, but got {}", expected_type, source_type);
  }

  const auto& expected_fields =
      internal::checked_cast<const NestedType&>(expected_type).fields();
  const auto& source_fields =
      internal::checked_cast<const NestedType&>(source_type).fields();

  // Build a map from field id to source field info including its local offset in
  // the current nesting level.
  struct SourceFieldInfo {
    size_t local_index;
    const SchemaField* field;
  };
  std::unordered_map<int32_t, SourceFieldInfo> source_field_map;
  source_field_map.reserve(source_fields.size());
  for (size_t i = 0; i < source_fields.size(); ++i) {
    const auto& field = source_fields[i];
    if (const auto [iter, inserted] = source_field_map.emplace(
            std::piecewise_construct, std::forward_as_tuple(field.field_id()),
            std::forward_as_tuple(i, &field));
        !inserted) [[unlikely]] {
      return InvalidSchema("Duplicate field id found, prev: {}, curr: {}",
                           *iter->second.field, field);
    }
  }

  // Value-initialize so that `kind` never holds an indeterminate value; callers
  // only consume `children` of this level's result.
  FieldProjection result{};
  result.children.reserve(expected_fields.size());

  for (const auto& expected_field : expected_fields) {
    int32_t field_id = expected_field.field_id();
    FieldProjection child_projection{};

    if (auto iter = source_field_map.find(field_id); iter != source_field_map.cend()) {
      if (expected_field.type()->is_nested()) {
        ICEBERG_ASSIGN_OR_RAISE(auto nested_projection,
                                ProjectNested(*expected_field.type(),
                                              *iter->second.field->type(), prune_source));
        // Adopt the projections of the nested fields directly. Appending
        // `nested_projection` itself would insert an extra wrapper level between
        // this field and its real children.
        child_projection.children = std::move(nested_projection.children);
      } else {
        ICEBERG_RETURN_UNEXPECTED(
            ValidateSchemaEvolution(*expected_field.type(), *iter->second.field->type()));
      }
      // If `prune_source` is false, all fields will be read so the local index
      // is exactly the position to read data. Otherwise, the local index is computed
      // by pruning all non-projected fields
      child_projection.from = iter->second.local_index;
      child_projection.kind = FieldProjection::Kind::kProjected;
    } else if (MetadataColumns::IsMetadataColumn(field_id)) {
      child_projection.kind = FieldProjection::Kind::kMetadata;
    } else if (expected_field.optional()) {
      child_projection.kind = FieldProjection::Kind::kNull;
    } else {
      // TODO(gangwu): support default value for v3 and constant value
      return InvalidSchema("Missing required field: {}", expected_field.ToString());
    }
    result.children.emplace_back(std::move(child_projection));
  }

  if (prune_source) {
    PruneFieldProjection(result);
  }

  return result;
}
171+
172+
} // namespace
173+
174+
// Project `expected_schema` onto `source_schema`. The schemas are handed to
// ProjectNested like any other nested type; only the per-field projections of the
// top level are returned to the caller.
Result<SchemaProjection> Project(const Schema& expected_schema,
                                 const Schema& source_schema, bool prune_source) {
  ICEBERG_ASSIGN_OR_RAISE(auto root_projection,
                          ProjectNested(expected_schema, source_schema, prune_source));
  return SchemaProjection{.fields = std::move(root_projection.children)};
}
180+
181+
// Return the lower-case name of a projection kind.
std::string_view ToString(FieldProjection::Kind kind) {
  // No `default:` label on purpose, so the compiler can still warn when a new
  // enumerator is added without a case here.
  switch (kind) {
    case FieldProjection::Kind::kProjected:
      return "projected";
    case FieldProjection::Kind::kMetadata:
      return "metadata";
    case FieldProjection::Kind::kConstant:
      return "constant";
    case FieldProjection::Kind::kDefault:
      return "default";
    case FieldProjection::Kind::kNull:
      return "null";
  }
  // Only reached when `kind` holds a value that is not a named enumerator (e.g. an
  // uninitialized or corrupted value). Flowing off the end of the function would be
  // undefined behavior, so return a defined fallback instead.
  return "unknown";
}
195+
196+
// Render a field projection as `FieldProjection(kind=...[, from=...][, children=[...]])`.
// `from` is printed only for projected fields and `children` only when non-empty.
std::string ToString(const FieldProjection& projection) {
  std::string repr = std::format("FieldProjection(kind={}", projection.kind);
  auto out = std::back_inserter(repr);
  if (projection.kind == FieldProjection::Kind::kProjected) {
    // A projected field is expected to carry a source index. Use get_if so that an
    // inconsistent projection is still printable instead of throwing
    // std::bad_variant_access out of a formatting helper.
    if (const auto* source_index =
            std::get_if<FieldProjection::SourceFieldIndex>(&projection.from)) {
      std::format_to(out, ", from={}", *source_index);
    } else {
      std::format_to(out, ", from=<invalid>");
    }
  }
  if (!projection.children.empty()) {
    std::format_to(out, ", children={}",
                   FormatRange(projection.children, ", ", "[", "]"));
  }
  std::format_to(out, ")");
  return repr;
}
208+
209+
// Render a schema projection by formatting its field projections with a newline
// separator and no surrounding brackets.
std::string ToString(const SchemaProjection& projection) {
  auto rendered_fields = FormatRange(projection.fields, "\n", "", "");
  return std::format("{}", rendered_fields);
}
212+
213+
} // namespace iceberg

src/iceberg/schema_util.h

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <any>
23+
#include <memory>
24+
#include <string>
25+
#include <variant>
26+
#include <vector>
27+
28+
#include "iceberg/iceberg_export.h"
29+
#include "iceberg/result.h"
30+
#include "iceberg/type_fwd.h"
31+
32+
namespace iceberg {
33+
34+
/// \brief A field schema partner to carry projection information.
///
/// One instance describes how a single field of the expected schema obtains its
/// value; `children` mirrors the nested fields of that field.
35+
struct ICEBERG_EXPORT FieldProjection {
36+
/// \brief How the field is projected.
37+
enum class Kind {
38+
/// \brief The field is projected from the source with possible conversion for
39+
/// supported
40+
/// schema evolution.
41+
kProjected,
42+
/// \brief Metadata column whose value is generated on demand.
43+
kMetadata,
44+
/// \brief The field is a constant value (e.g. partition field value)
45+
kConstant,
46+
/// \brief The field is missing in the source and should be filled with default value.
47+
kDefault,
48+
/// \brief An optional field that is not present in the source.
49+
kNull,
50+
};
51+
52+
/// \brief The field index in the source schema on the same nesting level when
53+
/// `kind` is `kProjected`.
54+
using SourceFieldIndex = size_t;
55+
/// \brief A literal value used when `kind` is `kConstant` or `kDefault`.
56+
/// TODO(gangwu): replace it with a specifically defined literal type
57+
using Literal = std::any;
58+
/// \brief A variant to indicate how to set the value of the field.
/// A default-constructed `From` holds `std::monostate` (alternative 0), i.e. no
/// source has been set; `SourceFieldIndex` is alternative 1 and `Literal` is
/// alternative 2.
59+
using From = std::variant<std::monostate, SourceFieldIndex, Literal>;
60+
61+
/// \brief Format-specific attributes for the field.
62+
/// For example, for Parquet it might store column id and level info of the projected
63+
/// leaf field.
/// The virtual destructor makes this a polymorphic base, so derived attribute
/// types can be destroyed through the `extra` pointer below.
64+
struct ExtraAttributes {
65+
virtual ~ExtraAttributes() = default;
66+
};
67+
68+
/// \brief The kind of projection of the field it partners with.
/// NOTE(review): there is no default member initializer, so a default-initialized
/// `FieldProjection` leaves `kind` indeterminate until it is assigned.
69+
Kind kind;
70+
/// \brief The source to set the value of the field.
71+
From from;
72+
/// \brief The children of the field if it is a nested field.
73+
std::vector<FieldProjection> children;
74+
/// \brief Format-specific attributes for the field.
/// Null unless something sets it; shared ownership via `std::shared_ptr`.
75+
std::shared_ptr<ExtraAttributes> extra;
76+
};
77+
78+
/// \brief A schema partner to carry projection information.
///
/// As produced by `Project`, `fields` holds one `FieldProjection` per top-level
/// field of the expected schema, in the order of the expected schema.
79+
struct ICEBERG_EXPORT SchemaProjection {
80+
std::vector<FieldProjection> fields;
81+
};
82+
83+
/// \brief Project the expected schema on top of the source schema.
84+
///
/// Fields are matched by field id on each nesting level. An expected field that is
/// absent from the source becomes `kMetadata` when its id is a metadata column,
/// `kNull` when it is optional, and is an error when it is required.
85+
/// \param expected_schema The expected schema.
86+
/// \param source_schema The source schema.
87+
/// \param prune_source Whether to prune the source schema. If true, the `from` index
88+
/// of a projected field counts only the projected source fields on its nesting
/// level; if false, it is the position of the field in the source.
89+
/// \return The projection result, or an error (InvalidSchema for a missing required
/// field, duplicate source field ids or mismatched nested types; NotSupported for
/// an unsupported primitive type conversion).
90+
ICEBERG_EXPORT Result<SchemaProjection> Project(const Schema& expected_schema,
91+
const Schema& source_schema,
92+
bool prune_source);
93+
94+
/// \brief Return the lower-case name of a projection kind, e.g. "projected".
ICEBERG_EXPORT std::string_view ToString(FieldProjection::Kind kind);
95+
/// \brief Return a human-readable form of a field projection. `from` is included
/// only for `kProjected` and `children` only when non-empty.
ICEBERG_EXPORT std::string ToString(const FieldProjection& projection);
96+
/// \brief Return a human-readable form of a schema projection built from the
/// string forms of its `fields`.
ICEBERG_EXPORT std::string ToString(const SchemaProjection& projection);
97+
98+
} // namespace iceberg

test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ target_sources(schema_test
3939
partition_spec_test.cc
4040
sort_field_test.cc
4141
sort_order_test.cc
42-
snapshot_test.cc)
42+
snapshot_test.cc
43+
schema_util_test.cc)
4344
target_link_libraries(schema_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
4445
add_test(NAME schema_test COMMAND schema_test)
4546

0 commit comments

Comments
 (0)