Skip to content

feat(binder): implement AvroValueAdapter and RecordBinder for Avro to Iceberg conversion[WIP] #2744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.automq.table.binder;

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.UUIDUtil;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.temporal.Temporal;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
* Abstract implementation providing common type conversion logic from source formats
* to Iceberg's internal Java type representation.
* <p>
* Handles dispatch logic and provides default conversion implementations for primitive types.
* Subclasses implement format-specific conversion for complex types (LIST, MAP, STRUCT).
*
* @param <S> The type of the source schema (e.g., org.apache.avro.Schema)
*/
public abstract class AbstractTypeAdapter<S> implements TypeAdapter<S> {

@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
@Override
public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
if (sourceValue == null) {
return null;
}

switch (targetType.typeId()) {
case BOOLEAN:
return convertBoolean(sourceValue, sourceSchema, targetType);
case INTEGER:
return convertInteger(sourceValue, sourceSchema, targetType);
case LONG:
return convertLong(sourceValue, sourceSchema, targetType);
case FLOAT:
return convertFloat(sourceValue, sourceSchema, targetType);
case DOUBLE:
return convertDouble(sourceValue, sourceSchema, targetType);
case STRING:
return convertString(sourceValue, sourceSchema, targetType);
case BINARY:
return convertBinary(sourceValue, sourceSchema, targetType);
case FIXED:
return convertFixed(sourceValue, sourceSchema, targetType);
case UUID:
return convertUUID(sourceValue, sourceSchema, targetType);
case DECIMAL:
return convertDecimal(sourceValue, sourceSchema, (Types.DecimalType) targetType);
case DATE:
return convertDate(sourceValue, sourceSchema, targetType);
case TIME:
return convertTime(sourceValue, sourceSchema, targetType);
case TIMESTAMP:
return convertTimestamp(sourceValue, sourceSchema, (Types.TimestampType) targetType);
case LIST:
return convertList(sourceValue, sourceSchema, (Types.ListType) targetType);
case MAP:
return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType);
default:
return sourceValue;
}
}

