Skip to content

Commit e70d2ca

Browse files
authored
feat: add schema projection support (#102)
Implement `SchemaProjection`, a companion data structure that pairs with the expected `Schema` and carries the result of projecting it onto a source schema. This is useful for projecting data returned by different file format reader implementations.
1 parent cb44bdc commit e70d2ca

File tree

5 files changed

+784
-1
lines changed

5 files changed

+784
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ set(ICEBERG_SOURCES
3232
schema.cc
3333
schema_field.cc
3434
schema_internal.cc
35+
schema_util.cc
3536
snapshot.cc
3637
sort_field.cc
3738
sort_order.cc

src/iceberg/schema_util.cc

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/schema_util.h"
21+
22+
#include <format>
23+
#include <map>
24+
#include <string_view>
25+
#include <unordered_map>
26+
27+
#include "iceberg/metadata_columns.h"
28+
#include "iceberg/schema.h"
29+
#include "iceberg/util/checked_cast.h"
30+
#include "iceberg/util/formatter_internal.h"
31+
#include "iceberg/util/macros.h"
32+
33+
namespace iceberg {
34+
35+
namespace {
36+
37+
/// \brief Check whether values stored as `source_type` can be read as `expected_type`.
///
/// For a nested expected type only the type ids are compared here; sub-fields are
/// validated separately by the caller. For primitive types the pair is accepted when
/// the types are equal, or when it is one of the promotions handled below: int -> long,
/// float -> double, or decimal -> decimal with the same scale and a precision that
/// does not shrink.
///
/// \param expected_type The type the reader wants to produce.
/// \param source_type The type the data was written with.
/// \return An OK status when the read is allowed, otherwise a NotSupported error.
Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_type) {
  const auto expected_id = expected_type.type_id();
  const auto source_id = source_type.type_id();

  if (expected_type.is_nested()) {
    // Nested types must be of the same kind; children are checked individually.
    if (expected_id == source_id) {
      return {};
    }
    return NotSupported("Cannot read {} from {}", expected_type, source_type);
  }

  // Identical primitive types are trivially readable.
  bool readable = (expected_type == source_type);

  if (!readable) {
    if (expected_id == TypeId::kLong) {
      readable = (source_id == TypeId::kInt);
    } else if (expected_id == TypeId::kDouble) {
      readable = (source_id == TypeId::kFloat);
    } else if (expected_id == TypeId::kDecimal && source_id == TypeId::kDecimal) {
      const auto& wanted = internal::checked_cast<const DecimalType&>(expected_type);
      const auto& stored = internal::checked_cast<const DecimalType&>(source_type);
      // Precision may only grow and the scale has to stay the same.
      readable =
          wanted.precision() >= stored.precision() && wanted.scale() == stored.scale();
    }
  }

  if (readable) {
    return {};
  }
  return NotSupported("Cannot read {} from {}", expected_type, source_type);
}
80+
81+
// Fix `from` field of `FieldProjection` to use pruned field index.
82+
void PruneFieldProjection(FieldProjection& field_projection) {
83+
std::map<size_t, size_t> local_index_to_pruned_index;
84+
for (const auto& child_projection : field_projection.children) {
85+
if (child_projection.kind == FieldProjection::Kind::kProjected) {
86+
local_index_to_pruned_index.emplace(std::get<1>(child_projection.from), 0);
87+
}
88+
}
89+
for (size_t pruned_index = 0; auto& [_, value] : local_index_to_pruned_index) {
90+
value = pruned_index++;
91+
}
92+
for (auto& child_projection : field_projection.children) {
93+
if (child_projection.kind == FieldProjection::Kind::kProjected) {
94+
child_projection.from =
95+
local_index_to_pruned_index.at(std::get<1>(child_projection.from));
96+
}
97+
}
98+
}
99+
100+
/// \brief Project the fields of a nested `expected_type` onto `source_type`.
///
/// Fields are matched by field id on the current nesting level. Matched nested fields
/// are projected recursively; matched primitive fields are validated with
/// `ValidateSchemaEvolution`. Expected fields with no match in the source become
/// `kMetadata` (metadata column ids), `kNull` (optional fields), or cause an error
/// (required fields).
///
/// \param expected_type The nested type to produce.
/// \param source_type The nested type available in the source; it must have the same
/// type id as `expected_type`.
/// \param prune_source When true, the `from` index of projected children is rewritten
/// to the position among the projected source fields only; otherwise it is the
/// position among all source fields of this level.
/// \return A `FieldProjection` whose `children` line up one-to-one with the fields of
/// `expected_type`, or an error (InvalidSchema for type/field-id problems and missing
/// required fields, or whatever `ValidateSchemaEvolution` reports).
Result<FieldProjection> ProjectNested(const Type& expected_type, const Type& source_type,
                                      bool prune_source) {
  if (!expected_type.is_nested()) {
    return InvalidSchema("Expected a nested type, but got {}", expected_type);
  }
  if (expected_type.type_id() != source_type.type_id()) {
    return InvalidSchema("Expected {}, but got {}", expected_type, source_type);
  }

  const auto& expected_fields =
      internal::checked_cast<const NestedType&>(expected_type).fields();
  const auto& source_fields =
      internal::checked_cast<const NestedType&>(source_type).fields();

  // Build a map from field id to source field info including its local offset in
  // the current nesting level.
  struct SourceFieldInfo {
    size_t local_index;
    const SchemaField* field;
  };
  std::unordered_map<int32_t, SourceFieldInfo> source_field_map;
  source_field_map.reserve(source_fields.size());
  for (size_t i = 0; i < source_fields.size(); ++i) {
    const auto& field = source_fields[i];
    // try_emplace builds the mapped value in place and leaves an existing entry
    // untouched, which is exactly what the duplicate check needs.
    if (const auto [iter, inserted] =
            source_field_map.try_emplace(field.field_id(), i, &field);
        !inserted) [[unlikely]] {
      return InvalidSchema("Duplicate field id found, prev: {}, curr: {}",
                           *iter->second.field, field);
    }
  }

  // Value-initialize so that `kind` is never indeterminate: this object is moved into
  // the returned Result before any caller gets a chance to assign its kind.
  FieldProjection result{};
  result.children.reserve(expected_fields.size());

  for (const auto& expected_field : expected_fields) {
    int32_t field_id = expected_field.field_id();
    // Value-initialized for the same reason as `result`; every branch below that
    // keeps the child assigns its final kind.
    FieldProjection child_projection{};

    if (auto iter = source_field_map.find(field_id); iter != source_field_map.cend()) {
      if (expected_field.type()->is_nested()) {
        ICEBERG_ASSIGN_OR_RAISE(child_projection,
                                ProjectNested(*expected_field.type(),
                                              *iter->second.field->type(), prune_source));
      } else {
        ICEBERG_RETURN_UNEXPECTED(
            ValidateSchemaEvolution(*expected_field.type(), *iter->second.field->type()));
      }
      // If `prune_source` is false, all fields will be read so the local index
      // is exactly the position to read data. Otherwise, the local index is computed
      // by pruning all non-projected fields (see PruneFieldProjection below).
      child_projection.from = iter->second.local_index;
      child_projection.kind = FieldProjection::Kind::kProjected;
    } else if (MetadataColumns::IsMetadataColumn(field_id)) {
      child_projection.kind = FieldProjection::Kind::kMetadata;
    } else if (expected_field.optional()) {
      child_projection.kind = FieldProjection::Kind::kNull;
    } else {
      // TODO(gangwu): support default value for v3 and constant value
      return InvalidSchema("Missing required field: {}", expected_field.ToString());
    }
    result.children.emplace_back(std::move(child_projection));
  }

  if (prune_source) {
    PruneFieldProjection(result);
  }

  return result;
}
171+
172+
} // namespace
173+
174+
/// Projects `expected_schema` onto `source_schema` by treating both schemas as nested
/// types and keeping only the per-field projections of the top level.
Result<SchemaProjection> Project(const Schema& expected_schema,
                                 const Schema& source_schema, bool prune_source) {
  // The root projection itself is just a container for the top-level fields.
  ICEBERG_ASSIGN_OR_RAISE(auto root_projection,
                          ProjectNested(expected_schema, source_schema, prune_source));
  return SchemaProjection{.fields = std::move(root_projection.children)};
}
180+
181+
/// \brief Return the lower-case name of a projection kind.
///
/// \param kind The projection kind to name.
/// \return A view of a string literal: "projected", "metadata", "constant", "default"
/// or "null"; "unknown" if `kind` does not hold one of the declared enumerators.
std::string_view ToString(FieldProjection::Kind kind) {
  // No `default:` label on purpose, so the compiler can still warn when a new
  // enumerator is added without a matching case.
  switch (kind) {
    case FieldProjection::Kind::kProjected:
      return "projected";
    case FieldProjection::Kind::kMetadata:
      return "metadata";
    case FieldProjection::Kind::kConstant:
      return "constant";
    case FieldProjection::Kind::kDefault:
      return "default";
    case FieldProjection::Kind::kNull:
      return "null";
  }
  // Only reachable for a value outside the enumerators (e.g. an uninitialized or
  // mis-cast kind). Without this return, control would flow off the end of a
  // value-returning function, which is undefined behavior.
  return "unknown";
}
195+
196+
/// \brief Build a human-readable representation of a field projection.
///
/// The output has the form `FieldProjection(kind=K[, from=I][, children=[...]])`.
/// `from` is only printed for `kProjected` fields and `children` only when non-empty.
///
/// \param projection The projection to describe.
/// \return The formatted string.
std::string ToString(const FieldProjection& projection) {
  std::string repr = std::format("FieldProjection(kind={}", projection.kind);
  if (projection.kind == FieldProjection::Kind::kProjected) {
    // Use get_if rather than get: a projection whose `from` was never set to a source
    // index must not make a diagnostic helper throw std::bad_variant_access.
    if (const auto* source_index =
            std::get_if<FieldProjection::SourceFieldIndex>(&projection.from)) {
      std::format_to(std::back_inserter(repr), ", from={}", *source_index);
    } else {
      repr += ", from=<unset>";
    }
  }
  if (!projection.children.empty()) {
    std::format_to(std::back_inserter(repr), ", children={}",
                   FormatRange(projection.children, ", ", "[", "]"));
  }
  repr += ')';
  return repr;
}
208+
209+
/// Formats the top-level field projections of `projection` through `FormatRange`,
/// using a newline as the separator and empty opening/closing delimiters.
std::string ToString(const SchemaProjection& projection) {
  const auto& top_level_fields = projection.fields;
  return std::format("{}", FormatRange(top_level_fields, "\n", "", ""));
}
212+
213+
} // namespace iceberg

src/iceberg/schema_util.h

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <any>
23+
#include <memory>
24+
#include <string>
25+
#include <variant>
26+
#include <vector>
27+
28+
#include "iceberg/iceberg_export.h"
29+
#include "iceberg/result.h"
30+
#include "iceberg/type_fwd.h"
31+
32+
namespace iceberg {
33+
34+
/// \brief A field schema partner to carry projection information.
35+
struct ICEBERG_EXPORT FieldProjection {
36+
/// \brief How the field is projected.
37+
enum class Kind {
38+
/// \brief The field is projected from the source with possible conversion for
39+
/// supported schema evolution.
40+
kProjected,
41+
/// \brief Metadata column whose value is generated on demand.
42+
kMetadata,
43+
/// \brief The field is a constant value (e.g. partition field value)
44+
kConstant,
45+
/// \brief The field is missing in the source and should be filled with default value.
46+
kDefault,
47+
/// \brief An optional field that is not present in the source.
48+
kNull,
49+
};
50+
51+
/// \brief The field index in the source schema on the same nesting level when
52+
/// `kind` is `kProjected`.
53+
using SourceFieldIndex = size_t;
54+
/// \brief A literal value used when `kind` is `kConstant` or `kDefault`.
55+
/// TODO(gangwu): replace it with a specifically defined literal type
56+
using Literal = std::any;
57+
/// \brief A variant to indicate how to set the value of the field.
58+
using From = std::variant<std::monostate, SourceFieldIndex, Literal>;
59+
60+
/// \brief Format-specific attributes for the field.
61+
/// For example, for Parquet it might store column id and level info of the projected
62+
/// leaf field.
63+
struct ExtraAttributes {
64+
virtual ~ExtraAttributes() = default;
65+
};
66+
67+
/// \brief The kind of projection of the field it partners with.
68+
Kind kind;
69+
/// \brief The source to set the value of the field.
70+
From from;
71+
/// \brief The children of the field if it is a nested field.
72+
std::vector<FieldProjection> children;
73+
/// \brief Format-specific attributes for the field.
74+
std::shared_ptr<ExtraAttributes> extra;
75+
};
76+
77+
/// \brief A schema partner to carry projection information.
78+
struct ICEBERG_EXPORT SchemaProjection {
79+
std::vector<FieldProjection> fields;
80+
};
81+
82+
/// \brief Project the expected schema on top of the source schema.
83+
///
84+
/// \param expected_schema The expected schema.
85+
/// \param source_schema The source schema.
86+
/// \param prune_source Whether the source schema can be pruned to project the expected
87+
/// schema on it. For example, literally a Parquet reader implementation is capable of
88+
/// column pruning, so `prune_source` is set to true in this case such that the `from`
89+
/// field in `FieldProjection` exactly reflects the position (relative to its nesting
90+
/// level) to get the column value from the reader.
91+
/// \return The projection result.
92+
ICEBERG_EXPORT Result<SchemaProjection> Project(const Schema& expected_schema,
93+
const Schema& source_schema,
94+
bool prune_source);
95+
96+
/// \brief Name of a projection kind.
ICEBERG_EXPORT std::string_view ToString(FieldProjection::Kind kind);

/// \brief Human-readable representation of a field projection, including its children.
ICEBERG_EXPORT std::string ToString(const FieldProjection& projection);

/// \brief Human-readable representation of a schema projection.
ICEBERG_EXPORT std::string ToString(const SchemaProjection& projection);
99+
100+
} // namespace iceberg

test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ target_sources(schema_test
3939
partition_spec_test.cc
4040
sort_field_test.cc
4141
sort_order_test.cc
42-
snapshot_test.cc)
42+
snapshot_test.cc
43+
schema_util_test.cc)
4344
target_link_libraries(schema_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
4445
add_test(NAME schema_test COMMAND schema_test)
4546

0 commit comments

Comments
 (0)