Skip to content

Commit 16aabe8

Browse files
committed
feat(binder): implement AvroValueAdapter and RecordBinder for Avro to Iceberg conversion
1 parent 59a92fa commit 16aabe8

File tree

7 files changed

+2226
-0
lines changed

7 files changed

+2226
-0
lines changed
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package kafka.automq.table.binder;
20+
21+
import org.apache.iceberg.types.Type;
22+
import org.apache.iceberg.types.Types;
23+
import org.apache.iceberg.util.ByteBuffers;
24+
import org.apache.iceberg.util.DateTimeUtil;
25+
import org.apache.iceberg.util.UUIDUtil;
26+
27+
import java.math.BigDecimal;
28+
import java.nio.ByteBuffer;
29+
import java.nio.charset.StandardCharsets;
30+
import java.time.Instant;
31+
import java.time.LocalDate;
32+
import java.time.LocalTime;
33+
import java.time.ZoneOffset;
34+
import java.time.temporal.Temporal;
35+
import java.util.Date;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.UUID;
39+
40+
/**
41+
* Abstract implementation providing common type conversion logic from source formats
42+
* to Iceberg's internal Java type representation.
43+
* <p>
44+
* Handles dispatch logic and provides default conversion implementations for primitive types.
45+
* Subclasses implement format-specific conversion for complex types (LIST, MAP, STRUCT).
46+
*
47+
* @param <S> The type of the source schema (e.g., org.apache.avro.Schema)
48+
*/
49+
public abstract class AbstractTypeAdapter<S> implements TypeAdapter<S> {
50+
51+
@Override
52+
public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
53+
if (sourceValue == null) {
54+
return null;
55+
}
56+
57+
switch (targetType.typeId()) {
58+
case BOOLEAN:
59+
return convertBoolean(sourceValue, sourceSchema, targetType);
60+
case INTEGER:
61+
return convertInteger(sourceValue, sourceSchema, targetType);
62+
case LONG:
63+
return convertLong(sourceValue, sourceSchema, targetType);
64+
case FLOAT:
65+
return convertFloat(sourceValue, sourceSchema, targetType);
66+
case DOUBLE:
67+
return convertDouble(sourceValue, sourceSchema, targetType);
68+
case STRING:
69+
return convertString(sourceValue, sourceSchema, targetType);
70+
case BINARY:
71+
return convertBinary(sourceValue, sourceSchema, targetType);
72+
case FIXED:
73+
return convertFixed(sourceValue, sourceSchema, targetType);
74+
case UUID:
75+
return convertUUID(sourceValue, sourceSchema, targetType);
76+
case DECIMAL:
77+
return convertDecimal(sourceValue, sourceSchema, (Types.DecimalType) targetType);
78+
case DATE:
79+
return convertDate(sourceValue, sourceSchema, targetType);
80+
case TIME:
81+
return convertTime(sourceValue, sourceSchema, targetType);
82+
case TIMESTAMP:
83+
return convertTimestamp(sourceValue, sourceSchema, (Types.TimestampType) targetType);
84+
case LIST:
85+
return convertList(sourceValue, sourceSchema, (Types.ListType) targetType);
86+
case MAP:
87+
return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType);
88+
default:
89+
return sourceValue;
90+
}
91+
}
92+
93+
protected Object convertBoolean(Object sourceValue, S ignoredSourceSchema, Type targetType) {
94+
if (sourceValue instanceof Boolean) return sourceValue;
95+
if (sourceValue instanceof String) return Boolean.parseBoolean((String) sourceValue);
96+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
97+
}
98+
99+
protected Object convertInteger(Object sourceValue, S ignoredSourceSchema, Type targetType) {
100+
if (sourceValue instanceof Integer) return sourceValue;
101+
if (sourceValue instanceof Number) return ((Number) sourceValue).intValue();
102+
if (sourceValue instanceof String) return Integer.parseInt((String) sourceValue);
103+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
104+
}
105+
106+
protected Object convertLong(Object sourceValue, S ignoredSourceSchema, Type targetType) {
107+
if (sourceValue instanceof Long) return sourceValue;
108+
if (sourceValue instanceof Number) return ((Number) sourceValue).longValue();
109+
if (sourceValue instanceof String) return Long.parseLong((String) sourceValue);
110+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
111+
}
112+
113+
protected Object convertFloat(Object sourceValue, S ignoredSourceSchema, Type targetType) {
114+
if (sourceValue instanceof Float) return sourceValue;
115+
if (sourceValue instanceof Number) return ((Number) sourceValue).floatValue();
116+
if (sourceValue instanceof String) return Float.parseFloat((String) sourceValue);
117+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
118+
}
119+
120+
protected Object convertDouble(Object sourceValue, S ignoredSourceSchema, Type targetType) {
121+
if (sourceValue instanceof Double) return sourceValue;
122+
if (sourceValue instanceof Number) return ((Number) sourceValue).doubleValue();
123+
if (sourceValue instanceof String) return Double.parseDouble((String) sourceValue);
124+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
125+
}
126+
127+
protected Object convertString(Object sourceValue, S sourceSchema, Type targetType) {
128+
if (sourceValue instanceof String) {
129+
return sourceValue;
130+
}
131+
// Simple toString conversion - subclasses can override for more complex logic
132+
return sourceValue.toString();
133+
}
134+
135+
protected Object convertBinary(Object sourceValue, S sourceSchema, Type targetType) {
136+
if (sourceValue instanceof ByteBuffer) return ((ByteBuffer) sourceValue).duplicate();
137+
if (sourceValue instanceof byte[]) return ByteBuffer.wrap((byte[]) sourceValue);
138+
if (sourceValue instanceof String) return ByteBuffer.wrap(((String) sourceValue).getBytes(StandardCharsets.UTF_8));
139+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
140+
}
141+
142+
protected Object convertFixed(Object sourceValue, S sourceSchema, Type targetType) {
143+
if (sourceValue instanceof byte[]) return sourceValue;
144+
if (sourceValue instanceof ByteBuffer) return ByteBuffers.toByteArray((ByteBuffer) sourceValue);
145+
if (sourceValue instanceof String) return ((String) sourceValue).getBytes(StandardCharsets.UTF_8);
146+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
147+
}
148+
149+
protected Object convertUUID(Object sourceValue, S sourceSchema, Type targetType) {
150+
UUID uuid = null;
151+
if (sourceValue instanceof String) {
152+
uuid = UUID.fromString(sourceValue.toString());
153+
} else if (sourceValue instanceof UUID) {
154+
uuid = (UUID) sourceValue;
155+
} else if (sourceValue instanceof ByteBuffer) {
156+
ByteBuffer bb = ((ByteBuffer) sourceValue).duplicate();
157+
if (bb.remaining() == 16) {
158+
uuid = new UUID(bb.getLong(), bb.getLong());
159+
}
160+
}
161+
if (uuid != null) {
162+
return UUIDUtil.convert(uuid);
163+
}
164+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
165+
}
166+
167+
protected Object convertDecimal(Object sourceValue, S ignoredSourceSchema, Types.DecimalType targetType) {
168+
if (sourceValue instanceof BigDecimal) return sourceValue;
169+
if (sourceValue instanceof String) return new BigDecimal((String) sourceValue);
170+
if (sourceValue instanceof byte[]) return new BigDecimal(new java.math.BigInteger((byte[]) sourceValue), targetType.scale());
171+
if (sourceValue instanceof ByteBuffer) {
172+
ByteBuffer bb = ((ByteBuffer) sourceValue).duplicate();
173+
byte[] bytes = new byte[bb.remaining()];
174+
bb.get(bytes);
175+
return new BigDecimal(new java.math.BigInteger(bytes), targetType.scale());
176+
}
177+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
178+
}
179+
180+
protected Object convertDate(Object sourceValue, S ignoredSourceSchema, Type targetType) {
181+
if (sourceValue instanceof LocalDate) return sourceValue;
182+
if (sourceValue instanceof Number) return LocalDate.ofEpochDay(((Number) sourceValue).intValue());
183+
if (sourceValue instanceof Date) return ((Date) sourceValue).toInstant().atZone(ZoneOffset.UTC).toLocalDate();
184+
if (sourceValue instanceof String) return LocalDate.parse(sourceValue.toString());
185+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
186+
}
187+
188+
protected Object convertTime(Object sourceValue, S sourceSchema, Type targetType) {
189+
if (sourceValue instanceof LocalTime) return sourceValue;
190+
if (sourceValue instanceof Date) return ((Date) sourceValue).toInstant().atZone(ZoneOffset.UTC).toLocalTime();
191+
if (sourceValue instanceof String) return LocalTime.parse(sourceValue.toString());
192+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
193+
}
194+
195+
protected Object convertTimestamp(Object sourceValue, S sourceSchema, Types.TimestampType targetType) {
196+
if (sourceValue instanceof Temporal) return sourceValue;
197+
if (sourceValue instanceof Date) {
198+
Instant instant = ((Date) sourceValue).toInstant();
199+
return DateTimeUtil.timestamptzFromMicros(DateTimeUtil.microsFromInstant(instant));
200+
}
201+
if (sourceValue instanceof String) {
202+
Instant instant = Instant.parse(sourceValue.toString());
203+
return DateTimeUtil.timestamptzFromMicros(DateTimeUtil.microsFromInstant(instant));
204+
}
205+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
206+
}
207+
208+
protected abstract List<?> convertList(Object sourceValue, S sourceSchema, Types.ListType targetType);
209+
210+
protected abstract Map<?, ?> convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType);
211+
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package kafka.automq.table.binder;
20+
21+
import org.apache.avro.LogicalType;
22+
import org.apache.avro.LogicalTypes;
23+
import org.apache.avro.Schema;
24+
import org.apache.avro.generic.GenericData;
25+
import org.apache.avro.generic.GenericRecord;
26+
import org.apache.avro.util.Utf8;
27+
import org.apache.iceberg.types.Type;
28+
import org.apache.iceberg.types.Types;
29+
import org.apache.iceberg.util.DateTimeUtil;
30+
31+
import java.nio.ByteBuffer;
32+
import java.util.ArrayList;
33+
import java.util.HashMap;
34+
import java.util.List;
35+
import java.util.Map;
36+
37+
/**
38+
* A concrete implementation of TypeAdapter that converts values from Avro's
39+
* data representation to Iceberg's internal Java type representation.
40+
* <p>
41+
* This class extends {@link AbstractTypeAdapter} and overrides methods to handle
42+
* Avro-specific types like Utf8, EnumSymbol, and Fixed, as well as Avro's
43+
* specific representations for List and Map.
44+
*/
45+
public class AvroValueAdapter extends AbstractTypeAdapter<Schema> {
46+
private static final org.apache.avro.Schema STRING_SCHEMA_INSTANCE = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING);
47+
48+
@Override
49+
protected Object convertString(Object sourceValue, Schema sourceSchema, Type targetType) {
50+
if (sourceValue instanceof Utf8 || sourceValue instanceof GenericData.EnumSymbol) {
51+
return sourceValue.toString();
52+
}
53+
return super.convertString(sourceValue, sourceSchema, targetType);
54+
}
55+
56+
@Override
57+
protected Object convertBinary(Object sourceValue, Schema sourceSchema, Type targetType) {
58+
if (sourceValue instanceof GenericData.Fixed) {
59+
return ByteBuffer.wrap(((GenericData.Fixed) sourceValue).bytes());
60+
}
61+
return super.convertBinary(sourceValue, sourceSchema, targetType);
62+
}
63+
64+
@Override
65+
protected Object convertFixed(Object sourceValue, Schema sourceSchema, Type targetType) {
66+
if (sourceValue instanceof GenericData.Fixed) {
67+
return ((GenericData.Fixed) sourceValue).bytes();
68+
}
69+
return super.convertFixed(sourceValue, sourceSchema, targetType);
70+
}
71+
72+
@Override
73+
protected Object convertUUID(Object sourceValue, Schema sourceSchema, Type targetType) {
74+
if (sourceValue instanceof Utf8) {
75+
return super.convertUUID(sourceValue.toString(), sourceSchema, targetType);
76+
}
77+
return super.convertUUID(sourceValue, sourceSchema, targetType);
78+
}
79+
80+
@Override
81+
protected Object convertTime(Object sourceValue, Schema sourceSchema, Type targetType) {
82+
if (sourceValue instanceof Number) {
83+
LogicalType logicalType = sourceSchema.getLogicalType();
84+
if (logicalType instanceof LogicalTypes.TimeMicros) {
85+
return DateTimeUtil.timeFromMicros(((Number) sourceValue).longValue());
86+
} else if (logicalType instanceof LogicalTypes.TimeMillis) {
87+
return DateTimeUtil.timeFromMicros(((Number) sourceValue).longValue() * 1000);
88+
}
89+
}
90+
return super.convertTime(sourceValue, sourceSchema, targetType);
91+
}
92+
93+
@Override
94+
protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types.TimestampType targetType) {
95+
if (sourceValue instanceof Number) {
96+
long value = ((Number) sourceValue).longValue();
97+
LogicalType logicalType = sourceSchema.getLogicalType();
98+
if (logicalType instanceof LogicalTypes.TimestampMillis) {
99+
return targetType.shouldAdjustToUTC()
100+
? DateTimeUtil.timestamptzFromMicros(value * 1000)
101+
: DateTimeUtil.timestampFromMicros(value * 1000);
102+
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
103+
return targetType.shouldAdjustToUTC()
104+
? DateTimeUtil.timestamptzFromMicros(value)
105+
: DateTimeUtil.timestampFromMicros(value);
106+
} else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
107+
return DateTimeUtil.timestampFromMicros(value * 1000);
108+
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
109+
return DateTimeUtil.timestampFromMicros(value);
110+
}
111+
}
112+
return super.convertTimestamp(sourceValue, sourceSchema, targetType);
113+
}
114+
115+
@Override
116+
protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
117+
Schema elementSchema = sourceSchema.getElementType();
118+
List<?> sourceList = (List<?>) sourceValue;
119+
List<Object> list = new ArrayList<>();
120+
for (Object element : sourceList) {
121+
Object convert = convert(element, elementSchema, targetType.elementType());
122+
list.add(convert);
123+
}
124+
return list;
125+
}
126+
127+
@Override
128+
protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType) {
129+
if (sourceValue instanceof GenericData.Array) {
130+
GenericData.Array<?> arrayValue = (GenericData.Array<?>) sourceValue;
131+
// Handle map represented as an array of key-value records
132+
Map<Object, Object> recordMap = new HashMap<>();
133+
Schema kvSchema = sourceSchema.getElementType();
134+
Schema.Field keyField = kvSchema.getFields().get(0);
135+
Schema.Field valueField = kvSchema.getFields().get(1);
136+
137+
for (Object element : arrayValue) {
138+
GenericRecord record = (GenericRecord) element;
139+
Object key = convert(record.get(keyField.pos()), keyField.schema(), targetType.keyType());
140+
Object value = convert(record.get(valueField.pos()), valueField.schema(), targetType.valueType());
141+
recordMap.put(key, value);
142+
}
143+
return recordMap;
144+
} else {
145+
// Handle standard Java Map
146+
Map<?, ?> sourceMap = (Map<?, ?>) sourceValue;
147+
Map<Object, Object> adaptedMap = new HashMap<>();
148+
for (Map.Entry<?, ?> entry : sourceMap.entrySet()) {
149+
// Avro map keys are always strings
150+
Object key = convert(entry.getKey(), STRING_SCHEMA_INSTANCE, targetType.keyType());
151+
Object value = convert(entry.getValue(), sourceSchema.getValueType(), targetType.valueType());
152+
adaptedMap.put(key, value);
153+
}
154+
return adaptedMap;
155+
}
156+
}
157+
}

0 commit comments

Comments
 (0)