diff --git a/core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java b/core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java new file mode 100644 index 0000000000..d095ad6f75 --- /dev/null +++ b/core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java @@ -0,0 +1,212 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.automq.table.binder; + +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.UUIDUtil; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneOffset; +import java.time.temporal.Temporal; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Abstract implementation providing common type conversion logic from source formats + * to Iceberg's internal Java type representation. + *
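 + * A {@code null} source value is returned as {@code null}; values that cannot be
 + * converted to the requested Iceberg type raise an {@link IllegalArgumentException}.
 + * A minimal illustration of the default primitive paths (which ignore the source
 + * schema argument, so {@code null} is passed for it here):
 + * <pre>{@code
 + * AbstractTypeAdapter<Schema> adapter = new AvroValueAdapter();
 + * Object v = adapter.convert("42", null, Types.IntegerType.get()); // -> Integer 42
 + * }</pre>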
 + * <p>
+ * Handles dispatch logic and provides default conversion implementations for primitive types. + * Subclasses implement format-specific conversion for complex types (LIST, MAP, STRUCT). + * + * @param The type of the source schema (e.g., org.apache.avro.Schema) + */ +public abstract class AbstractTypeAdapter implements TypeAdapter { + + @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) + @Override + public Object convert(Object sourceValue, S sourceSchema, Type targetType) { + if (sourceValue == null) { + return null; + } + + switch (targetType.typeId()) { + case BOOLEAN: + return convertBoolean(sourceValue, sourceSchema, targetType); + case INTEGER: + return convertInteger(sourceValue, sourceSchema, targetType); + case LONG: + return convertLong(sourceValue, sourceSchema, targetType); + case FLOAT: + return convertFloat(sourceValue, sourceSchema, targetType); + case DOUBLE: + return convertDouble(sourceValue, sourceSchema, targetType); + case STRING: + return convertString(sourceValue, sourceSchema, targetType); + case BINARY: + return convertBinary(sourceValue, sourceSchema, targetType); + case FIXED: + return convertFixed(sourceValue, sourceSchema, targetType); + case UUID: + return convertUUID(sourceValue, sourceSchema, targetType); + case DECIMAL: + return convertDecimal(sourceValue, sourceSchema, (Types.DecimalType) targetType); + case DATE: + return convertDate(sourceValue, sourceSchema, targetType); + case TIME: + return convertTime(sourceValue, sourceSchema, targetType); + case TIMESTAMP: + return convertTimestamp(sourceValue, sourceSchema, (Types.TimestampType) targetType); + case LIST: + return convertList(sourceValue, sourceSchema, (Types.ListType) targetType); + case MAP: + return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType); + default: + return sourceValue; + } + } + + protected Object convertBoolean(Object sourceValue, S ignoredSourceSchema, Type targetType) { + if (sourceValue instanceof Boolean) return sourceValue; + if (sourceValue instanceof String) return Boolean.parseBoolean((String) sourceValue); + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); + } + + protected Object convertInteger(Object sourceValue, S ignoredSourceSchema, Type targetType) { + if (sourceValue instanceof Integer) return sourceValue; + if (sourceValue instanceof Number) return ((Number) sourceValue).intValue(); + if (sourceValue instanceof String) return Integer.parseInt((String) sourceValue); + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); + } + + protected Object convertLong(Object sourceValue, S ignoredSourceSchema, Type targetType) { + if (sourceValue instanceof Long) return sourceValue; + if (sourceValue instanceof Number) return ((Number) sourceValue).longValue(); + if (sourceValue instanceof String) return Long.parseLong((String) sourceValue); + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); + } + + protected Object convertFloat(Object sourceValue, S ignoredSourceSchema, Type targetType) { + if (sourceValue instanceof Float) return sourceValue; + if (sourceValue instanceof Number) return ((Number) sourceValue).floatValue(); + if (sourceValue instanceof String) return Float.parseFloat((String) sourceValue); + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + 
targetType.typeId()); + } + + protected Object convertDouble(Object sourceValue, S ignoredSourceSchema, Type targetType) { + if (sourceValue instanceof Double) return sourceValue; + if (sourceValue instanceof Number) return ((Number) sourceValue).doubleValue(); + if (sourceValue instanceof String) return Double.parseDouble((String) sourceValue); + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); + } + + protected Object convertString(Object sourceValue, S sourceSchema, Type targetType) { + if (sourceValue instanceof String) { + return sourceValue; + } + // Simple toString conversion - subclasses can override for more complex logic + return sourceValue.toString(); + } + + protected Object convertBinary(Object sourceValue, S sourceSchema, Type targetType) { + if (sourceValue instanceof ByteBuffer) return ((ByteBuffer) sourceValue).duplicate(); + if (sourceValue instanceof byte[]) return ByteBuffer.wrap((byte[]) sourceValue); + if (sourceValue instanceof String) return ByteBuffer.wrap(((String) sourceValue).getBytes(StandardCharsets.UTF_8)); + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); + } + + protected Object convertFixed(Object sourceValue, S sourceSchema, Type targetType) { + if (sourceValue instanceof byte[]) return sourceValue; + if (sourceValue instanceof ByteBuffer) return ByteBuffers.toByteArray((ByteBuffer) sourceValue); + if (sourceValue instanceof String) return ((String) sourceValue).getBytes(StandardCharsets.UTF_8); + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); + } + + protected Object convertUUID(Object sourceValue, S sourceSchema, Type targetType) { + UUID uuid = null; + if (sourceValue instanceof String) { + uuid = UUID.fromString(sourceValue.toString()); + } else if (sourceValue instanceof UUID) { + uuid = (UUID) sourceValue; + } else if (sourceValue instanceof ByteBuffer) { + ByteBuffer bb = ((ByteBuffer) sourceValue).duplicate(); + if (bb.remaining() == 16) { + uuid = new UUID(bb.getLong(), bb.getLong()); + } + } + if (uuid != null) { + return UUIDUtil.convert(uuid); + } + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); + } + + protected Object convertDecimal(Object sourceValue, S ignoredSourceSchema, Types.DecimalType targetType) { + if (sourceValue instanceof BigDecimal) return sourceValue; + if (sourceValue instanceof String) return new BigDecimal((String) sourceValue); + if (sourceValue instanceof byte[]) return new BigDecimal(new java.math.BigInteger((byte[]) sourceValue), targetType.scale()); + if (sourceValue instanceof ByteBuffer) { + ByteBuffer bb = ((ByteBuffer) sourceValue).duplicate(); + byte[] bytes = new byte[bb.remaining()]; + bb.get(bytes); + return new BigDecimal(new java.math.BigInteger(bytes), targetType.scale()); + } + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); + } + + protected Object convertDate(Object sourceValue, S ignoredSourceSchema, Type targetType) { + if (sourceValue instanceof LocalDate) return sourceValue; + if (sourceValue instanceof Number) return LocalDate.ofEpochDay(((Number) sourceValue).intValue()); + if (sourceValue instanceof Date) return ((Date) sourceValue).toInstant().atZone(ZoneOffset.UTC).toLocalDate(); + if (sourceValue 
instanceof String) return LocalDate.parse(sourceValue.toString()); + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); + } + + protected Object convertTime(Object sourceValue, S sourceSchema, Type targetType) { + if (sourceValue instanceof LocalTime) return sourceValue; + if (sourceValue instanceof Date) return ((Date) sourceValue).toInstant().atZone(ZoneOffset.UTC).toLocalTime(); + if (sourceValue instanceof String) return LocalTime.parse(sourceValue.toString()); + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); + } + + protected Object convertTimestamp(Object sourceValue, S sourceSchema, Types.TimestampType targetType) { + if (sourceValue instanceof Temporal) return sourceValue; + if (sourceValue instanceof Date) { + Instant instant = ((Date) sourceValue).toInstant(); + return DateTimeUtil.timestamptzFromMicros(DateTimeUtil.microsFromInstant(instant)); + } + if (sourceValue instanceof String) { + Instant instant = Instant.parse(sourceValue.toString()); + return DateTimeUtil.timestamptzFromMicros(DateTimeUtil.microsFromInstant(instant)); + } + throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId()); + } + + protected abstract List convertList(Object sourceValue, S sourceSchema, Types.ListType targetType); + + protected abstract Map convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType); +} diff --git a/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java b/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java new file mode 100644 index 0000000000..1ac67804ef --- /dev/null +++ b/core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java @@ -0,0 +1,157 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.automq.table.binder; + +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A concrete implementation of TypeAdapter that converts values from Avro's + * data representation to Iceberg's internal Java type representation. + *
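 + * For example, an Avro {@code long} carrying the {@code timestamp-micros} logical type
 + * is expected to bind to Iceberg's in-memory timestamp representation (illustrative
 + * sketch only; the schema is built inline):
 + * <pre>{@code
 + * Schema tsSchema = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
 + * Object v = new AvroValueAdapter().convert(1_700_000_000_000_000L, tsSchema, Types.TimestampType.withZone());
 + * // v is an OffsetDateTime, because TimestampType.withZone() adjusts to UTC
 + * }</pre>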
 + * <p>
+ * This class extends {@link AbstractTypeAdapter} and overrides methods to handle + * Avro-specific types like Utf8, EnumSymbol, and Fixed, as well as Avro's + * specific representations for List and Map. + */ +public class AvroValueAdapter extends AbstractTypeAdapter { + private static final org.apache.avro.Schema STRING_SCHEMA_INSTANCE = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING); + + @Override + protected Object convertString(Object sourceValue, Schema sourceSchema, Type targetType) { + if (sourceValue instanceof Utf8 || sourceValue instanceof GenericData.EnumSymbol) { + return sourceValue.toString(); + } + return super.convertString(sourceValue, sourceSchema, targetType); + } + + @Override + protected Object convertBinary(Object sourceValue, Schema sourceSchema, Type targetType) { + if (sourceValue instanceof GenericData.Fixed) { + return ByteBuffer.wrap(((GenericData.Fixed) sourceValue).bytes()); + } + return super.convertBinary(sourceValue, sourceSchema, targetType); + } + + @Override + protected Object convertFixed(Object sourceValue, Schema sourceSchema, Type targetType) { + if (sourceValue instanceof GenericData.Fixed) { + return ((GenericData.Fixed) sourceValue).bytes(); + } + return super.convertFixed(sourceValue, sourceSchema, targetType); + } + + @Override + protected Object convertUUID(Object sourceValue, Schema sourceSchema, Type targetType) { + if (sourceValue instanceof Utf8) { + return super.convertUUID(sourceValue.toString(), sourceSchema, targetType); + } + return super.convertUUID(sourceValue, sourceSchema, targetType); + } + + @Override + protected Object convertTime(Object sourceValue, Schema sourceSchema, Type targetType) { + if (sourceValue instanceof Number) { + LogicalType logicalType = sourceSchema.getLogicalType(); + if (logicalType instanceof LogicalTypes.TimeMicros) { + return DateTimeUtil.timeFromMicros(((Number) sourceValue).longValue()); + } else if (logicalType instanceof LogicalTypes.TimeMillis) { + return DateTimeUtil.timeFromMicros(((Number) sourceValue).longValue() * 1000); + } + } + return super.convertTime(sourceValue, sourceSchema, targetType); + } + + @Override + protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types.TimestampType targetType) { + if (sourceValue instanceof Number) { + long value = ((Number) sourceValue).longValue(); + LogicalType logicalType = sourceSchema.getLogicalType(); + if (logicalType instanceof LogicalTypes.TimestampMillis) { + return targetType.shouldAdjustToUTC() + ? DateTimeUtil.timestamptzFromMicros(value * 1000) + : DateTimeUtil.timestampFromMicros(value * 1000); + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + return targetType.shouldAdjustToUTC() + ? 
DateTimeUtil.timestamptzFromMicros(value) + : DateTimeUtil.timestampFromMicros(value); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) { + return DateTimeUtil.timestampFromMicros(value * 1000); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) { + return DateTimeUtil.timestampFromMicros(value); + } + } + return super.convertTimestamp(sourceValue, sourceSchema, targetType); + } + + @Override + protected List convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) { + Schema elementSchema = sourceSchema.getElementType(); + List sourceList = (List) sourceValue; + List list = new ArrayList<>(); + for (Object element : sourceList) { + Object convert = convert(element, elementSchema, targetType.elementType()); + list.add(convert); + } + return list; + } + + @Override + protected Map convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType) { + if (sourceValue instanceof GenericData.Array) { + GenericData.Array arrayValue = (GenericData.Array) sourceValue; + // Handle map represented as an array of key-value records + Map recordMap = new HashMap<>(); + Schema kvSchema = sourceSchema.getElementType(); + Schema.Field keyField = kvSchema.getFields().get(0); + Schema.Field valueField = kvSchema.getFields().get(1); + + for (Object element : arrayValue) { + GenericRecord record = (GenericRecord) element; + Object key = convert(record.get(keyField.pos()), keyField.schema(), targetType.keyType()); + Object value = convert(record.get(valueField.pos()), valueField.schema(), targetType.valueType()); + recordMap.put(key, value); + } + return recordMap; + } else { + // Handle standard Java Map + Map sourceMap = (Map) sourceValue; + Map adaptedMap = new HashMap<>(); + for (Map.Entry entry : sourceMap.entrySet()) { + // Avro map keys are always strings + Object key = convert(entry.getKey(), STRING_SCHEMA_INSTANCE, targetType.keyType()); + Object value = convert(entry.getValue(), sourceSchema.getValueType(), targetType.valueType()); + adaptedMap.put(key, value); + } + return adaptedMap; + } + } +} diff --git a/core/src/main/java/kafka/automq/table/binder/RecordBinder.java b/core/src/main/java/kafka/automq/table/binder/RecordBinder.java new file mode 100644 index 0000000000..70389b1dc3 --- /dev/null +++ b/core/src/main/java/kafka/automq/table/binder/RecordBinder.java @@ -0,0 +1,247 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.automq.table.binder; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +import java.util.HashMap; +import java.util.Map; + +/** + * A factory that creates lazy-evaluation Record views of Avro GenericRecords. + * Field values are converted only when accessed, avoiding upfront conversion overhead. + */ +public class RecordBinder { + + private final org.apache.iceberg.Schema icebergSchema; + private final TypeAdapter typeAdapter; + private final Map fieldNameToPosition; + private final FieldMapping[] fieldMappings; + + // Pre-computed RecordBinders for nested STRUCT fields + private final Map nestedStructBinders; + + + public RecordBinder(GenericRecord avroRecord) { + this(AvroSchemaUtil.toIceberg(avroRecord.getSchema()), avroRecord.getSchema()); + } + + public RecordBinder(org.apache.iceberg.Schema icebergSchema, Schema avroSchema) { + this(icebergSchema, avroSchema, new AvroValueAdapter()); + } + + public RecordBinder(org.apache.iceberg.Schema icebergSchema, Schema avroSchema, TypeAdapter typeAdapter) { + this.icebergSchema = icebergSchema; + this.typeAdapter = typeAdapter; + + // Pre-compute field name to position mapping + this.fieldNameToPosition = new HashMap<>(); + for (int i = 0; i < icebergSchema.columns().size(); i++) { + fieldNameToPosition.put(icebergSchema.columns().get(i).name(), i); + } + + // Initialize field mappings + this.fieldMappings = new FieldMapping[icebergSchema.columns().size()]; + initializeFieldMappings(avroSchema); + + // Pre-compute nested struct binders + this.nestedStructBinders = precomputeNestedStructBinders(typeAdapter); + } + + /** + * Creates a new immutable Record view of the given Avro record. + * Each call returns a separate instance with its own data reference. + */ + public Record bind(GenericRecord avroRecord) { + if (avroRecord == null) { + return null; + } + return new AvroRecordView(avroRecord, icebergSchema, typeAdapter, + fieldNameToPosition, fieldMappings, nestedStructBinders); + } + + private void initializeFieldMappings(Schema avroSchema) { + for (int icebergPos = 0; icebergPos < icebergSchema.columns().size(); icebergPos++) { + Types.NestedField icebergField = icebergSchema.columns().get(icebergPos); + String fieldName = icebergField.name(); + + Schema.Field avroField = avroSchema.getField(fieldName); + if (avroField != null) { + fieldMappings[icebergPos] = createOptimizedMapping( + avroField.pos(), + icebergField.type(), + avroField.schema() + ); + } else { + fieldMappings[icebergPos] = null; + } + } + } + + private FieldMapping createOptimizedMapping(int avroPosition, Type icebergType, Schema avroType) { + FieldMapping mapping = new FieldMapping(); + mapping.avroPosition = avroPosition; + mapping.icebergType = icebergType; + mapping.typeId = icebergType.typeId(); + mapping.avroSchema = avroType; + + if (icebergType.isStructType()) { + mapping.nestedSchema = icebergType.asStructType().asSchema(); + mapping.nestedSchemaId = icebergType.toString(); + } + + return mapping; + } + + + /** + * Pre-computes RecordBinders for nested STRUCT fields. 
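 + * Binders are keyed by the nested struct's Iceberg type string ({@code nestedSchemaId}),
 + * so all fields sharing the same struct schema reuse a single {@link RecordBinder}
 + * instance during value access.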
+ */ + private Map precomputeNestedStructBinders(TypeAdapter typeAdapter) { + Map binders = new HashMap<>(); + + for (FieldMapping mapping : fieldMappings) { + if (mapping != null && mapping.typeId == Type.TypeID.STRUCT) { + String structId = mapping.nestedSchemaId; + if (!binders.containsKey(structId)) { + RecordBinder nestedBinder = new RecordBinder( + mapping.nestedSchema, + mapping.avroSchema, + typeAdapter + ); + binders.put(structId, nestedBinder); + } + } + } + + return binders; + } + + private static class AvroRecordView implements Record { + private final GenericRecord avroRecord; + private final org.apache.iceberg.Schema icebergSchema; + private final TypeAdapter typeAdapter; + private final Map fieldNameToPosition; + private final FieldMapping[] fieldMappings; + private final Map nestedStructBinders; + + AvroRecordView(GenericRecord avroRecord, + org.apache.iceberg.Schema icebergSchema, + TypeAdapter typeAdapter, + Map fieldNameToPosition, + FieldMapping[] fieldMappings, + Map nestedStructBinders) { + this.avroRecord = avroRecord; + this.icebergSchema = icebergSchema; + this.typeAdapter = typeAdapter; + this.fieldNameToPosition = fieldNameToPosition; + this.fieldMappings = fieldMappings; + this.nestedStructBinders = nestedStructBinders; + } + + @Override + public Object get(int pos) { + if (avroRecord == null) { + throw new IllegalStateException("Avro record is null"); + } + if (pos < 0 || pos >= fieldMappings.length) { + throw new IndexOutOfBoundsException("Field position " + pos + " out of bounds"); + } + + FieldMapping mapping = fieldMappings[pos]; + if (mapping == null) { + return null; + } + + Object avroValue = avroRecord.get(mapping.avroPosition); + if (avroValue == null) { + return null; + } + + // Handle STRUCT type + if (mapping.typeId == Type.TypeID.STRUCT) { + String structId = mapping.nestedSchemaId; + RecordBinder nestedBinder = nestedStructBinders.get(structId); + if (nestedBinder == null) { + throw new IllegalStateException("Nested binder not found for struct: " + structId); + } + return nestedBinder.bind((GenericRecord) avroValue); + } + + // Delegate conversion of all other types to the adapter + return typeAdapter.convert(avroValue, mapping.avroSchema, mapping.icebergType); + } + + @Override + public Object getField(String name) { + Integer position = fieldNameToPosition.get(name); + return position != null ? 
get(position) : null; + } + + @Override + public Types.StructType struct() { + return icebergSchema.asStruct(); + } + + @Override + public int size() { + return icebergSchema.columns().size(); + } + + @Override + public T get(int pos, Class javaClass) { + return javaClass.cast(get(pos)); + } + + // Unsupported operations + @Override + public void setField(String name, Object value) { + throw new UnsupportedOperationException("Read-only"); + } + @Override + public Record copy() { + throw new UnsupportedOperationException("Read-only"); + } + @Override + public Record copy(Map overwriteValues) { + throw new UnsupportedOperationException("Read-only"); + } + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException("Read-only"); + } + } + + // Field mapping structure + private static class FieldMapping { + int avroPosition; + Type icebergType; + Type.TypeID typeId; + Schema avroSchema; + + // For struct types + org.apache.iceberg.Schema nestedSchema; + String nestedSchemaId; + } +} diff --git a/core/src/main/java/kafka/automq/table/binder/TypeAdapter.java b/core/src/main/java/kafka/automq/table/binder/TypeAdapter.java new file mode 100644 index 0000000000..b38b3861d6 --- /dev/null +++ b/core/src/main/java/kafka/automq/table/binder/TypeAdapter.java @@ -0,0 +1,40 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.automq.table.binder; + +import org.apache.iceberg.types.Type; + +/** + * Converts values between different schema systems. + * + * @param The source schema type (e.g., org.apache.avro.Schema) + */ +public interface TypeAdapter { + + /** + * Converts a source value to the target Iceberg type. + * + * @param sourceValue The source value + * @param sourceSchema The source schema + * @param targetType The target Iceberg type + * @return The converted value + */ + Object convert(Object sourceValue, S sourceSchema, Type targetType); + +} diff --git a/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java new file mode 100644 index 0000000000..99472ea87f --- /dev/null +++ b/core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java @@ -0,0 +1,987 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.automq.table.binder; + +import com.google.common.collect.ImmutableMap; + +import org.apache.avro.Conversions; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.util.Utf8; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.avro.CodecSetup; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.UUIDUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.mockito.MockitoAnnotations; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; +import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +@Tag("S3Unit") +public class AvroRecordBinderTest { + + private static Schema avroSchema; + private InMemoryCatalog catalog; + private Table table; + private TaskWriter writer; + + static { + CodecSetup.setup(); + } + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + catalog = new InMemoryCatalog(); + catalog.initialize("test", ImmutableMap.of()); + catalog.createNamespace(Namespace.of("default")); + } + + private void testSendRecord(org.apache.iceberg.Schema schema, Record record) { + table = catalog.createTable(TableIdentifier.of(Namespace.of("default"), "test"), schema); + writer = createTableWriter(table); + try { + writer.write(record); 
+ } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private TaskWriter createTableWriter(Table table) { + FileAppenderFactory appenderFactory = new GenericAppenderFactory( + table.schema(), + table.spec(), + null, null, null) + .setAll(new HashMap<>(table.properties())) + .set(PARQUET_ROW_GROUP_SIZE_BYTES, "1"); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, System.currentTimeMillis()) + .defaultSpec(table.spec()) + .operationId(UUID.randomUUID().toString()) + .format(FileFormat.PARQUET) + .build(); + + return new UnpartitionedWriter<>( + table.spec(), + FileFormat.PARQUET, + appenderFactory, + fileFactory, + table.io(), + WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT + ); + } + + private static GenericRecord serializeAndDeserialize(GenericRecord record, Schema schema) { + try { + // Serialize the avro record to a byte array + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DatumWriter datumWriter = new SpecificDatumWriter<>(schema); + Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); + datumWriter.write(record, encoder); + encoder.flush(); + outputStream.close(); + + byte[] serializedBytes = outputStream.toByteArray(); + + // Deserialize the byte array back to an avro record + DatumReader datumReader = new SpecificDatumReader<>(schema); + ByteArrayInputStream inputStream = new ByteArrayInputStream(serializedBytes); + Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + return datumReader.read(null, decoder); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + @Test + public void testSchemaEvolution() { + // Original Avro schema with 3 fields + String originalAvroSchemaJson = "{" + + "\"type\": \"record\"," + + "\"name\": \"User\"," + + "\"fields\": [" + + " {\"name\": \"id\", \"type\": \"long\"}," + + " {\"name\": \"name\", \"type\": \"string\"}," + + " {\"name\": \"email\", \"type\": \"string\"}" + + "]}"; + + // Evolved Iceberg schema: added age field, removed email field + org.apache.iceberg.Schema evolvedIcebergSchema = new org.apache.iceberg.Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + Types.NestedField.optional(4, "age", Types.IntegerType.get()) // New field + // email field removed + ); + + Schema avroSchema = new Schema.Parser().parse(originalAvroSchemaJson); + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("id", 12345L); + avroRecord.put("name", new Utf8("John Doe")); + avroRecord.put("email", new Utf8("john@example.com")); + + // Test wrapper with evolved schema + RecordBinder recordBinder = new RecordBinder(evolvedIcebergSchema, avroSchema); + Record bind = recordBinder.bind(avroRecord); + + assertEquals(12345L, bind.get(0)); // id + assertEquals("John Doe", bind.get(1)); // name + assertNull(bind.get(2)); // age - doesn't exist in Avro record + } + + + @Test + public void testWrapperReusability() { + // Test that the same wrapper can be reused for multiple records + String avroSchemaJson = "{" + + "\"type\": \"record\"," + + "\"name\": \"User\"," + + "\"fields\": [" + + " {\"name\": \"id\", \"type\": \"long\"}," + + " {\"name\": \"name\", \"type\": \"string\"}" + + "]}"; + Schema avroSchema = new Schema.Parser().parse(avroSchemaJson); + + org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "name", 
Types.StringType.get()) + ); + + RecordBinder recordBinder = new RecordBinder(icebergSchema, avroSchema); + + + // First record + GenericRecord record1 = new GenericData.Record(avroSchema); + record1.put("id", 1L); + record1.put("name", new Utf8("Alice")); + + Record bind1 = recordBinder.bind(record1); + assertEquals(1L, bind1.get(0)); + assertEquals("Alice", bind1.get(1)); + + // Reuse wrapper for second record + GenericRecord record2 = new GenericData.Record(avroSchema); + record2.put("id", 2L); + record2.put("name", new Utf8("Bob")); + + Record bind2 = recordBinder.bind(record2); + assertEquals(2L, bind2.get(0)); + assertEquals("Bob", bind2.get(1)); + } + + + // Test method for converting a single string field + @Test + public void testStringConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"stringField\", \"type\": \"string\"}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("stringField", "test_string"); + + GenericRecord record = serializeAndDeserialize(avroRecord, avroSchema); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(record); + + // Verify the field value + assertEquals("test_string", icebergRecord.getField("stringField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single integer field + @Test + public void testIntegerConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"intField\", \"type\": \"int\"}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("intField", 42); + + GenericRecord record = serializeAndDeserialize(avroRecord, avroSchema); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(record); + + // Verify the field value + assertEquals(42, icebergRecord.getField("intField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single long field + @Test + public void testLongConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"longField\", \"type\": \"long\"}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("longField", 123456789L); + + GenericRecord record = serializeAndDeserialize(avroRecord, avroSchema); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(record); + + // Verify the field value + assertEquals(123456789L, 
icebergRecord.getField("longField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single float field + @Test + public void testFloatConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"floatField\", \"type\": \"float\"}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("floatField", 3.14f); + + GenericRecord record = serializeAndDeserialize(avroRecord, avroSchema); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(record); + + // Verify the field value + assertEquals(3.14f, icebergRecord.getField("floatField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single double field + @Test + public void testDoubleConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"doubleField\", \"type\": \"double\"}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("doubleField", 6.28); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + assertEquals(6.28, icebergRecord.getField("doubleField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single boolean field + @Test + public void testBooleanConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"booleanField\", \"type\": \"boolean\"}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("booleanField", true); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + assertEquals(true, icebergRecord.getField("booleanField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single date field (number of days from epoch) + @Test + public void testDateConversion() { + // Define Avro schema + String avroSchemaStr = "{\"type\": \"record\", \"name\": \"TestRecord\", \"fields\": [ {\"name\": \"dateField\", \"type\": {\"type\": \"int\", \"logicalType\": \"date\"}} ] }"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + LocalDate localDate = 
LocalDate.of(2020, 1, 1); + int epochDays = (int) ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), localDate); + avroRecord.put("dateField", epochDays); // Represents 2020-01-01 + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + assertEquals(localDate, icebergRecord.getField("dateField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single time field (number of milliseconds from midnight) + @Test + public void testTimeConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"timeField\", \"type\": {\"type\": \"long\", \"logicalType\": \"time-micros\"}},\n" + + " {\"name\": \"timeField2\", \"type\": {\"type\": \"int\", \"logicalType\": \"time-millis\"}}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + LocalTime localTime = LocalTime.of(10, 0); + long epochMicros = localTime.toNanoOfDay() / 1000; + avroRecord.put("timeField", epochMicros); // Represents 10:00 AM + + int epochMillis = (int) (localTime.toNanoOfDay() / 1_000_000); + avroRecord.put("timeField2", epochMillis); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + assertEquals(localTime, icebergRecord.getField("timeField")); + assertEquals(localTime, icebergRecord.getField("timeField2")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single timestamp field (number of milliseconds from epoch) + // timestamp: Stores microseconds from 1970-01-01 00:00:00.000000. [1] + // timestamptz: Stores microseconds from 1970-01-01 00:00:00.000000 UTC. [1] + @Test + public void testTimestampConversion() { + // Define Avro schema + // Avro type annotation adjust-to-utc is an Iceberg convention; default value is false if not present. + // Avro logical type timestamp-nanos is an Iceberg convention; the Avro specification does not define this type. 
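 + // Expected binding (see AvroValueAdapter#convertTimestamp): adjust-to-utc=true yields an
 + // OffsetDateTime (timestamptz), adjust-to-utc=false yields a LocalDateTime, and
 + // timestamp-millis values are scaled to microseconds (value * 1000) before conversion.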
+ String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"timestampField1\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-micros\", \"adjust-to-utc\": true}},\n" + + " {\"name\": \"timestampField2\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-micros\", \"adjust-to-utc\": false}},\n" + + " {\"name\": \"timestampField3\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\", \"adjust-to-utc\": true}},\n" + + " {\"name\": \"timestampField4\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\", \"adjust-to-utc\": false}}\n" + + " ]\n" + + " }\n"; + Schema avroSchema = new Schema.Parser().parse(avroSchemaStr); + + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + Instant instant = Instant.now(); + long timestampMicros = instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1_000; + long timestampMillis = instant.toEpochMilli(); + + avroRecord.put("timestampField1", timestampMicros); + avroRecord.put("timestampField2", timestampMicros); + avroRecord.put("timestampField3", timestampMillis); + avroRecord.put("timestampField4", timestampMillis); + + // Serialize and deserialize + GenericRecord deserializedRecord = serializeAndDeserialize(avroRecord, avroSchema); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(deserializedRecord); + + // Verify the field value + OffsetDateTime timestampField1 = (OffsetDateTime) icebergRecord.getField("timestampField1"); + assertEquals(DateTimeUtil.timestamptzFromMicros(timestampMicros), timestampField1); + + LocalDateTime timestampField2 = (LocalDateTime) icebergRecord.getField("timestampField2"); + assertEquals(DateTimeUtil.timestampFromMicros(timestampMicros), timestampField2); + + OffsetDateTime timestampField3 = (OffsetDateTime) icebergRecord.getField("timestampField3"); + assertEquals(DateTimeUtil.timestamptzFromMicros(timestampMillis * 1000), timestampField3); + + LocalDateTime timestampField4 = (LocalDateTime) icebergRecord.getField("timestampField4"); + assertEquals(DateTimeUtil.timestampFromMicros(timestampMillis * 1000), timestampField4); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single binary field + @Test + public void testBinaryConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"binaryField\", \"type\": \"bytes\"}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + String randomAlphabetic = RandomStringUtils.randomAlphabetic(64); + avroRecord.put("binaryField", ByteBuffer.wrap(randomAlphabetic.getBytes(StandardCharsets.UTF_8))); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + ByteBuffer binaryField = (ByteBuffer) icebergRecord.getField("binaryField"); + assertEquals(randomAlphabetic, new String(binaryField.array(), 
StandardCharsets.UTF_8)); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single fixed field + @Test + public void testFixedConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"fixedField\",\n" + + " \"type\": {\n" + + " \"type\": \"fixed\",\n" + + " \"name\": \"FixedField\",\n" + + " \"size\": 3\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + byte[] fixedBytes = "bar".getBytes(StandardCharsets.UTF_8); + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("fixedField", new GenericData.Fixed(avroSchema.getField("fixedField").schema(), fixedBytes)); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + byte[] result = (byte[]) icebergRecord.getField("fixedField"); + assertEquals("bar", new String(result, StandardCharsets.UTF_8)); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single enum field + @Test + public void testEnumConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"enumField\",\n" + + " \"type\": {\n" + + " \"type\": \"enum\",\n" + + " \"name\": \"EnumField\",\n" + + " \"symbols\": [\"A\", \"B\", \"C\"]\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("enumField", new GenericData.EnumSymbol(avroSchema.getField("enumField").schema(), "B")); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + assertEquals("B", icebergRecord.getField("enumField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single UUID field + @Test + public void testUUIDConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"uuidField\", \"type\": {\"type\": \"string\", \"logicalType\": \"uuid\"}}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + UUID uuid = UUID.randomUUID(); + CharSequence charSequence = new Conversions.UUIDConversion().toCharSequence(uuid, avroSchema, LogicalTypes.uuid()); + + avroRecord.put("uuidField", charSequence); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the 
field value + assertEquals(uuid, UUIDUtil.convert((byte[]) icebergRecord.getField("uuidField"))); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a single decimal field + @Test + public void testDecimalConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"decimalField\", \"type\": {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 9, \"scale\": 2}}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + BigDecimal bigDecimal = BigDecimal.valueOf(1000.00).setScale(2); + LogicalTypes.Decimal decimalType = LogicalTypes.decimal(9, 2); + byte[] decimalBytes = new Conversions.DecimalConversion().toBytes(bigDecimal, avroSchema, decimalType).array(); + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("decimalField", ByteBuffer.wrap(decimalBytes)); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + assertEquals(bigDecimal, icebergRecord.getField("decimalField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a list field + @Test + public void testListConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"listField\", \"type\": {\"type\": \"array\", \"items\": \"string\"}}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("listField", Arrays.asList("a", "b", "c")); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + assertEquals(Arrays.asList("a", "b", "c"), icebergRecord.getField("listField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a map field + @Test + public void testStringMapConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"mapField\", \"type\": {\"type\": \"map\", \"values\": \"string\"}}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + Map map = new HashMap<>(); + map.put("key1", "value1"); + map.put("key2", "value2"); + avroRecord.put("mapField", map); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + assertEquals(map, icebergRecord.getField("mapField")); + + // Send the record to the table + 
testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a map field + @Test + public void testIntMapConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"mapField\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + Map map = new HashMap<>(); + map.put("key1", 1); + map.put("key2", 2); + avroRecord.put("mapField", map); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + assertEquals(map, icebergRecord.getField("mapField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a map field with non-string keys + // Maps with non-string keys must use an array representation with the map logical type. + // The array representation or Avro’s map type may be used for maps with string keys. + @Test + public void testMapWithNonStringKeysConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"mapField\",\n" + + " \"type\": {\n" + + " \"type\": \"array\",\n" + + " \"logicalType\": \"map\",\n" + + " \"items\": {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"MapEntry\",\n" + + " \"fields\": [\n" + + " {\"name\": \"key\", \"type\": \"int\"},\n" + + " {\"name\": \"value\", \"type\": \"string\"}\n" + + " ]\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + Map expectedMap = new HashMap<>(); + expectedMap.put(1, "value1"); + expectedMap.put(2, "value2"); + expectedMap.put(3, "value3"); + + GenericRecord avroRecord = new GenericData.Record(avroSchema); + List mapEntries = new ArrayList<>(); + for (Map.Entry entry : expectedMap.entrySet()) { + GenericRecord mapEntry = new GenericData.Record(avroSchema.getField("mapField").schema().getElementType()); + mapEntry.put("key", entry.getKey()); + mapEntry.put("value", entry.getValue()); + mapEntries.add(mapEntry); + } + avroRecord.put("mapField", mapEntries); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Convert the list of records back to a map + @SuppressWarnings("unchecked") + Map mapField = (Map) icebergRecord.getField("mapField"); + // Verify the field value + assertEquals(expectedMap, mapField); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a record with nested fields + @Test + public void testNestedRecordConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"nestedField\",\n" + + " \"type\": {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"NestedRecord\",\n" + 
+ " \"fields\": [\n" + + " {\"name\": \"nestedStringField\", \"type\": \"string\"},\n" + + " {\"name\": \"nestedIntField\", \"type\": \"int\"}\n" + + " ]\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord nestedRecord = new GenericData.Record(avroSchema.getField("nestedField").schema()); + nestedRecord.put("nestedStringField", "nested_string"); + nestedRecord.put("nestedIntField", 42); + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("nestedField", nestedRecord); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field values + Record nestedIcebergRecord = (Record) icebergRecord.getField("nestedField"); + assertEquals("nested_string", nestedIcebergRecord.getField("nestedStringField")); + assertEquals(42, nestedIcebergRecord.getField("nestedIntField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a record with optional fields + // Optional fields must always set the Avro field default value to null. + @Test + public void testOptionalFieldConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"optionalStringField\", \"type\": [\"null\", \"string\"], \"default\": null},\n" + + " {\"name\": \"optionalIntField\", \"type\": [\"null\", \"int\"], \"default\": null},\n" + + " {\"name\": \"optionalStringNullField\", \"type\": [\"null\", \"string\"], \"default\": null},\n" + + " {\"name\": \"optionalIntNullField\", \"type\": [\"null\", \"int\"], \"default\": null}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("optionalStringField", "optional_string"); + avroRecord.put("optionalIntField", 42); + avroRecord.put("optionalStringNullField", null); + avroRecord.put("optionalIntNullField", null); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field values + assertEquals("optional_string", icebergRecord.getField("optionalStringField")); + assertEquals(42, icebergRecord.getField("optionalIntField")); + assertNull(icebergRecord.getField("optionalStringNullField")); + assertNull(icebergRecord.getField("optionalIntNullField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a record with default values + @Test + public void testDefaultFieldConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"defaultStringField\", \"type\": \"string\", \"default\": \"default_string\"},\n" + + " {\"name\": \"defaultIntField\", \"type\": \"int\", \"default\": 42}\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new 
GenericData.Record(avroSchema); + Schema.Field defaultStringField = avroSchema.getField("defaultStringField"); + Schema.Field defaultIntField = avroSchema.getField("defaultIntField"); + avroRecord.put("defaultStringField", defaultStringField.defaultVal()); + avroRecord.put("defaultIntField", defaultIntField.defaultVal()); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field values + assertEquals("default_string", icebergRecord.getField("defaultStringField")); + assertEquals(42, icebergRecord.getField("defaultIntField")); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } + + // Test method for converting a record with union fields + // Optional fields, array elements, and map values must be wrapped in an Avro union with null. + // This is the only union type allowed in Iceberg data files. + @Test + public void testUnionFieldConversion() { + // Define Avro schema + String avroSchemaStr = " {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"unionField1\",\n" + + " \"type\": [\"null\", \"string\"]\n" + + " },\n" + + " {\n" + + " \"name\": \"unionField2\",\n" + + " \"type\": [\"null\", \"int\"]\n" + + " },\n" + + " {\n" + + " \"name\": \"unionField3\",\n" + + " \"type\": [\"null\", \"boolean\"]\n" + + " },\n" + + " {\n" + + " \"name\": \"unionField4\",\n" + + " \"type\": [\"null\", \"string\"]\n" + + " }\n" + + " ]\n" + + " }\n"; + avroSchema = new Schema.Parser().parse(avroSchemaStr); + // Create Avro record + GenericRecord avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("unionField1", "union_string"); + avroRecord.put("unionField2", 42); + avroRecord.put("unionField3", true); + + // Convert Avro record to Iceberg record using the wrapper + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + Record icebergRecord = new RecordBinder(icebergSchema, avroSchema).bind(serializeAndDeserialize(avroRecord, avroSchema)); + + // Verify the field value + Object unionField1 = icebergRecord.getField("unionField1"); + assertEquals("union_string", unionField1); + + Object unionField2 = icebergRecord.getField("unionField2"); + assertEquals(42, unionField2); + + Object unionField3 = icebergRecord.getField("unionField3"); + assertEquals(true, unionField3); + + // Send the record to the table + testSendRecord(icebergSchema, icebergRecord); + } +} diff --git a/core/src/test/java/kafka/automq/table/binder/PerformanceComparisonTest.java b/core/src/test/java/kafka/automq/table/binder/PerformanceComparisonTest.java new file mode 100644 index 0000000000..a4797d588e --- /dev/null +++ b/core/src/test/java/kafka/automq/table/binder/PerformanceComparisonTest.java @@ -0,0 +1,218 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.automq.table.binder; + +import kafka.automq.table.worker.convert.AvroToIcebergVisitor; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.data.Record; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.LocalDate; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Performance comparison test between the AvroRecordWrapper + * and the traditional AvroToIcebergVisitor, focusing on CPU-bound operations. + */ +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class PerformanceComparisonTest { + + private static Schema avroSchema; + private static org.apache.iceberg.Schema icebergSchema; + private static List testRecords; + + @BeforeAll + static void setUp() { + String avroSchemaStr = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"long\"},\n" + + " {\"name\": \"name\", \"type\": \"string\"},\n" + + " {\"name\": \"active\", \"type\": \"boolean\"},\n" + + " {\"name\": \"score\", \"type\": \"double\"},\n" + + " {\"name\": \"birthDate\", \"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},\n" + + " {\"name\": \"tags\", \"type\": {\"type\": \"array\", \"items\": \"string\"}},\n" + + " {\"name\": \"metadata\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},\n" + + " {\n" + + " \"name\": \"address\",\n" + + " \"type\": {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"Address\",\n" + + " \"fields\": [\n" + + " {\"name\": \"street\", \"type\": \"string\"},\n" + + " {\"name\": \"city\", \"type\": \"string\"},\n" + + " {\"name\": \"zipCode\", \"type\": \"int\"}\n" + + " ]\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"; + + avroSchema = new Schema.Parser().parse(avroSchemaStr); + icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + testRecords = createTestRecords(1000000); // 1,000,000 records for performance testing + } + + @Test + @Order(1) + public void testPerformanceAndReusability() { + System.out.println("=== Performance Comparison: RecordBinder vs Visitor ==="); + int iterations = 10; + int recordCount = testRecords.size() * iterations; + System.out.printf("iterations: %d; each recordCount %d \n", iterations, recordCount); + + // --- 1. 
Visitor (Traditional Approach) --- + AvroToIcebergVisitor visitor = new AvroToIcebergVisitor(icebergSchema); + warmup(visitor); + long visitorTime = measurePerformance(() -> { + for (GenericRecord record : testRecords) { + accessAllFields(visitor.convertRecord(record)); + } + }, iterations); + printResults("Visitor", recordCount, visitorTime); + + // --- 2. RecordBinder --- + RecordBinder recordBinder = new RecordBinder(icebergSchema, avroSchema); + warmup(recordBinder); + long reusableWrapperTime = measurePerformance(() -> { + for (GenericRecord record : testRecords) { + accessAllFields(recordBinder.bind(record)); + } + }, iterations); + printResults("RecordBinder", recordCount, reusableWrapperTime); + + + System.out.println("\nPerformance Improvement (RecordBinder vs. Visitor): " + String.format("%.2fx", (double) visitorTime / reusableWrapperTime)); + } + + // --- Helper Methods --- + + private static List createTestRecords(int count) { + List records = new ArrayList<>(); + for (int i = 0; i < count; i++) { + GenericRecord record = new GenericData.Record(avroSchema); + record.put("id", (long) i); + record.put("name", "User" + i); + record.put("active", i % 2 == 0); + record.put("score", i * 1.5); + LocalDate birthDate = LocalDate.of(1990 + (i % 30), 1 + (i % 12), 1 + (i % 28)); + record.put("birthDate", (int) ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), birthDate)); + record.put("tags", Arrays.asList("tag" + i, "category" + (i % 10))); + Map metadata = new HashMap<>(); + metadata.put("source", "test"); + metadata.put("batch", String.valueOf(i / 100)); + record.put("metadata", metadata); + GenericRecord address = new GenericData.Record(avroSchema.getField("address").schema()); + address.put("street", i + " Main St"); + address.put("city", "City" + (i % 50)); + address.put("zipCode", 100000 + i); + record.put("address", address); + records.add(serializeAndDeserialize(record, avroSchema)); + } + return records; + } + + private static GenericRecord serializeAndDeserialize(GenericRecord record, Schema schema) { + try { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DatumWriter datumWriter = new SpecificDatumWriter<>(schema); + Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); + datumWriter.write(record, encoder); + encoder.flush(); + outputStream.close(); + byte[] serializedBytes = outputStream.toByteArray(); + DatumReader datumReader = new SpecificDatumReader<>(schema); + ByteArrayInputStream inputStream = new ByteArrayInputStream(serializedBytes); + Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + return datumReader.read(null, decoder); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void warmup(RecordBinder recordBinder) { + for (int i = 0; i < 10000; i++) { + accessAllFields(recordBinder.bind(testRecords.get(i % testRecords.size()))); + } + } + + private void warmup(AvroToIcebergVisitor visitor) { + for (int i = 0; i < 10000; i++) { + accessAllFields(visitor.convertRecord(testRecords.get(i % testRecords.size()))); + } + } + + private long measurePerformance(Runnable task, int iterations) { + long startTime = System.nanoTime(); + for (int i = 0; i < iterations; i++) { + task.run(); + } + return System.nanoTime() - startTime; + } + + private void printResults(String testName, int recordCount, long totalTimeNanos) { + long totalFieldAccesses = recordCount * 10L; // 10 fields accessed per record + System.out.printf("%-25s | Total Time: %5d ms | Avg per Record: %5d ns | Avg per Field: 
%4d ns\n", + testName, + totalTimeNanos / 1_000_000, + totalTimeNanos / recordCount, + totalTimeNanos / totalFieldAccesses); + } + + private void accessAllFields(Record record) { + record.getField("id"); + record.getField("name"); + record.getField("active"); + record.getField("score"); + record.getField("birthDate"); + record.getField("tags"); + record.getField("metadata"); + Record address = (Record) record.getField("address"); + if (address != null) { + address.getField("street"); + address.getField("city"); + address.getField("zipCode"); + } + } +} diff --git a/core/src/test/java/kafka/automq/table/binder/PerformanceProfilingTest.java b/core/src/test/java/kafka/automq/table/binder/PerformanceProfilingTest.java new file mode 100644 index 0000000000..d9eab0689f --- /dev/null +++ b/core/src/test/java/kafka/automq/table/binder/PerformanceProfilingTest.java @@ -0,0 +1,378 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.automq.table.binder; + +import kafka.automq.table.worker.convert.AvroToIcebergVisitor; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.data.Record; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.LocalDate; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Detailed performance profiling test that breaks down where time is spent when converting Avro records and accessing their fields. + * Each test runs for a fixed wall-clock duration (TEST_DURATION_MINUTES, currently 1 minute) over a large pool of pre-generated records.
+ */ +public class PerformanceProfilingTest { + + private static final int TEST_DURATION_MINUTES = 1; + private static final long TEST_DURATION_NANOS = TEST_DURATION_MINUTES * 60L * 1_000_000_000L; + + private Schema avroSchema; + private org.apache.iceberg.Schema icebergSchema; + private List testRecords; + + @BeforeEach + void setUp() { + String avroSchemaStr = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"long\"},\n" + + " {\"name\": \"name\", \"type\": \"string\"},\n" + + " {\"name\": \"active\", \"type\": \"boolean\"},\n" + + " {\"name\": \"score\", \"type\": \"double\"},\n" + + " {\"name\": \"birthDate\", \"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},\n" + + " {\"name\": \"tags\", \"type\": {\"type\": \"array\", \"items\": \"string\"}},\n" + + " {\"name\": \"metadata\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},\n" + + " {\n" + + " \"name\": \"address\",\n" + + " \"type\": {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"Address\",\n" + + " \"fields\": [\n" + + " {\"name\": \"street\", \"type\": \"string\"},\n" + + " {\"name\": \"city\", \"type\": \"string\"},\n" + + " {\"name\": \"zipCode\", \"type\": \"int\"}\n" + + " ]\n" + + " }\n" + + " },\n" + + " {\"name\": \"salary\", \"type\": [\"null\", \"double\"], \"default\": null},\n" + + " {\"name\": \"department\", \"type\": [\"null\", \"string\"], \"default\": null}\n" + + " ]\n" + + "}"; + + avroSchema = new Schema.Parser().parse(avroSchemaStr); + icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + + // Create a large dataset for intensive testing (1 million records) + testRecords = createTestRecords(1_000_000); + + System.out.println("=== Performance Profiling Test Setup ==="); + System.out.println("Test Records: " + testRecords.size()); + System.out.println("Test Duration: " + TEST_DURATION_MINUTES + " minutes"); + System.out.println("Schema Fields: " + avroSchema.getFields().size()); + System.out.println("========================================="); + } + + @Test + public void testRecordBinderLongRunning() { + System.out.println("\n=== Starting RecordBinder Long-Running Test ==="); + + RecordBinder recordBinder = new RecordBinder(icebergSchema, avroSchema); + + // Warmup + System.out.println("Warming up RecordBinder..."); + performWarmup(recordBinder, null); + + System.out.println("Starting 5-minute RecordBinder performance test..."); + long startTime = System.nanoTime(); + long recordsProcessed = 0; + long fieldAccessesProcessed = 0; + long totalConversionTime = 0; + long totalAccessTime = 0; + + while ((System.nanoTime() - startTime) < TEST_DURATION_NANOS) { + GenericRecord record = getRandomRecord(); + + // Measure conversion time + long conversionStart = System.nanoTime(); + Record icebergRecord = recordBinder.bind(record); + long conversionEnd = System.nanoTime(); + totalConversionTime += conversionEnd - conversionStart; + + // Measure field access time + long accessStart = System.nanoTime(); + int fieldsAccessed = accessAllFields(icebergRecord); + long accessEnd = System.nanoTime(); + totalAccessTime += accessEnd - accessStart; + + recordsProcessed++; + fieldAccessesProcessed += fieldsAccessed; + + // Print progress every 100,000 records + if (recordsProcessed % 100_000 == 0) { + long elapsedSeconds = (System.nanoTime() - startTime) / 1_000_000_000L; + System.out.printf("RecordBinder Progress: %d records, %d seconds elapsed\n", + recordsProcessed, elapsedSeconds); + } + } + + long totalTime = 
System.nanoTime() - startTime; + printDetailedResults("RecordBinder", recordsProcessed, fieldAccessesProcessed, + totalTime, totalConversionTime, totalAccessTime); + } + + @Test + public void testVisitorLongRunning() { + System.out.println("\n=== Starting Visitor Long-Running Test ==="); + + AvroToIcebergVisitor visitor = new AvroToIcebergVisitor(icebergSchema); + + // Warmup + System.out.println("Warming up Visitor..."); + performWarmup(null, visitor); + + System.out.println("Starting 5-minute Visitor performance test..."); + long startTime = System.nanoTime(); + long recordsProcessed = 0; + long fieldAccessesProcessed = 0; + long totalConversionTime = 0; + long totalAccessTime = 0; + + while ((System.nanoTime() - startTime) < TEST_DURATION_NANOS) { + GenericRecord record = getRandomRecord(); + + // Measure conversion time + long conversionStart = System.nanoTime(); + Record icebergRecord = visitor.convertRecord(record); + long conversionEnd = System.nanoTime(); + totalConversionTime += conversionEnd - conversionStart; + + // Measure field access time + long accessStart = System.nanoTime(); + int fieldsAccessed = accessAllFields(icebergRecord); + long accessEnd = System.nanoTime(); + totalAccessTime += accessEnd - accessStart; + + recordsProcessed++; + fieldAccessesProcessed += fieldsAccessed; + + // Print progress every 100,000 records + if (recordsProcessed % 100_000 == 0) { + long elapsedSeconds = (System.nanoTime() - startTime) / 1_000_000_000L; + System.out.printf("Visitor Progress: %d records, %d seconds elapsed\n", + recordsProcessed, elapsedSeconds); + } + } + + long totalTime = System.nanoTime() - startTime; + printDetailedResults("Visitor", recordsProcessed, fieldAccessesProcessed, + totalTime, totalConversionTime, totalAccessTime); + } + + // --- Helper Methods --- + + private List createTestRecords(int count) { + System.out.println("Creating " + count + " test records..."); + List records = new ArrayList<>(); + + for (int i = 0; i < count; i++) { + GenericRecord record = new GenericData.Record(avroSchema); + record.put("id", (long) i); + record.put("name", "User" + i); + record.put("active", i % 2 == 0); + record.put("score", ThreadLocalRandom.current().nextDouble(50.0, 100.0)); + + LocalDate birthDate = LocalDate.of(1970 + (i % 50), 1 + (i % 12), 1 + (i % 28)); + record.put("birthDate", (int) ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), birthDate)); + + record.put("tags", Arrays.asList("tag" + (i % 100), "category" + (i % 20), "type" + (i % 10))); + + Map metadata = new HashMap<>(); + metadata.put("source", "test-batch-" + (i / 1000)); + metadata.put("batch", String.valueOf(i / 10000)); + metadata.put("region", "region-" + (i % 5)); + record.put("metadata", metadata); + + GenericRecord address = new GenericData.Record(avroSchema.getField("address").schema()); + address.put("street", (i % 9999 + 1) + " Main St"); + address.put("city", "City" + (i % 100)); + address.put("zipCode", 10000 + (i % 90000)); + record.put("address", address); + + // Optional fields + record.put("salary", i % 3 == 0 ? null : ThreadLocalRandom.current().nextDouble(30000, 150000)); + record.put("department", i % 5 == 0 ? 
null : "Dept" + (i % 20)); + + records.add(serializeAndDeserialize(record, avroSchema)); + + if (i > 0 && i % 100_000 == 0) { + System.out.println("Created " + i + " records..."); + } + } + + System.out.println("Finished creating " + count + " test records."); + return records; + } + + private GenericRecord serializeAndDeserialize(GenericRecord record, Schema schema) { + try { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DatumWriter datumWriter = new SpecificDatumWriter<>(schema); + Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); + datumWriter.write(record, encoder); + encoder.flush(); + outputStream.close(); + byte[] serializedBytes = outputStream.toByteArray(); + + DatumReader datumReader = new SpecificDatumReader<>(schema); + ByteArrayInputStream inputStream = new ByteArrayInputStream(serializedBytes); + Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + return datumReader.read(null, decoder); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize/deserialize record", e); + } + } + + private GenericRecord getRandomRecord() { + int index = ThreadLocalRandom.current().nextInt(testRecords.size()); + return testRecords.get(index); + } + + private void performWarmup(RecordBinder recordBinder, AvroToIcebergVisitor visitor) { + int warmupIterations = 50_000; + System.out.println("Performing " + warmupIterations + " warmup iterations..."); + + for (int i = 0; i < warmupIterations; i++) { + GenericRecord record = getRandomRecord(); + + if (recordBinder != null) { + Record icebergRecord = recordBinder.bind(record); + accessAllFields(icebergRecord); + } + + if (visitor != null) { + Record icebergRecord = visitor.convertRecord(record); + accessAllFields(icebergRecord); + } + } + System.out.println("Warmup completed."); + } + + private int accessAllFields(Record record) { + int fieldsAccessed = 0; + + // Access primitive fields + record.getField("id"); + fieldsAccessed++; + + record.getField("name"); + fieldsAccessed++; + + record.getField("active"); + fieldsAccessed++; + + record.getField("score"); + fieldsAccessed++; + + record.getField("birthDate"); + fieldsAccessed++; + + // Access collection fields + record.getField("tags"); + fieldsAccessed++; + + record.getField("metadata"); + fieldsAccessed++; + + // Access nested record + Record address = (Record) record.getField("address"); + fieldsAccessed++; + if (address != null) { + address.getField("street"); + fieldsAccessed++; + address.getField("city"); + fieldsAccessed++; + address.getField("zipCode"); + fieldsAccessed++; + } + + // Access optional fields + record.getField("salary"); + fieldsAccessed++; + + record.getField("department"); + fieldsAccessed++; + + return fieldsAccessed; + } + + private void printDetailedResults(String testName, long recordsProcessed, long fieldAccessesProcessed, + long totalTimeNanos, long conversionTimeNanos, long accessTimeNanos) { + + long totalTimeMs = totalTimeNanos / 1_000_000; + long conversionTimeMs = conversionTimeNanos / 1_000_000; + long accessTimeMs = accessTimeNanos / 1_000_000; + + double recordsPerSecond = (double) recordsProcessed / (totalTimeNanos / 1_000_000_000.0); + double fieldAccessesPerSecond = (double) fieldAccessesProcessed / (totalTimeNanos / 1_000_000_000.0); + + long avgConversionTimeNs = conversionTimeNanos / recordsProcessed; + long avgAccessTimeNs = accessTimeNanos / recordsProcessed; + long avgFieldAccessTimeNs = accessTimeNanos / fieldAccessesProcessed; + + System.out.println("\n" + 
"=".repeat(60)); + System.out.println(testName + " Performance Results"); + System.out.println("=".repeat(60)); + System.out.printf("Total Runtime: %,d ms (%.1f minutes)\n", totalTimeMs, totalTimeMs / 60000.0); + System.out.printf("Records Processed: %,d\n", recordsProcessed); + System.out.printf("Field Accesses: %,d\n", fieldAccessesProcessed); + System.out.println(); + System.out.printf("Records/Second: %,.1f\n", recordsPerSecond); + System.out.printf("Field Accesses/Second: %,.1f\n", fieldAccessesPerSecond); + System.out.println(); + System.out.printf("Avg Conversion Time: %,d ns/record\n", avgConversionTimeNs); + System.out.printf("Avg Access Time: %,d ns/record\n", avgAccessTimeNs); + System.out.printf("Avg Field Access Time: %,d ns/field\n", avgFieldAccessTimeNs); + System.out.println(); + System.out.printf("Time Breakdown:\n"); + System.out.printf(" Conversion: %,d ms (%.1f%%)\n", conversionTimeMs, + 100.0 * conversionTimeNanos / totalTimeNanos); + System.out.printf(" Access: %,d ms (%.1f%%)\n", accessTimeMs, + 100.0 * accessTimeNanos / totalTimeNanos); + System.out.printf(" Overhead: %,d ms (%.1f%%)\n", + totalTimeMs - conversionTimeMs - accessTimeMs, + 100.0 * (totalTimeNanos - conversionTimeNanos - accessTimeNanos) / totalTimeNanos); + System.out.println("=".repeat(60)); + } +}