Skip to content

Commit 4974ced

Browse files
authored
feat(protobuf): enhance Protobuf data handling with LogicalMap support and enhance test coverage (#3020) (#3025)
1 parent 711cee7 commit 4974ced

File tree

10 files changed

+2558
-1280
lines changed

10 files changed

+2558
-1280
lines changed

core/src/main/java/kafka/automq/table/binder/RecordBinder.java

Lines changed: 150 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,22 @@
2222
import kafka.automq.table.metric.FieldMetric;
2323

2424
import org.apache.avro.Schema;
25+
import org.apache.avro.SchemaBuilder;
2526
import org.apache.avro.generic.GenericRecord;
2627
import org.apache.iceberg.avro.AvroSchemaUtil;
2728
import org.apache.iceberg.data.Record;
2829
import org.apache.iceberg.types.Type;
2930
import org.apache.iceberg.types.Types;
3031

3132
import java.nio.ByteBuffer;
33+
import java.util.ArrayList;
3234
import java.util.HashMap;
3335
import java.util.IdentityHashMap;
3436
import java.util.List;
3537
import java.util.Map;
3638
import java.util.concurrent.atomic.AtomicLong;
3739

40+
import static org.apache.avro.Schema.Type.ARRAY;
3841
import static org.apache.avro.Schema.Type.NULL;
3942

4043
/**
@@ -124,12 +127,8 @@ private FieldMapping[] buildFieldMappings(Schema avroSchema, org.apache.iceberg.
124127
Schema recordSchema = avroSchema;
125128
FieldMapping[] mappings = new FieldMapping[icebergSchema.columns().size()];
126129

127-
if (recordSchema.getType() == Schema.Type.UNION) {
128-
recordSchema = recordSchema.getTypes().stream()
129-
.filter(s -> s.getType() == Schema.Type.RECORD)
130-
.findFirst()
131-
.orElseThrow(() -> new IllegalArgumentException("UNION schema does not contain a RECORD type: " + avroSchema));
132-
}
130+
// Unwrap UNION if it contains only one non-NULL type
131+
recordSchema = resolveUnionElement(recordSchema);
133132

134133
for (int icebergPos = 0; icebergPos < icebergSchema.columns().size(); icebergPos++) {
135134
Types.NestedField icebergField = icebergSchema.columns().get(icebergPos);
@@ -162,17 +161,29 @@ private FieldMapping buildFieldMapping(String avroFieldName, int avroPosition, T
162161
}
163162

164163
private Schema resolveUnionElement(Schema schema) {
165-
Schema resolved = schema;
166-
if (schema.getType() == Schema.Type.UNION) {
167-
resolved = null;
168-
for (Schema unionMember : schema.getTypes()) {
169-
if (unionMember.getType() != NULL) {
170-
resolved = unionMember;
171-
break;
172-
}
164+
if (schema.getType() != Schema.Type.UNION) {
165+
return schema;
166+
}
167+
168+
// Collect all non-NULL types
169+
List<Schema> nonNullTypes = new ArrayList<>();
170+
for (Schema s : schema.getTypes()) {
171+
if (s.getType() != NULL) {
172+
nonNullTypes.add(s);
173173
}
174174
}
175-
return resolved;
175+
176+
if (nonNullTypes.isEmpty()) {
177+
throw new IllegalArgumentException("UNION schema contains only NULL type: " + schema);
178+
} else if (nonNullTypes.size() == 1) {
179+
// Only unwrap UNION if it contains exactly one non-NULL type (optional union)
180+
return nonNullTypes.get(0);
181+
} else {
182+
// Multiple non-NULL types: non-optional union not supported
183+
throw new UnsupportedOperationException(
184+
"Non-optional UNION with multiple non-NULL types is not supported. " +
185+
"Found " + nonNullTypes.size() + " non-NULL types in UNION: " + schema);
186+
}
176187
}
177188

178189

@@ -184,53 +195,135 @@ private Map<Schema, RecordBinder> precomputeBindersMap(TypeAdapter<Schema> typeA
184195

185196
for (FieldMapping mapping : fieldMappings) {
186197
if (mapping != null) {
187-
Type type = mapping.icebergType();
188-
if (type.isPrimitiveType()) {
189-
} else if (type.isStructType()) {
190-
org.apache.iceberg.Schema schema = type.asStructType().asSchema();
191-
RecordBinder structBinder = new RecordBinder(
192-
schema,
193-
mapping.avroSchema(),
194-
typeAdapter,
195-
batchFieldCount
196-
);
197-
binders.put(mapping.avroSchema(), structBinder);
198-
} else if (type.isListType()) {
199-
Types.ListType listType = type.asListType();
200-
Type elementType = listType.elementType();
201-
if (elementType.isStructType()) {
202-
org.apache.iceberg.Schema schema = elementType.asStructType().asSchema();
203-
RecordBinder elementBinder = new RecordBinder(
204-
schema,
205-
mapping.avroSchema().getElementType(),
206-
typeAdapter,
207-
batchFieldCount
208-
);
209-
binders.put(mapping.avroSchema().getElementType(), elementBinder);
210-
}
211-
} else if (type.isMapType()) {
212-
Types.MapType mapType = type.asMapType();
213-
Type keyType = mapType.keyType();
214-
Type valueType = mapType.valueType();
215-
if (keyType.isStructType()) {
216-
throw new UnsupportedOperationException("Struct keys in MAP types are not supported");
217-
}
218-
if (valueType.isStructType()) {
219-
org.apache.iceberg.Schema schema = valueType.asStructType().asSchema();
220-
RecordBinder valueBinder = new RecordBinder(
221-
schema,
222-
mapping.avroSchema().getValueType(),
223-
typeAdapter,
224-
batchFieldCount
225-
);
226-
binders.put(mapping.avroSchema().getValueType(), valueBinder);
227-
}
228-
}
198+
precomputeBindersForType(mapping.icebergType(), mapping.avroSchema(), binders, typeAdapter);
229199
}
230200
}
231201
return binders;
232202
}
233203

204+
/**
205+
* Recursively precomputes binders for a given Iceberg type and its corresponding Avro schema.
206+
*/
207+
private void precomputeBindersForType(Type icebergType, Schema avroSchema,
208+
Map<Schema, RecordBinder> binders,
209+
TypeAdapter<Schema> typeAdapter) {
210+
if (icebergType.isPrimitiveType()) {
211+
return; // No binders needed for primitive types
212+
}
213+
214+
if (icebergType.isStructType() && !avroSchema.isUnion()) {
215+
createStructBinder(icebergType.asStructType(), avroSchema, binders, typeAdapter);
216+
} else if (icebergType.isStructType() && avroSchema.isUnion()) {
217+
createUnionStructBinders(icebergType.asStructType(), avroSchema, binders, typeAdapter);
218+
} else if (icebergType.isListType()) {
219+
createListBinder(icebergType.asListType(), avroSchema, binders, typeAdapter);
220+
} else if (icebergType.isMapType()) {
221+
createMapBinder(icebergType.asMapType(), avroSchema, binders, typeAdapter);
222+
}
223+
}
224+
225+
/**
226+
* Creates binders for STRUCT types represented as Avro UNIONs.
227+
*/
228+
private void createUnionStructBinders(Types.StructType structType, Schema avroSchema,
229+
Map<Schema, RecordBinder> binders,
230+
TypeAdapter<Schema> typeAdapter) {
231+
org.apache.iceberg.Schema schema = structType.asSchema();
232+
SchemaBuilder.FieldAssembler<Schema> schemaBuilder = SchemaBuilder.record(avroSchema.getName()).fields()
233+
.name("tag").type().intType().noDefault();
234+
int tag = 0;
235+
for (Schema unionMember : avroSchema.getTypes()) {
236+
if (unionMember.getType() != NULL) {
237+
schemaBuilder.name("field" + tag).type(unionMember).noDefault();
238+
tag++;
239+
}
240+
}
241+
RecordBinder structBinder = new RecordBinder(schema, schemaBuilder.endRecord(), typeAdapter, batchFieldCount);
242+
binders.put(avroSchema, structBinder);
243+
}
244+
245+
/**
246+
* Creates a binder for a STRUCT type field.
247+
*/
248+
private void createStructBinder(Types.StructType structType, Schema avroSchema,
249+
Map<Schema, RecordBinder> binders,
250+
TypeAdapter<Schema> typeAdapter) {
251+
org.apache.iceberg.Schema schema = structType.asSchema();
252+
RecordBinder structBinder = new RecordBinder(schema, avroSchema, typeAdapter, batchFieldCount);
253+
binders.put(avroSchema, structBinder);
254+
}
255+
256+
/**
257+
* Creates binders for LIST type elements (if they are STRUCT types).
258+
*/
259+
private void createListBinder(Types.ListType listType, Schema avroSchema,
260+
Map<Schema, RecordBinder> binders,
261+
TypeAdapter<Schema> typeAdapter) {
262+
Type elementType = listType.elementType();
263+
if (elementType.isStructType()) {
264+
Schema elementAvroSchema = avroSchema.getElementType();
265+
createStructBinder(elementType.asStructType(), elementAvroSchema, binders, typeAdapter);
266+
}
267+
}
268+
269+
/**
270+
* Creates binders for MAP type keys and values (if they are STRUCT types).
271+
* Handles two Avro representations: ARRAY of key-value records, or native MAP.
272+
*/
273+
private void createMapBinder(Types.MapType mapType, Schema avroSchema,
274+
Map<Schema, RecordBinder> binders,
275+
TypeAdapter<Schema> typeAdapter) {
276+
Type keyType = mapType.keyType();
277+
Type valueType = mapType.valueType();
278+
279+
if (ARRAY.equals(avroSchema.getType())) {
280+
// Avro represents MAP as ARRAY of records with "key" and "value" fields
281+
createMapAsArrayBinder(keyType, valueType, avroSchema, binders, typeAdapter);
282+
} else {
283+
// Avro represents MAP as native MAP type
284+
createMapAsMapBinder(keyType, valueType, avroSchema, binders, typeAdapter);
285+
}
286+
}
287+
288+
/**
289+
* Handles MAP represented as Avro ARRAY of {key, value} records.
290+
*/
291+
private void createMapAsArrayBinder(Type keyType, Type valueType, Schema avroSchema,
292+
Map<Schema, RecordBinder> binders,
293+
TypeAdapter<Schema> typeAdapter) {
294+
Schema elementSchema = avroSchema.getElementType();
295+
296+
// Process key if it's a STRUCT
297+
if (keyType.isStructType()) {
298+
Schema keyAvroSchema = elementSchema.getField("key").schema();
299+
createStructBinder(keyType.asStructType(), keyAvroSchema, binders, typeAdapter);
300+
}
301+
302+
// Process value if it's a STRUCT
303+
if (valueType.isStructType()) {
304+
Schema valueAvroSchema = elementSchema.getField("value").schema();
305+
createStructBinder(valueType.asStructType(), valueAvroSchema, binders, typeAdapter);
306+
}
307+
}
308+
309+
/**
310+
* Handles MAP represented as Avro native MAP type.
311+
*/
312+
private void createMapAsMapBinder(Type keyType, Type valueType, Schema avroSchema,
313+
Map<Schema, RecordBinder> binders,
314+
TypeAdapter<Schema> typeAdapter) {
315+
// Struct keys in native MAP are not supported by Avro
316+
if (keyType.isStructType()) {
317+
throw new UnsupportedOperationException("Struct keys in MAP types are not supported");
318+
}
319+
320+
// Process value if it's a STRUCT
321+
if (valueType.isStructType()) {
322+
Schema valueAvroSchema = avroSchema.getValueType();
323+
createStructBinder(valueType.asStructType(), valueAvroSchema, binders, typeAdapter);
324+
}
325+
}
326+
234327
private static class AvroRecordView implements Record {
235328
private final GenericRecord avroRecord;
236329
private final org.apache.iceberg.Schema icebergSchema;
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package kafka.automq.table.process.convert;
20+
21+
import com.google.protobuf.Descriptors;
22+
23+
import org.apache.avro.Schema;
24+
import org.apache.avro.protobuf.ProtobufData;
25+
import org.apache.iceberg.avro.CodecSetup;
26+
27+
import java.util.Arrays;
28+
29+
/**
30+
* ProtobufData extension that annotates protobuf map fields with Iceberg's LogicalMap logical type so that
31+
* downstream Avro{@literal >}Iceberg conversion keeps them as MAP instead of generic {@literal ARRAY<record<key,value>>}.
32+
*/
33+
public class LogicalMapProtobufData extends ProtobufData {
34+
private static final LogicalMapProtobufData INSTANCE = new LogicalMapProtobufData();
35+
private static final Schema NULL = Schema.create(Schema.Type.NULL);
36+
37+
public static LogicalMapProtobufData get() {
38+
return INSTANCE;
39+
}
40+
41+
@Override
42+
public Schema getSchema(Descriptors.FieldDescriptor f) {
43+
Schema schema = super.getSchema(f);
44+
if (f.isMapField()) {
45+
Schema nonNull = resolveNonNull(schema);
46+
// protobuf maps are materialized as ARRAY<entry{key,value}> in Avro
47+
if (nonNull != null && nonNull.getType() == Schema.Type.ARRAY) {
48+
// set logicalType property; LogicalTypes is registered in CodecSetup
49+
CodecSetup.getLogicalMap().addToSchema(nonNull);
50+
}
51+
} else if (f.isOptional() && !f.isRepeated() && f.getContainingOneof() == null
52+
&& schema.getType() != Schema.Type.UNION) {
53+
// Proto3 optional scalars/messages: wrap as union(type, null) so the protobuf default (typically non-null)
54+
// remains valid (Avro default must match the first branch).
55+
schema = Schema.createUnion(Arrays.asList(schema, NULL));
56+
} else if (f.getContainingOneof() != null && !f.isRepeated() && schema.getType() != Schema.Type.UNION) {
57+
// oneof fields: wrap as union(type, null) so that non-set fields can be represented as null
58+
schema = Schema.createUnion(Arrays.asList(schema, NULL));
59+
}
60+
return schema;
61+
}
62+
63+
private Schema resolveNonNull(Schema schema) {
64+
if (schema == null) {
65+
return null;
66+
}
67+
if (schema.getType() == Schema.Type.UNION) {
68+
for (Schema member : schema.getTypes()) {
69+
if (member.getType() != Schema.Type.NULL) {
70+
return member;
71+
}
72+
}
73+
return null;
74+
}
75+
return schema;
76+
}
77+
}

0 commit comments

Comments
 (0)