Skip to content

Commit ac5981a

Browse files
committed
feat(binder): implement AvroValueAdapter and RecordBinder for Avro to Iceberg conversion
1 parent 30abf31 commit ac5981a

File tree

3 files changed

+149
-29
lines changed

3 files changed

+149
-29
lines changed

core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,22 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
131131
// Handle map represented as an array of key-value records
132132
Map<Object, Object> recordMap = new HashMap<>();
133133
Schema kvSchema = sourceSchema.getElementType();
134+
135+
if (kvSchema.getType() == Schema.Type.UNION) {
136+
kvSchema = kvSchema.getTypes().stream()
137+
.filter(s -> s.getType() == Schema.Type.RECORD)
138+
.findFirst()
139+
.orElseThrow(() -> new IllegalStateException(
140+
"Map element UNION schema does not contain a RECORD type: " + sourceSchema.getElementType()));
141+
}
142+
134143
Schema.Field keyField = kvSchema.getFields().get(0);
135144
Schema.Field valueField = kvSchema.getFields().get(1);
136145

137146
for (Object element : arrayValue) {
147+
if (element == null) {
148+
continue;
149+
}
138150
GenericRecord record = (GenericRecord) element;
139151
Object key = convert(record.get(keyField.pos()), keyField.schema(), targetType.keyType());
140152
Object value = convert(record.get(valueField.pos()), valueField.schema(), targetType.valueType());

core/src/main/java/kafka/automq/table/binder/RecordBinder.java

Lines changed: 76 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ public RecordBinder(org.apache.iceberg.Schema icebergSchema, Schema avroSchema,
6969
this.nestedStructBinders = precomputeNestedStructBinders(typeAdapter);
7070
}
7171

72+
73+
public org.apache.iceberg.Schema getIcebergSchema() {
74+
return icebergSchema;
75+
}
76+
7277
/**
7378
* Creates a new immutable Record view of the given Avro record.
7479
* Each call returns a separate instance with its own data reference.
@@ -82,13 +87,23 @@ public Record bind(GenericRecord avroRecord) {
8287
}
8388

8489
private void initializeFieldMappings(Schema avroSchema) {
90+
Schema recordSchema = avroSchema;
91+
92+
if (recordSchema.getType() == Schema.Type.UNION) {
93+
recordSchema = recordSchema.getTypes().stream()
94+
.filter(s -> s.getType() == Schema.Type.RECORD)
95+
.findFirst()
96+
.orElseThrow(() -> new IllegalArgumentException("UNION schema does not contain a RECORD type: " + avroSchema));
97+
}
98+
8599
for (int icebergPos = 0; icebergPos < icebergSchema.columns().size(); icebergPos++) {
86100
Types.NestedField icebergField = icebergSchema.columns().get(icebergPos);
87101
String fieldName = icebergField.name();
88102

89-
Schema.Field avroField = avroSchema.getField(fieldName);
103+
Schema.Field avroField = recordSchema.getField(fieldName);
90104
if (avroField != null) {
91105
fieldMappings[icebergPos] = createOptimizedMapping(
106+
avroField.name(),
92107
avroField.pos(),
93108
icebergField.type(),
94109
avroField.schema()
@@ -99,19 +114,14 @@ private void initializeFieldMappings(Schema avroSchema) {
99114
}
100115
}
101116

102-
private FieldMapping createOptimizedMapping(int avroPosition, Type icebergType, Schema avroType) {
103-
FieldMapping mapping = new FieldMapping();
104-
mapping.avroPosition = avroPosition;
105-
mapping.icebergType = icebergType;
106-
mapping.typeId = icebergType.typeId();
107-
mapping.avroSchema = avroType;
108-
117+
private FieldMapping createOptimizedMapping(String avroFieldName, int avroPosition, Type icebergType, Schema avroType) {
118+
org.apache.iceberg.Schema nestedSchema = null;
119+
String nestedSchemaId = null;
109120
if (icebergType.isStructType()) {
110-
mapping.nestedSchema = icebergType.asStructType().asSchema();
111-
mapping.nestedSchemaId = icebergType.toString();
121+
nestedSchema = icebergType.asStructType().asSchema();
122+
nestedSchemaId = icebergType.toString();
112123
}
113-
114-
return mapping;
124+
return new FieldMapping(avroPosition, avroFieldName, icebergType, icebergType.typeId(), avroType, nestedSchema, nestedSchemaId);
115125
}
116126

117127

@@ -122,12 +132,12 @@ private Map<String, RecordBinder> precomputeNestedStructBinders(TypeAdapter<Sche
122132
Map<String, RecordBinder> binders = new HashMap<>();
123133

124134
for (FieldMapping mapping : fieldMappings) {
125-
if (mapping != null && mapping.typeId == Type.TypeID.STRUCT) {
126-
String structId = mapping.nestedSchemaId;
135+
if (mapping != null && mapping.typeId() == Type.TypeID.STRUCT) {
136+
String structId = mapping.nestedSchemaId();
127137
if (!binders.containsKey(structId)) {
128138
RecordBinder nestedBinder = new RecordBinder(
129-
mapping.nestedSchema,
130-
mapping.avroSchema,
139+
mapping.nestedSchema(),
140+
mapping.avroSchema(),
131141
typeAdapter
132142
);
133143
binders.put(structId, nestedBinder);
@@ -170,18 +180,18 @@ public Object get(int pos) {
170180
}
171181

172182
FieldMapping mapping = fieldMappings[pos];
173-
if (mapping == null) {
183+
if (mapping == null || !avroRecord.hasField(mapping.avroKey())) {
174184
return null;
175185
}
176186

177-
Object avroValue = avroRecord.get(mapping.avroPosition);
187+
Object avroValue = avroRecord.get(mapping.avroPosition());
178188
if (avroValue == null) {
179189
return null;
180190
}
181191

182192
// Handle STRUCT type
183-
if (mapping.typeId == Type.TypeID.STRUCT) {
184-
String structId = mapping.nestedSchemaId;
193+
if (mapping.typeId() == Type.TypeID.STRUCT) {
194+
String structId = mapping.nestedSchemaId();
185195
RecordBinder nestedBinder = nestedStructBinders.get(structId);
186196
if (nestedBinder == null) {
187197
throw new IllegalStateException("Nested binder not found for struct: " + structId);
@@ -190,7 +200,7 @@ public Object get(int pos) {
190200
}
191201

192202
// Delegate conversion of all other types to the adapter
193-
return typeAdapter.convert(avroValue, mapping.avroSchema, mapping.icebergType);
203+
return typeAdapter.convert(avroValue, mapping.avroSchema(), mapping.icebergType());
194204
}
195205

196206
@Override
@@ -235,13 +245,50 @@ public <T> void set(int pos, T value) {
235245

236246
// Field mapping structure
237247
private static class FieldMapping {
238-
int avroPosition;
239-
Type icebergType;
240-
Type.TypeID typeId;
241-
Schema avroSchema;
242-
243-
// For struct types
244-
org.apache.iceberg.Schema nestedSchema;
245-
String nestedSchemaId;
248+
private final int avroPosition;
249+
private final String avroKey;
250+
private final Type icebergType;
251+
private final Type.TypeID typeId;
252+
private final Schema avroSchema;
253+
private final org.apache.iceberg.Schema nestedSchema;
254+
private final String nestedSchemaId;
255+
256+
FieldMapping(int avroPosition, String avroKey, Type icebergType, Type.TypeID typeId, Schema avroSchema, org.apache.iceberg.Schema nestedSchema, String nestedSchemaId) {
257+
this.avroPosition = avroPosition;
258+
this.avroKey = avroKey;
259+
this.icebergType = icebergType;
260+
this.typeId = typeId;
261+
this.avroSchema = avroSchema;
262+
this.nestedSchema = nestedSchema;
263+
this.nestedSchemaId = nestedSchemaId;
264+
}
265+
266+
public int avroPosition() {
267+
return avroPosition;
268+
}
269+
270+
public String avroKey() {
271+
return avroKey;
272+
}
273+
274+
public Type icebergType() {
275+
return icebergType;
276+
}
277+
278+
public Type.TypeID typeId() {
279+
return typeId;
280+
}
281+
282+
public Schema avroSchema() {
283+
return avroSchema;
284+
}
285+
286+
public org.apache.iceberg.Schema nestedSchema() {
287+
return nestedSchema;
288+
}
289+
290+
public String nestedSchemaId() {
291+
return nestedSchemaId;
292+
}
246293
}
247294
}

core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,4 +984,65 @@ public void testUnionFieldConversion() {
984984
// Send the record to the table
985985
testSendRecord(icebergSchema, icebergRecord);
986986
}
987+
<<<<<<< Updated upstream
988+
=======
989+
990+
@Test
991+
public void testBindWithNestedOptionalRecord() {
992+
// Schema representing a record with an optional nested record field, similar to Debezium envelopes.
993+
String avroSchemaJson = "{\n" +
994+
" \"type\": \"record\",\n" +
995+
" \"name\": \"Envelope\",\n" +
996+
" \"namespace\": \"inventory.inventory.customers\",\n" +
997+
" \"fields\": [\n" +
998+
" {\n" +
999+
" \"name\": \"before\",\n" +
1000+
" \"type\": [\n" +
1001+
" \"null\",\n" +
1002+
" {\n" +
1003+
" \"type\": \"record\",\n" +
1004+
" \"name\": \"Value\",\n" +
1005+
" \"fields\": [\n" +
1006+
" { \"name\": \"id\", \"type\": \"int\" },\n" +
1007+
" { \"name\": \"first_name\", \"type\": \"string\" }\n" +
1008+
" ]\n" +
1009+
" }\n" +
1010+
" ],\n" +
1011+
" \"default\": null\n" +
1012+
" }\n" +
1013+
" ]\n" +
1014+
"}";
1015+
1016+
Schema avroSchema = new Schema.Parser().parse(avroSchemaJson);
1017+
1018+
// Corresponding Iceberg Schema
1019+
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
1020+
1021+
// This binder will recursively create a nested binder for the 'before' field.
1022+
// The nested binder will receive a UNION schema, which is what our fix addresses.
1023+
RecordBinder recordBinder = new RecordBinder(icebergSchema, avroSchema);
1024+
1025+
// --- Test Case 1: Nested record is present ---
1026+
Schema valueSchema = avroSchema.getField("before").schema().getTypes().get(1);
1027+
GenericRecord valueRecord = new GenericData.Record(valueSchema);
1028+
valueRecord.put("id", 101);
1029+
valueRecord.put("first_name", "John");
1030+
1031+
GenericRecord envelopeRecord = new GenericData.Record(avroSchema);
1032+
envelopeRecord.put("before", valueRecord);
1033+
1034+
Record boundRecord = recordBinder.bind(envelopeRecord);
1035+
Record nestedBoundRecord = (Record) boundRecord.getField("before");
1036+
1037+
assertEquals(101, nestedBoundRecord.getField("id"));
1038+
assertEquals("John", nestedBoundRecord.getField("first_name"));
1039+
1040+
// --- Test Case 2: Nested record is null ---
1041+
GenericRecord envelopeRecordWithNull = new GenericData.Record(avroSchema);
1042+
envelopeRecordWithNull.put("before", null);
1043+
1044+
Record boundRecordWithNull = recordBinder.bind(envelopeRecordWithNull);
1045+
assertNull(boundRecordWithNull.getField("before"));
1046+
}
1047+
>>>>>>> Stashed changes
9871048
}

0 commit comments

Comments
 (0)