From e4d441682f517f388b5e0f12b4431c59e4f62852 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Wed, 19 Nov 2025 15:43:21 -0800 Subject: [PATCH 1/4] Add support for more schema types in Iceberg IO --- .../beam/sdk/io/iceberg/IcebergUtils.java | 16 ++- .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 114 +++++++++++++++++- 2 files changed, 126 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index 0c2bc71c6f8b..0ee0f2c80217 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -34,7 +34,7 @@ import java.util.UUID; import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.*; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -71,6 +71,10 @@ private IcebergUtils() {} .put(SqlTypes.DATE.getIdentifier(), Types.DateType.get()) .put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get()) .put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone()) + .put(VariableString.IDENTIFIER, Types.StringType.get()) + .put(VariableBytes.IDENTIFIER, Types.BinaryType.get()) + .put(FixedString.IDENTIFIER, Types.StringType.get()) + .put(UuidLogicalType.IDENTIFIER, Types.UUIDType.get()) .build(); private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { @@ -175,8 +179,14 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType( return new TypeAndMaxId( --nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName())); } else if (beamType.getTypeName().isLogicalType()) { - String logicalTypeIdentifier = - checkArgumentNotNull(beamType.getLogicalType()).getIdentifier(); + Schema.LogicalType logicalType = checkArgumentNotNull(beamType.getLogicalType()); + String logicalTypeIdentifier = logicalType.getIdentifier(); + if (logicalType instanceof FixedPrecisionNumeric) { + Row args = Preconditions.checkArgumentNotNull(logicalType.getArgument()); + Integer precision = Preconditions.checkArgumentNotNull(args.getInt32("precision")); + Integer scale = Preconditions.checkArgumentNotNull(args.getInt32("scale")); + return new TypeAndMaxId(--nestedFieldId, Types.DecimalType.of(precision, scale)); + } @Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier); if (type == null) { throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index 115a6790919e..c464f2e81000 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -35,7 +35,7 @@ import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.*; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; @@ -614,6 +614,20 @@ public void testPrimitiveBeamFieldTypeToIcebergFieldType() { checkTypes(primitives); } + @Test + public void testLogicalBeamFieldTypeToIcebergFieldType() { + // primitive types don't use the nested field ID + List primitives = + Arrays.asList( + new BeamFieldTypeTestCase( + 12, + Schema.FieldType.logicalType(VariableString.of("VARCHAR", 25)), + 11, + Types.StringType.get())); + + checkTypes(primitives); + } + @Test public void testArrayBeamFieldTypeToIcebergFieldType() { // Iceberg's ListType reserves one nested ID for its element type @@ -937,5 +951,103 @@ public void testStructIcebergSchemaToBeamSchema() { assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema); } + + static final Schema BEAM_SCHEMA_JDBC_ALL_TYPES = + Schema.builder() + .addField("array_field", Schema.FieldType.array(Schema.FieldType.STRING)) // from ARRAY + .addField("bigint_field", Schema.FieldType.INT64) // from BIGINT + .addField( + "binary_field", + Schema.FieldType.logicalType(VariableBytes.of("BINARY", 10))) // from BINARY + .addField("bit_field", Schema.FieldType.BOOLEAN) // from BIT + .addField("boolean_field", Schema.FieldType.BOOLEAN) // from BOOLEAN + .addField( + "char_field", Schema.FieldType.logicalType(FixedString.of("CHAR", 10))) // from CHAR + .addField("date_field", Schema.FieldType.logicalType(SqlTypes.DATE)) // from DATE + .addField("decimal_field", Schema.FieldType.DECIMAL) // from DECIMAL + .addField("double_field", Schema.FieldType.DOUBLE) // from DOUBLE + .addField("float_field", Schema.FieldType.DOUBLE) // from FLOAT + .addField("integer_field", Schema.FieldType.INT32) // from INTEGER + .addField( + "longnvarchar_field", + Schema.FieldType.logicalType( + VariableString.of("LONGNVARCHAR", 100))) // from LONGNVARCHAR + .addField( + "longvarbinary_field", + Schema.FieldType.logicalType( + VariableBytes.of("LONGVARBINARY", 100))) // from LONGVARBINARY + .addField( + "longvarchar_field", + Schema.FieldType.logicalType( + VariableString.of("LONGVARCHAR", 100))) // from LONGVARCHAR + .addField( + "nchar_field", + Schema.FieldType.logicalType(FixedString.of("NCHAR", 10))) // from NCHAR + .addField( + "numeric_field", + Schema.FieldType.logicalType(FixedPrecisionNumeric.of(10, 5))) // from NUMERIC + .addField( + "nvarchar_field", + Schema.FieldType.logicalType(VariableString.of("NVARCHAR", 100))) // from NVARCHAR + .addField("real_field", Schema.FieldType.FLOAT) // from REAL + .addField("smallint_field", Schema.FieldType.INT16) // from SMALLINT + .addField("time_field", Schema.FieldType.logicalType(SqlTypes.TIME)) // from TIME + .addField( + "timestamp_field", + Schema.FieldType.logicalType(SqlTypes.DATETIME)) // from TIMESTAMP + .addField( + "timestamp_with_timezone_field", + Schema.FieldType.DATETIME) // from TIMESTAMP_WITH_TIMEZONE + .addField("tinyint_field", Schema.FieldType.BYTE) // from TINYINT + .addField( + "varbinary_field", + Schema.FieldType.logicalType(VariableBytes.of("VARBINARY", 100))) // from VARBINARY + .addField( + "varchar_field", + Schema.FieldType.logicalType(VariableString.of("VARCHAR", 100))) // from VARCHAR + .addField("blob_field", Schema.FieldType.BYTES) // from BLOB + .addField("clob_field", Schema.FieldType.STRING) // from CLOB + .addField( + "uuid_field", Schema.FieldType.logicalType(new UuidLogicalType())) // from UUID + .build(); + + static final org.apache.iceberg.Schema ICEBERG_SCHEMA_JDBC_ALL_TYPES = + new org.apache.iceberg.Schema( + required(1, "array_field", Types.ListType.ofRequired(29, Types.StringType.get())), + required(2, "bigint_field", Types.LongType.get()), + required(3, "binary_field", Types.BinaryType.get()), + required(4, "bit_field", Types.BooleanType.get()), + required(5, "boolean_field", Types.BooleanType.get()), + required(6, "char_field", Types.StringType.get()), + required(7, "date_field", Types.DateType.get()), + required(8, "decimal_field", Types.StringType.get()), + required(9, "double_field", Types.DoubleType.get()), + required(10, "float_field", Types.DoubleType.get()), + required(11, "integer_field", Types.IntegerType.get()), + required(12, "longnvarchar_field", Types.StringType.get()), + required(13, "longvarbinary_field", Types.BinaryType.get()), + required(14, "longvarchar_field", Types.StringType.get()), + required(15, "nchar_field", Types.StringType.get()), + required(16, "numeric_field", Types.DecimalType.of(10, 5)), + required(17, "nvarchar_field", Types.StringType.get()), + required(18, "real_field", Types.FloatType.get()), + required(19, "smallint_field", Types.StringType.get()), + required(20, "time_field", Types.TimeType.get()), + required(21, "timestamp_field", Types.TimestampType.withoutZone()), + required(22, "timestamp_with_timezone_field", Types.TimestampType.withZone()), + required(23, "tinyint_field", Types.StringType.get()), + required(24, "varbinary_field", Types.BinaryType.get()), + required(25, "varchar_field", Types.StringType.get()), + required(26, "blob_field", Types.BinaryType.get()), + required(27, "clob_field", Types.StringType.get()), + required(28, "uuid_field", Types.UUIDType.get())); + + @Test + public void testJdbcBeamSchemaToIcebergSchema() { + org.apache.iceberg.Schema convertedIcebergSchema = + IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_JDBC_ALL_TYPES); + + assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_JDBC_ALL_TYPES)); + } } } From 7d733b4da2663f834b46e67b22a32694a0cf314a Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 20 Nov 2025 09:38:30 -0800 Subject: [PATCH 2/4] clean up test --- .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index c464f2e81000..fd56a1403fbf 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -614,20 +614,6 @@ public void testPrimitiveBeamFieldTypeToIcebergFieldType() { checkTypes(primitives); } - @Test - public void testLogicalBeamFieldTypeToIcebergFieldType() { - // primitive types don't use the nested field ID - List primitives = - Arrays.asList( - new BeamFieldTypeTestCase( - 12, - Schema.FieldType.logicalType(VariableString.of("VARCHAR", 25)), - 11, - Types.StringType.get())); - - checkTypes(primitives); - } - @Test public void testArrayBeamFieldTypeToIcebergFieldType() { // Iceberg's ListType reserves one nested ID for its element type From 07d6890ca42846dd8bd8caa6e33bd9d505812086 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 20 Nov 2025 10:48:09 -0800 Subject: [PATCH 3/4] Add SQL type --- .../apache/beam/sdk/schemas/logicaltypes/SqlTypes.java | 4 ++++ .../org/apache/beam/sdk/io/iceberg/IcebergUtils.java | 10 +++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java index c8af8d03333e..62b1c3c6ee3a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java @@ -21,6 +21,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.util.UUID; import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.values.Row; @@ -40,4 +41,7 @@ private SqlTypes() {} /** Beam LogicalType corresponding to TIMESTAMP type. */ public static final LogicalType TIMESTAMP = new MicrosInstant(); + + /** Beam LogicalType corresponding to UUID type. */ + public static final LogicalType UUID = new UuidLogicalType(); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index 0ee0f2c80217..e7bfc3902403 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -71,10 +71,7 @@ private IcebergUtils() {} .put(SqlTypes.DATE.getIdentifier(), Types.DateType.get()) .put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get()) .put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone()) - .put(VariableString.IDENTIFIER, Types.StringType.get()) - .put(VariableBytes.IDENTIFIER, Types.BinaryType.get()) - .put(FixedString.IDENTIFIER, Types.StringType.get()) - .put(UuidLogicalType.IDENTIFIER, Types.UUIDType.get()) + .put(SqlTypes.UUID.getIdentifier(), Types.UUIDType.get()) .build(); private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { @@ -180,13 +177,16 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType( --nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName())); } else if (beamType.getTypeName().isLogicalType()) { Schema.LogicalType logicalType = checkArgumentNotNull(beamType.getLogicalType()); - String logicalTypeIdentifier = logicalType.getIdentifier(); if (logicalType instanceof FixedPrecisionNumeric) { Row args = Preconditions.checkArgumentNotNull(logicalType.getArgument()); Integer precision = Preconditions.checkArgumentNotNull(args.getInt32("precision")); Integer scale = Preconditions.checkArgumentNotNull(args.getInt32("scale")); return new TypeAndMaxId(--nestedFieldId, Types.DecimalType.of(precision, scale)); } + if (logicalType instanceof PassThroughLogicalType) { + return beamFieldTypeToIcebergFieldType(logicalType.getBaseType(), nestedFieldId); + } + String logicalTypeIdentifier = logicalType.getIdentifier(); @Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier); if (type == null) { throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier); From cd32eb7f9dbe5147cdaf2b4e9fdad7804e39a0fd Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 20 Nov 2025 11:26:57 -0800 Subject: [PATCH 4/4] Apply spotless --- .../java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java | 4 +++- .../org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java | 7 ++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index e7bfc3902403..4b448a2e08ca 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -34,7 +34,9 @@ import java.util.UUID; import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.logicaltypes.*; +import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; +import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index fd56a1403fbf..c9026522dba3 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -35,7 +35,12 @@ import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.logicaltypes.*; +import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; +import org.apache.beam.sdk.schemas.logicaltypes.FixedString; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.UuidLogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableString; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;