Skip to content

Commit 2feea4e

Browse files
committed
feat: add field id checker to avro schema
1 parent 1c4c047 commit 2feea4e

File tree

3 files changed

+374
-0
lines changed

3 files changed

+374
-0
lines changed

src/iceberg/avro/avro_schema_util.cc

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
#include <avro/CustomAttributes.hh>
2727
#include <avro/LogicalType.hh>
2828
#include <avro/NodeImpl.hh>
29+
#include <avro/Schema.hh>
2930
#include <avro/Types.hh>
31+
#include <avro/ValidSchema.hh>
3032

3133
#include "iceberg/avro/avro_schema_util_internal.h"
3234
#include "iceberg/util/macros.h"
@@ -263,4 +265,116 @@ Status ToAvroNodeVisitor::Visit(const SchemaField& field, ::avro::NodePtr* node)
263265
return {};
264266
}
265267

268+
namespace {
269+
270+
bool HasId(const ::avro::NodePtr& parent_node, size_t field_idx,
271+
const std::string& attr_name) {
272+
if (field_idx >= parent_node->customAttributes()) {
273+
return false;
274+
}
275+
return parent_node->customAttributesAt(field_idx).getAttribute(attr_name).has_value();
276+
}
277+
278+
} // namespace
279+
280+
Status HasIdVisitor::Visit(const ::avro::NodePtr& node) {
281+
if (!node) [[unlikely]] {
282+
return InvalidSchema("Avro node is null");
283+
}
284+
285+
switch (node->type()) {
286+
case ::avro::AVRO_RECORD:
287+
return VisitRecord(node);
288+
case ::avro::AVRO_ARRAY:
289+
return VisitArray(node);
290+
case ::avro::AVRO_MAP:
291+
return VisitMap(node);
292+
case ::avro::AVRO_UNION:
293+
return VisitUnion(node);
294+
case ::avro::AVRO_BOOL:
295+
case ::avro::AVRO_INT:
296+
case ::avro::AVRO_LONG:
297+
case ::avro::AVRO_FLOAT:
298+
case ::avro::AVRO_DOUBLE:
299+
case ::avro::AVRO_STRING:
300+
case ::avro::AVRO_BYTES:
301+
case ::avro::AVRO_FIXED:
302+
return {};
303+
case ::avro::AVRO_NULL:
304+
case ::avro::AVRO_ENUM:
305+
default:
306+
return InvalidSchema("Unsupported Avro type: {}", static_cast<int>(node->type()));
307+
}
308+
}
309+
310+
Status HasIdVisitor::VisitRecord(const ::avro::NodePtr& node) {
311+
static const std::string kFieldIdKey{kFieldIdProp};
312+
total_fields_ += node->leaves();
313+
for (size_t i = 0; i < node->leaves(); ++i) {
314+
if (HasId(node, i, kFieldIdKey)) {
315+
fields_with_id_++;
316+
}
317+
ICEBERG_RETURN_UNEXPECTED(Visit(node->leafAt(i)));
318+
}
319+
return {};
320+
}
321+
322+
Status HasIdVisitor::VisitArray(const ::avro::NodePtr& node) {
323+
if (node->leaves() != 1) [[unlikely]] {
324+
return InvalidSchema("Array type must have exactly one leaf");
325+
}
326+
327+
if (node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
328+
node->logicalType().customLogicalType() != nullptr &&
329+
node->logicalType().customLogicalType()->name() == "map") {
330+
return Visit(node->leafAt(0));
331+
}
332+
333+
total_fields_++;
334+
if (HasId(node, /*field_idx=*/0, std::string(kElementIdProp))) {
335+
fields_with_id_++;
336+
}
337+
338+
return Visit(node->leafAt(0));
339+
}
340+
341+
Status HasIdVisitor::VisitMap(const ::avro::NodePtr& node) {
342+
if (node->leaves() != 2) [[unlikely]] {
343+
return InvalidSchema("Map type must have exactly two leaves");
344+
}
345+
346+
total_fields_ += 2;
347+
if (HasId(node, /*field_idx=*/0, std::string(kKeyIdProp))) {
348+
fields_with_id_++;
349+
}
350+
if (HasId(node, /*field_idx=*/0, std::string(kValueIdProp))) {
351+
fields_with_id_++;
352+
}
353+
354+
return Visit(node->leafAt(1));
355+
}
356+
357+
Status HasIdVisitor::VisitUnion(const ::avro::NodePtr& node) {
358+
if (node->leaves() != 2) [[unlikely]] {
359+
return InvalidSchema("Union type must have exactly two branches");
360+
}
361+
362+
const auto& branch_0 = node->leafAt(0);
363+
const auto& branch_1 = node->leafAt(1);
364+
if (branch_0->type() == ::avro::AVRO_NULL) {
365+
return Visit(branch_1);
366+
}
367+
if (branch_1->type() == ::avro::AVRO_NULL) {
368+
return Visit(branch_0);
369+
}
370+
371+
return InvalidSchema("Union type must have exactly one null branch");
372+
}
373+
374+
Status HasIdVisitor::Visit(const ::avro::ValidSchema& schema) {
375+
return Visit(schema.root());
376+
}
377+
378+
Status HasIdVisitor::Visit(const ::avro::Schema& schema) { return Visit(schema.root()); }
379+
266380
} // namespace iceberg::avro

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
#include "iceberg/result.h"
2727
#include "iceberg/type.h"
2828