protected Object convertBoolean(Object sourceValue, S ignoredSourceSchema, Type targetType) {
if (sourceValue instanceof Boolean) return sourceValue;
if (sourceValue instanceof String) return Boolean.parseBoolean((String) sourceValue);
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected Object convertInteger(Object sourceValue, S ignoredSourceSchema, Type targetType) {
if (sourceValue instanceof Integer) return sourceValue;
if (sourceValue instanceof Number) return ((Number) sourceValue).intValue();
if (sourceValue instanceof String) return Integer.parseInt((String) sourceValue);
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected Object convertLong(Object sourceValue, S ignoredSourceSchema, Type targetType) {
if (sourceValue instanceof Long) return sourceValue;
if (sourceValue instanceof Number) return ((Number) sourceValue).longValue();
if (sourceValue instanceof String) return Long.parseLong((String) sourceValue);
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected Object convertFloat(Object sourceValue, S ignoredSourceSchema, Type targetType) {
if (sourceValue instanceof Float) return sourceValue;
if (sourceValue instanceof Number) return ((Number) sourceValue).floatValue();
if (sourceValue instanceof String) return Float.parseFloat((String) sourceValue);
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected Object convertDouble(Object sourceValue, S ignoredSourceSchema, Type targetType) {
if (sourceValue instanceof Double) return sourceValue;
if (sourceValue instanceof Number) return ((Number) sourceValue).doubleValue();
if (sourceValue instanceof String) return Double.parseDouble((String) sourceValue);
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected Object convertString(Object sourceValue, S sourceSchema, Type targetType) {
if (sourceValue instanceof String) {
return sourceValue;
}
// Simple toString conversion - subclasses can override for more complex logic
return sourceValue.toString();
}

protected Object convertBinary(Object sourceValue, S sourceSchema, Type targetType) {
if (sourceValue instanceof ByteBuffer) return ((ByteBuffer) sourceValue).duplicate();
if (sourceValue instanceof byte[]) return ByteBuffer.wrap((byte[]) sourceValue);
if (sourceValue instanceof String) return ByteBuffer.wrap(((String) sourceValue).getBytes(StandardCharsets.UTF_8));
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected Object convertFixed(Object sourceValue, S sourceSchema, Type targetType) {
if (sourceValue instanceof byte[]) return sourceValue;
if (sourceValue instanceof ByteBuffer) return ByteBuffers.toByteArray((ByteBuffer) sourceValue);
if (sourceValue instanceof String) return ((String) sourceValue).getBytes(StandardCharsets.UTF_8);
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected Object convertUUID(Object sourceValue, S sourceSchema, Type targetType) {
UUID uuid = null;
if (sourceValue instanceof String) {
uuid = UUID.fromString(sourceValue.toString());
} else if (sourceValue instanceof UUID) {
uuid = (UUID) sourceValue;
} else if (sourceValue instanceof ByteBuffer) {
ByteBuffer bb = ((ByteBuffer) sourceValue).duplicate();
if (bb.remaining() == 16) {
uuid = new UUID(bb.getLong(), bb.getLong());
}
}
if (uuid != null) {
return UUIDUtil.convert(uuid);
}
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected Object convertDecimal(Object sourceValue, S ignoredSourceSchema, Types.DecimalType targetType) {
if (sourceValue instanceof BigDecimal) return sourceValue;
if (sourceValue instanceof String) return new BigDecimal((String) sourceValue);
if (sourceValue instanceof byte[]) return new BigDecimal(new java.math.BigInteger((byte[]) sourceValue), targetType.scale());
if (sourceValue instanceof ByteBuffer) {
ByteBuffer bb = ((ByteBuffer) sourceValue).duplicate();
byte[] bytes = new byte[bb.remaining()];
bb.get(bytes);
return new BigDecimal(new java.math.BigInteger(bytes), targetType.scale());
}
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected Object convertDate(Object sourceValue, S ignoredSourceSchema, Type targetType) {
if (sourceValue instanceof LocalDate) return sourceValue;
if (sourceValue instanceof Number) return LocalDate.ofEpochDay(((Number) sourceValue).intValue());
if (sourceValue instanceof Date) return ((Date) sourceValue).toInstant().atZone(ZoneOffset.UTC).toLocalDate();
if (sourceValue instanceof String) return LocalDate.parse(sourceValue.toString());
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected Object convertTime(Object sourceValue, S sourceSchema, Type targetType) {
if (sourceValue instanceof LocalTime) return sourceValue;
if (sourceValue instanceof Date) return ((Date) sourceValue).toInstant().atZone(ZoneOffset.UTC).toLocalTime();
if (sourceValue instanceof String) return LocalTime.parse(sourceValue.toString());
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected Object convertTimestamp(Object sourceValue, S sourceSchema, Types.TimestampType targetType) {
if (sourceValue instanceof Temporal) return sourceValue;
if (sourceValue instanceof Date) {
Instant instant = ((Date) sourceValue).toInstant();
return DateTimeUtil.timestamptzFromMicros(DateTimeUtil.microsFromInstant(instant));
}
if (sourceValue instanceof String) {
Instant instant = Instant.parse(sourceValue.toString());
return DateTimeUtil.timestamptzFromMicros(DateTimeUtil.microsFromInstant(instant));
}
throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
}

protected abstract List<?> convertList(Object sourceValue, S sourceSchema, Types.ListType targetType);

protected abstract Map<?, ?> convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType);
}
157 changes: 157 additions & 0 deletions core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.automq.table.binder;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* A concrete implementation of TypeAdapter that converts values from Avro's
* data representation to Iceberg's internal Java type representation.
* <p>
* This class extends {@link AbstractTypeAdapter} and overrides methods to handle
* Avro-specific types like Utf8, EnumSymbol, and Fixed, as well as Avro's
* specific representations for List and Map.
*/
public class AvroValueAdapter extends AbstractTypeAdapter<Schema> {
private static final org.apache.avro.Schema STRING_SCHEMA_INSTANCE = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING);

@Override
protected Object convertString(Object sourceValue, Schema sourceSchema, Type targetType) {
if (sourceValue instanceof Utf8 || sourceValue instanceof GenericData.EnumSymbol) {
return sourceValue.toString();
}
return super.convertString(sourceValue, sourceSchema, targetType);
}

@Override
protected Object convertBinary(Object sourceValue, Schema sourceSchema, Type targetType) {
if (sourceValue instanceof GenericData.Fixed) {
return ByteBuffer.wrap(((GenericData.Fixed) sourceValue).bytes());
}
return super.convertBinary(sourceValue, sourceSchema, targetType);
}

@Override
protected Object convertFixed(Object sourceValue, Schema sourceSchema, Type targetType) {
if (sourceValue instanceof GenericData.Fixed) {
return ((GenericData.Fixed) sourceValue).bytes();
}
return super.convertFixed(sourceValue, sourceSchema, targetType);
}

@Override
protected Object convertUUID(Object sourceValue, Schema sourceSchema, Type targetType) {
if (sourceValue instanceof Utf8) {
return super.convertUUID(sourceValue.toString(), sourceSchema, targetType);
}
return super.convertUUID(sourceValue, sourceSchema, targetType);
}

@Override
protected Object convertTime(Object sourceValue, Schema sourceSchema, Type targetType) {
if (sourceValue instanceof Number) {
LogicalType logicalType = sourceSchema.getLogicalType();
if (logicalType instanceof LogicalTypes.TimeMicros) {
return DateTimeUtil.timeFromMicros(((Number) sourceValue).longValue());
} else if (logicalType instanceof LogicalTypes.TimeMillis) {
return DateTimeUtil.timeFromMicros(((Number) sourceValue).longValue() * 1000);
}
}
return super.convertTime(sourceValue, sourceSchema, targetType);
}

@Override
protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types.TimestampType targetType) {
if (sourceValue instanceof Number) {
long value = ((Number) sourceValue).longValue();
LogicalType logicalType = sourceSchema.getLogicalType();
if (logicalType instanceof LogicalTypes.TimestampMillis) {
return targetType.shouldAdjustToUTC()
? DateTimeUtil.timestamptzFromMicros(value * 1000)
: DateTimeUtil.timestampFromMicros(value * 1000);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
return targetType.shouldAdjustToUTC()
? DateTimeUtil.timestamptzFromMicros(value)
: DateTimeUtil.timestampFromMicros(value);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
return DateTimeUtil.timestampFromMicros(value * 1000);
} else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
return DateTimeUtil.timestampFromMicros(value);
}
}
return super.convertTimestamp(sourceValue, sourceSchema, targetType);
}

@Override
protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
Schema elementSchema = sourceSchema.getElementType();
List<?> sourceList = (List<?>) sourceValue;
List<Object> list = new ArrayList<>();
for (Object element : sourceList) {
Object convert = convert(element, elementSchema, targetType.elementType());
list.add(convert);
}
return list;
}

@Override
protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType) {
if (sourceValue instanceof GenericData.Array) {
GenericData.Array<?> arrayValue = (GenericData.Array<?>) sourceValue;
// Handle map represented as an array of key-value records
Map<Object, Object> recordMap = new HashMap<>();
Schema kvSchema = sourceSchema.getElementType();
Schema.Field keyField = kvSchema.getFields().get(0);
Schema.Field valueField = kvSchema.getFields().get(1);

for (Object element : arrayValue) {
GenericRecord record = (GenericRecord) element;
Object key = convert(record.get(keyField.pos()), keyField.schema(), targetType.keyType());
Object value = convert(record.get(valueField.pos()), valueField.schema(), targetType.valueType());
recordMap.put(key, value);
}
return recordMap;
} else {
// Handle standard Java Map
Map<?, ?> sourceMap = (Map<?, ?>) sourceValue;
Map<Object, Object> adaptedMap = new HashMap<>();
for (Map.Entry<?, ?> entry : sourceMap.entrySet()) {
// Avro map keys are always strings
Object key = convert(entry.getKey(), STRING_SCHEMA_INSTANCE, targetType.keyType());
Object value = convert(entry.getValue(), sourceSchema.getValueType(), targetType.valueType());
adaptedMap.put(key, value);
}
return adaptedMap;
}
}
}
Loading
Loading