Skip to content

Commit 42c42c2

Browse files
authored
fix(binder): enhance RecordBinder and TypeAdapter to support STRUCT type conversion (#3005) (#3023)
* fix(binder): enhance RecordBinder and TypeAdapter to support STRUCT type conversion * test(binder): add tests for nested struct binding and schema instance reuse * fix(adapter): support conversion from Number to TIMESTAMPTZ in AbstractTypeAdapter * fix(adapter): update list and map conversion methods to include StructConverter
1 parent fb9fcdd commit 42c42c2

File tree

9 files changed

+661
-153
lines changed

9 files changed

+661
-153
lines changed

core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@
4848
*/
4949
public abstract class AbstractTypeAdapter<S> implements TypeAdapter<S> {
5050

51+
5152
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
5253
@Override
53-
public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
54+
public Object convert(Object sourceValue, S sourceSchema, Type targetType, StructConverter<S> structConverter) {
5455
if (sourceValue == null) {
5556
return null;
5657
}
@@ -83,9 +84,11 @@ public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
8384
case TIMESTAMP:
8485
return convertTimestamp(sourceValue, sourceSchema, (Types.TimestampType) targetType);
8586
case LIST:
86-
return convertList(sourceValue, sourceSchema, (Types.ListType) targetType);
87+
return convertList(sourceValue, sourceSchema, (Types.ListType) targetType, structConverter);
8788
case MAP:
88-
return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType);
89+
return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType, structConverter);
90+
case STRUCT:
91+
return structConverter.convert(sourceValue, sourceSchema, targetType);
8992
default:
9093
return sourceValue;
9194
}
@@ -203,10 +206,13 @@ protected Object convertTimestamp(Object sourceValue, S sourceSchema, Types.Time
203206
Instant instant = Instant.parse(sourceValue.toString());
204207
return DateTimeUtil.timestamptzFromMicros(DateTimeUtil.microsFromInstant(instant));
205208
}
209+
if (sourceValue instanceof Number) {
210+
return DateTimeUtil.timestamptzFromMicros(((Number) sourceValue).longValue());
211+
}
206212
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
207213
}
208214

209-
protected abstract List<?> convertList(Object sourceValue, S sourceSchema, Types.ListType targetType);
215+
protected abstract List<?> convertList(Object sourceValue, S sourceSchema, Types.ListType targetType, StructConverter<S> structConverter);
210216

211-
protected abstract Map<?, ?> convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType);
217+
protected abstract Map<?, ?> convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType, StructConverter<S> structConverter);
212218
}

core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types
116116
}
117117

118118
@Override
119-
protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
119+
protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType, StructConverter<Schema> structConverter) {
120120
Schema listSchema = sourceSchema;
121121
Schema elementSchema = listSchema.getElementType();
122122

@@ -131,14 +131,14 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
131131

132132
List<Object> list = new ArrayList<>(sourceList.size());
133133
for (Object element : sourceList) {
134-
Object convert = convert(element, elementSchema, targetType.elementType());
134+
Object convert = convert(element, elementSchema, targetType.elementType(), structConverter);
135135
list.add(convert);
136136
}
137137
return list;
138138
}
139139

140140
@Override
141-
protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType) {
141+
protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType, StructConverter<Schema> structConverter) {
142142
if (sourceValue instanceof GenericData.Array) {
143143
GenericData.Array<?> arrayValue = (GenericData.Array<?>) sourceValue;
144144
Map<Object, Object> recordMap = new HashMap<>(arrayValue.size());
@@ -161,8 +161,8 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
161161
continue;
162162
}
163163
GenericRecord record = (GenericRecord) element;
164-
Object key = convert(record.get(keyField.pos()), keySchema, keyType);
165-
Object value = convert(record.get(valueField.pos()), valueSchema, valueType);
164+
Object key = convert(record.get(keyField.pos()), keySchema, keyType, structConverter);
165+
Object value = convert(record.get(valueField.pos()), valueSchema, valueType, structConverter);
166166
recordMap.put(key, value);
167167
}
168168
return recordMap;
@@ -179,10 +179,32 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
179179

180180
for (Map.Entry<?, ?> entry : sourceMap.entrySet()) {
181181
Object rawKey = entry.getKey();
182-
Object key = convert(rawKey, STRING_SCHEMA_INSTANCE, keyType);
183-
Object value = convert(entry.getValue(), valueSchema, valueType);
182+
Object key = convert(rawKey, STRING_SCHEMA_INSTANCE, keyType, structConverter);
183+
Object value = convert(entry.getValue(), valueSchema, valueType, structConverter);
184184
adaptedMap.put(key, value);
185185
}
186186
return adaptedMap;
187187
}
188+
189+
@Override
190+
public Object convert(Object sourceValue, Schema sourceSchema, Type targetType) {
191+
return convert(sourceValue, sourceSchema, targetType, this::convertStruct);
192+
}
193+
194+
protected Object convertStruct(Object sourceValue, Schema sourceSchema, Type targetType) {
195+
org.apache.iceberg.Schema schema = targetType.asStructType().asSchema();
196+
org.apache.iceberg.data.GenericRecord result = org.apache.iceberg.data.GenericRecord.create(schema);
197+
for (Types.NestedField f : schema.columns()) {
198+
// Convert the value to the expected type
199+
GenericRecord record = (GenericRecord) sourceValue;
200+
Schema.Field sourceField = sourceSchema.getField(f.name());
201+
if (sourceField == null) {
202+
throw new IllegalStateException("Missing field '" + f.name()
203+
+ "' in source schema: " + sourceSchema.getFullName());
204+
}
205+
Object fieldValue = convert(record.get(f.name()), sourceField.schema(), f.type());
206+
result.setField(f.name(), fieldValue);
207+
}
208+
return result;
209+
}
188210
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package kafka.automq.table.binder;
20+
21+
import org.apache.avro.Schema;
22+
import org.apache.iceberg.types.Type;
23+
24+
/**
25+
* Represents the mapping between an Avro field and its corresponding Iceberg field.
26+
* This class stores the position, key, schema, and type information needed to
27+
* convert field values during record binding.
28+
*/
29+
public class FieldMapping {
30+
private final int avroPosition;
31+
private final String avroKey;
32+
private final Type icebergType;
33+
private final Schema avroSchema;
34+
35+
public FieldMapping(int avroPosition, String avroKey, Type icebergType, Schema avroSchema) {
36+
this.avroPosition = avroPosition;
37+
this.avroKey = avroKey;
38+
this.icebergType = icebergType;
39+
this.avroSchema = avroSchema;
40+
}
41+
42+
public int avroPosition() {
43+
return avroPosition;
44+
}
45+
46+
public String avroKey() {
47+
return avroKey;
48+
}
49+
50+
public Type icebergType() {
51+
return icebergType;
52+
}
53+
54+
public Schema avroSchema() {
55+
return avroSchema;
56+
}
57+
}

0 commit comments

Comments
 (0)