29+
namespace avro {
30+
class Schema;
31+
class ValidSchema;
32+
} // namespace avro
33+
2934
namespace iceberg::avro {
3035

3136
/// \brief A visitor that converts an Iceberg type to an Avro node.
@@ -55,4 +60,65 @@ class ToAvroNodeVisitor {
5560
std::stack<int32_t> field_ids_;
5661
};
5762

63+
/// \brief A visitor that checks the presence of field IDs in an Avro schema.
64+
class HasIdVisitor {
65+
public:
66+
HasIdVisitor() = default;
67+
68+
/// \brief Visit an Avro node to check for field IDs.
69+
/// \param node The Avro node to visit.
70+
/// \return Status indicating success or an error if unsupported Avro types are
71+
/// encountered.
72+
Status Visit(const ::avro::NodePtr& node);
73+
74+
/// \brief Visit an Avro schema to check for field IDs.
75+
/// \param schema The Avro schema to visit.
76+
/// \return Status indicating success or an error if unsupported Avro types are
77+
/// encountered.
78+
Status Visit(const ::avro::ValidSchema& schema);
79+
80+
/// \brief Visit an Avro schema to check for field IDs.
81+
/// \param schema The Avro schema to visit.
82+
/// \return Status indicating success or an error if unsupported Avro types are
83+
/// encountered.
84+
Status Visit(const ::avro::Schema& node);
85+
86+
/// \brief Check if all fields in the visited schema have field IDs.
87+
/// \return True if all fields have IDs, false otherwise.
88+
bool AllHaveIds() const {
89+
return total_fields_ == fields_with_id_ && fields_with_id_ != 0;
90+
}
91+
92+
/// \brief Check if all fields in the visited schema have field IDs.
93+
/// \return True if all fields have IDs, false otherwise.
94+
bool HasNoIds() const { return total_fields_ == 0; }
95+
96+
private:
97+
/// \brief Visit a record node to check for field IDs.
98+
/// \param node The record node to visit.
99+
/// \return Status indicating success or error.
100+
Status VisitRecord(const ::avro::NodePtr& node);
101+
102+
/// \brief Visit an array node to check for element IDs.
103+
/// \param node The array node to visit.
104+
/// \return Status indicating success or error.
105+
Status VisitArray(const ::avro::NodePtr& node);
106+
107+
/// \brief Visit a map node to check for key and value IDs.
108+
/// \param node The map node to visit.
109+
/// \return Status indicating success or error.
110+
Status VisitMap(const ::avro::NodePtr& node);
111+
112+
/// \brief Visit a union node to check for field IDs in each branch.
113+
/// \param node The union node to visit.
114+
/// \return Status indicating success or error.
115+
Status VisitUnion(const ::avro::NodePtr& node);
116+
117+
private:
118+
// Total number of fields visited.
119+
size_t total_fields_ = 0;
120+
// Number of fields with IDs.
121+
size_t fields_with_id_ = 0;
122+
};
123+
58124
} // namespace iceberg::avro

0 commit comments

Comments
 (0)