Skip to content

Commit 30abf31

Browse files
committed
feat(binder): implement AvroValueAdapter and RecordBinder for Avro to Iceberg conversion
1 parent 59a92fa commit 30abf31

File tree

7 files changed

+2239
-0
lines changed

7 files changed

+2239
-0
lines changed
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package kafka.automq.table.binder;
20+
21+
import org.apache.iceberg.types.Type;
22+
import org.apache.iceberg.types.Types;
23+
import org.apache.iceberg.util.ByteBuffers;
24+
import org.apache.iceberg.util.DateTimeUtil;
25+
import org.apache.iceberg.util.UUIDUtil;
26+
27+
import java.math.BigDecimal;
28+
import java.nio.ByteBuffer;
29+
import java.nio.charset.StandardCharsets;
30+
import java.time.Instant;
31+
import java.time.LocalDate;
32+
import java.time.LocalTime;
33+
import java.time.ZoneOffset;
34+
import java.time.temporal.Temporal;
35+
import java.util.Date;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.UUID;
39+
40+
/**
41+
* Abstract implementation providing common type conversion logic from source formats
42+
* to Iceberg's internal Java type representation.
43+
* <p>
44+
* Handles dispatch logic and provides default conversion implementations for primitive types.
45+
* Subclasses implement format-specific conversion for complex types (LIST, MAP, STRUCT).
46+
*
47+
* @param <S> The type of the source schema (e.g., org.apache.avro.Schema)
48+
*/
49+
public abstract class AbstractTypeAdapter<S> implements TypeAdapter<S> {
50+
51+
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
52+
@Override
53+
public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
54+
if (sourceValue == null) {
55+
return null;
56+
}
57+
58+
switch (targetType.typeId()) {
59+
case BOOLEAN:
60+
return convertBoolean(sourceValue, sourceSchema, targetType);
61+
case INTEGER:
62+
return convertInteger(sourceValue, sourceSchema, targetType);
63+
case LONG:
64+
return convertLong(sourceValue, sourceSchema, targetType);
65+
case FLOAT:
66+
return convertFloat(sourceValue, sourceSchema, targetType);
67+
case DOUBLE:
68+
return convertDouble(sourceValue, sourceSchema, targetType);
69+
case STRING:
70+
return convertString(sourceValue, sourceSchema, targetType);
71+
case BINARY:
72+
return convertBinary(sourceValue, sourceSchema, targetType);
73+
case FIXED:
74+
return convertFixed(sourceValue, sourceSchema, targetType);
75+
case UUID:
76+
return convertUUID(sourceValue, sourceSchema, targetType);
77+
case DECIMAL:
78+
return convertDecimal(sourceValue, sourceSchema, (Types.DecimalType) targetType);
79+
case DATE:
80+
return convertDate(sourceValue, sourceSchema, targetType);
81+
case TIME:
82+
return convertTime(sourceValue, sourceSchema, targetType);
83+
case TIMESTAMP:
84+
return convertTimestamp(sourceValue, sourceSchema, (Types.TimestampType) targetType);
85+
case LIST:
86+
return convertList(sourceValue, sourceSchema, (Types.ListType) targetType);
87+
case MAP:
88+
return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType);
89+
default:
90+
return sourceValue;
91+
}
92+
}
93+
94+
protected Object convertBoolean(Object sourceValue, S ignoredSourceSchema, Type targetType) {
95+
if (sourceValue instanceof Boolean) return sourceValue;
96+
if (sourceValue instanceof String) return Boolean.parseBoolean((String) sourceValue);
97+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
98+
}
99+
100+
protected Object convertInteger(Object sourceValue, S ignoredSourceSchema, Type targetType) {
101+
if (sourceValue instanceof Integer) return sourceValue;
102+
if (sourceValue instanceof Number) return ((Number) sourceValue).intValue();
103+
if (sourceValue instanceof String) return Integer.parseInt((String) sourceValue);
104+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
105+
}
106+
107+
protected Object convertLong(Object sourceValue, S ignoredSourceSchema, Type targetType) {
108+
if (sourceValue instanceof Long) return sourceValue;
109+
if (sourceValue instanceof Number) return ((Number) sourceValue).longValue();
110+
if (sourceValue instanceof String) return Long.parseLong((String) sourceValue);
111+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
112+
}
113+
114+
protected Object convertFloat(Object sourceValue, S ignoredSourceSchema, Type targetType) {
115+
if (sourceValue instanceof Float) return sourceValue;
116+
if (sourceValue instanceof Number) return ((Number) sourceValue).floatValue();
117+
if (sourceValue instanceof String) return Float.parseFloat((String) sourceValue);
118+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
119+
}
120+
121+
protected Object convertDouble(Object sourceValue, S ignoredSourceSchema, Type targetType) {
122+
if (sourceValue instanceof Double) return sourceValue;
123+
if (sourceValue instanceof Number) return ((Number) sourceValue).doubleValue();
124+
if (sourceValue instanceof String) return Double.parseDouble((String) sourceValue);
125+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
126+
}
127+
128+
protected Object convertString(Object sourceValue, S sourceSchema, Type targetType) {
129+
if (sourceValue instanceof String) {
130+
return sourceValue;
131+
}
132+
// Simple toString conversion - subclasses can override for more complex logic
133+
return sourceValue.toString();
134+
}
135+
136+
protected Object convertBinary(Object sourceValue, S sourceSchema, Type targetType) {
137+
if (sourceValue instanceof ByteBuffer) return ((ByteBuffer) sourceValue).duplicate();
138+
if (sourceValue instanceof byte[]) return ByteBuffer.wrap((byte[]) sourceValue);
139+
if (sourceValue instanceof String) return ByteBuffer.wrap(((String) sourceValue).getBytes(StandardCharsets.UTF_8));
140+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
141+
}
142+
143+
protected Object convertFixed(Object sourceValue, S sourceSchema, Type targetType) {
144+
if (sourceValue instanceof byte[]) return sourceValue;
145+
if (sourceValue instanceof ByteBuffer) return ByteBuffers.toByteArray((ByteBuffer) sourceValue);
146+
if (sourceValue instanceof String) return ((String) sourceValue).getBytes(StandardCharsets.UTF_8);
147+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
148+
}
149+
150+
protected Object convertUUID(Object sourceValue, S sourceSchema, Type targetType) {
151+
UUID uuid = null;
152+
if (sourceValue instanceof String) {
153+
uuid = UUID.fromString(sourceValue.toString());
154+
} else if (sourceValue instanceof UUID) {
155+
uuid = (UUID) sourceValue;
156+
} else if (sourceValue instanceof ByteBuffer) {
157+
ByteBuffer bb = ((ByteBuffer) sourceValue).duplicate();
158+
if (bb.remaining() == 16) {
159+
uuid = new UUID(bb.getLong(), bb.getLong());
160+
}
161+
}
162+
if (uuid != null) {
163+
return UUIDUtil.convert(uuid);
164+
}
165+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
166+
}
167+
168+
protected Object convertDecimal(Object sourceValue, S ignoredSourceSchema, Types.DecimalType targetType) {
169+
if (sourceValue instanceof BigDecimal) return sourceValue;
170+
if (sourceValue instanceof String) return new BigDecimal((String) sourceValue);
171+
if (sourceValue instanceof byte[]) return new BigDecimal(new java.math.BigInteger((byte[]) sourceValue), targetType.scale());
172+
if (sourceValue instanceof ByteBuffer) {
173+
ByteBuffer bb = ((ByteBuffer) sourceValue).duplicate();
174+
byte[] bytes = new byte[bb.remaining()];
175+
bb.get(bytes);
176+
return new BigDecimal(new java.math.BigInteger(bytes), targetType.scale());
177+
}
178+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
179+
}
180+
181+
protected Object convertDate(Object sourceValue, S ignoredSourceSchema, Type targetType) {
182+
if (sourceValue instanceof LocalDate) return sourceValue;
183+
if (sourceValue instanceof Number) return LocalDate.ofEpochDay(((Number) sourceValue).intValue());
184+
if (sourceValue instanceof Date) return ((Date) sourceValue).toInstant().atZone(ZoneOffset.UTC).toLocalDate();
185+
if (sourceValue instanceof String) return LocalDate.parse(sourceValue.toString());
186+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
187+
}
188+
189+
protected Object convertTime(Object sourceValue, S sourceSchema, Type targetType) {
190+
if (sourceValue instanceof LocalTime) return sourceValue;
191+
if (sourceValue instanceof Date) return ((Date) sourceValue).toInstant().atZone(ZoneOffset.UTC).toLocalTime();
192+
if (sourceValue instanceof String) return LocalTime.parse(sourceValue.toString());
193+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
194+
}
195+
196+
protected Object convertTimestamp(Object sourceValue, S sourceSchema, Types.TimestampType targetType) {
197+
if (sourceValue instanceof Temporal) return sourceValue;
198+
if (sourceValue instanceof Date) {
199+
Instant instant = ((Date) sourceValue).toInstant();
200+
return DateTimeUtil.timestamptzFromMicros(DateTimeUtil.microsFromInstant(instant));
201+
}
202+
if (sourceValue instanceof String) {
203+
Instant instant = Instant.parse(sourceValue.toString());
204+
return DateTimeUtil.timestamptzFromMicros(DateTimeUtil.microsFromInstant(instant));
205+
}
206+
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
207+
}
208+
209+
protected abstract List<?> convertList(Object sourceValue, S sourceSchema, Types.ListType targetType);
210+
211+
protected abstract Map<?, ?> convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType);
212+
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright 2025, AutoMQ HK Limited.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package kafka.automq.table.binder;
20+
21+
import org.apache.avro.LogicalType;
22+
import org.apache.avro.LogicalTypes;
23+
import org.apache.avro.Schema;
24+
import org.apache.avro.generic.GenericData;
25+
import org.apache.avro.generic.GenericRecord;
26+
import org.apache.avro.util.Utf8;
27+
import org.apache.iceberg.types.Type;
28+
import org.apache.iceberg.types.Types;
29+
import org.apache.iceberg.util.DateTimeUtil;
30+
31+
import java.nio.ByteBuffer;
32+
import java.util.ArrayList;
33+
import java.util.HashMap;
34+
import java.util.List;
35+
import java.util.Map;
36+
37+
/**
38+
* A concrete implementation of TypeAdapter that converts values from Avro's
39+
* data representation to Iceberg's internal Java type representation.
40+
* <p>
41+
* This class extends {@link AbstractTypeAdapter} and overrides methods to handle
42+
* Avro-specific types like Utf8, EnumSymbol, and Fixed, as well as Avro's
43+
* specific representations for List and Map.
44+
*/
45+
public class AvroValueAdapter extends AbstractTypeAdapter<Schema> {
46+
private static final org.apache.avro.Schema STRING_SCHEMA_INSTANCE = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING);
47+
48+
@Override
49+
protected Object convertString(Object sourceValue, Schema sourceSchema, Type targetType) {
50+
if (sourceValue instanceof Utf8 || sourceValue instanceof GenericData.EnumSymbol) {
51+
return sourceValue.toString();
52+
}
53+
return super.convertString(sourceValue, sourceSchema, targetType);
54+
}
55+
56+
@Override
57+
protected Object convertBinary(Object sourceValue, Schema sourceSchema, Type targetType) {
58+
if (sourceValue instanceof GenericData.Fixed) {
59+
return ByteBuffer.wrap(((GenericData.Fixed) sourceValue).bytes());
60+
}
61+
return super.convertBinary(sourceValue, sourceSchema, targetType);
62+
}
63+
64+
@Override
65+
protected Object convertFixed(Object sourceValue, Schema sourceSchema, Type targetType) {
66+
if (sourceValue instanceof GenericData.Fixed) {
67+
return ((GenericData.Fixed) sourceValue).bytes();
68+
}
69+
return super.convertFixed(sourceValue, sourceSchema, targetType);
70+
}
71+
72+
@Override
73+
protected Object convertUUID(Object sourceValue, Schema sourceSchema, Type targetType) {
74+
if (sourceValue instanceof Utf8) {
75+
return super.convertUUID(sourceValue.toString(), sourceSchema, targetType);
76+
}
77+
return super.convertUUID(sourceValue, sourceSchema, targetType);
78+
}
79+
80+
@Override
81+
protected Object convertTime(Object sourceValue, Schema sourceSchema, Type targetType) {
82+
if (sourceValue instanceof Number) {
83+
LogicalType logicalType = sourceSchema.getLogicalType();
84+
if (logicalType instanceof LogicalTypes.TimeMicros) {
85+
return DateTimeUtil.timeFromMicros(((Number) sourceValue).longValue());
86+
} else if (logicalType instanceof LogicalTypes.TimeMillis) {
87+
return DateTimeUtil.timeFromMicros(((Number) sourceValue).longValue() * 1000);
88+
}
89+
}
90+
return super.convertTime(sourceValue, sourceSchema, targetType);
91+
}
92+
93+
@Override
94+
protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types.TimestampType targetType) {
95+
if (sourceValue instanceof Number) {
96+
long value = ((Number) sourceValue).longValue();
97+
LogicalType logicalType = sourceSchema.getLogicalType();
98+
if (logicalType instanceof LogicalTypes.TimestampMillis) {
99+
return targetType.shouldAdjustToUTC()
100+
? DateTimeUtil.timestamptzFromMicros(value * 1000)
101+
: DateTimeUtil.timestampFromMicros(value * 1000);
102+
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
103+
return targetType.shouldAdjustToUTC()
104+
? DateTimeUtil.timestamptzFromMicros(value)
105+
: DateTimeUtil.timestampFromMicros(value);
106+
} else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
107+
return DateTimeUtil.timestampFromMicros(value * 1000);
108+
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
109+
return DateTimeUtil.timestampFromMicros(value);
110+
}
111+
}
112+
return super.convertTimestamp(sourceValue, sourceSchema, targetType);
113+
}
114+
115+
@Override
116+
protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
117+
Schema elementSchema = sourceSchema.getElementType();
118+
List<?> sourceList = (List<?>) sourceValue;
119+
List<Object> list = new ArrayList<>();
120+
for (Object element : sourceList) {
121+
Object convert = convert(element, elementSchema, targetType.elementType());
122+
list.add(convert);
123+
}
124+
return list;
125+
}
126+
127+
@Override
128+
protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType) {
129+
if (sourceValue instanceof GenericData.Array) {
130+
GenericData.Array<?> arrayValue = (GenericData.Array<?>) sourceValue;
131+
// Handle map represented as an array of key-value records
132+
Map<Object, Object> recordMap = new HashMap<>();
133+
Schema kvSchema = sourceSchema.getElementType();
134+
Schema.Field keyField = kvSchema.getFields().get(0);
135+
Schema.Field valueField = kvSchema.getFields().get(1);
136+
137+
for (Object element : arrayValue) {
138+
GenericRecord record = (GenericRecord) element;
139+
Object key = convert(record.get(keyField.pos()), keyField.schema(), targetType.keyType());
140+
Object value = convert(record.get(valueField.pos()), valueField.schema(), targetType.valueType());
141+
recordMap.put(key, value);
142+
}
143+
return recordMap;
144+
} else {
145+
// Handle standard Java Map
146+
Map<?, ?> sourceMap = (Map<?, ?>) sourceValue;
147+
Map<Object, Object> adaptedMap = new HashMap<>();
148+
for (Map.Entry<?, ?> entry : sourceMap.entrySet()) {
149+
// Avro map keys are always strings
150+
Object key = convert(entry.getKey(), STRING_SCHEMA_INSTANCE, targetType.keyType());
151+
Object value = convert(entry.getValue(), sourceSchema.getValueType(), targetType.valueType());
152+
adaptedMap.put(key, value);
153+
}
154+
return adaptedMap;
155+
}
156+
}
157+
}

0 commit comments

Comments
 (0)