diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java index c8af8d03333e..62b1c3c6ee3a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java @@ -21,6 +21,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.util.UUID; import org.apache.beam.sdk.schemas.Schema.LogicalType; import org.apache.beam.sdk.values.Row; @@ -40,4 +41,7 @@ private SqlTypes() {} /** Beam LogicalType corresponding to TIMESTAMP type. */ public static final LogicalType TIMESTAMP = new MicrosInstant(); + + /** Beam LogicalType corresponding to UUID type. */ + public static final LogicalType UUID = new UuidLogicalType(); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index 0c2bc71c6f8b..4b448a2e08ca 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -34,6 +34,8 @@ import java.util.UUID; import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; +import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.Row; @@ -71,6 +73,7 @@ private IcebergUtils() {} .put(SqlTypes.DATE.getIdentifier(), Types.DateType.get()) .put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get()) .put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone()) + .put(SqlTypes.UUID.getIdentifier(), Types.UUIDType.get()) .build(); private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { @@ -175,8 +178,17 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType( return new TypeAndMaxId( --nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName())); } else if (beamType.getTypeName().isLogicalType()) { - String logicalTypeIdentifier = - checkArgumentNotNull(beamType.getLogicalType()).getIdentifier(); + Schema.LogicalType logicalType = checkArgumentNotNull(beamType.getLogicalType()); + if (logicalType instanceof FixedPrecisionNumeric) { + Row args = Preconditions.checkArgumentNotNull(logicalType.getArgument()); + Integer precision = Preconditions.checkArgumentNotNull(args.getInt32("precision")); + Integer scale = Preconditions.checkArgumentNotNull(args.getInt32("scale")); + return new TypeAndMaxId(--nestedFieldId, Types.DecimalType.of(precision, scale)); + } + if (logicalType instanceof PassThroughLogicalType) { + return beamFieldTypeToIcebergFieldType(logicalType.getBaseType(), nestedFieldId); + } + String logicalTypeIdentifier = logicalType.getIdentifier(); @Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier); if (type == null) { throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index 115a6790919e..c9026522dba3 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -35,7 +35,12 @@ import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; +import org.apache.beam.sdk.schemas.logicaltypes.FixedString; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.UuidLogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableString; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; @@ -937,5 +942,103 @@ public void testStructIcebergSchemaToBeamSchema() { assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema); } + + static final Schema BEAM_SCHEMA_JDBC_ALL_TYPES = + Schema.builder() + .addField("array_field", Schema.FieldType.array(Schema.FieldType.STRING)) // from ARRAY + .addField("bigint_field", Schema.FieldType.INT64) // from BIGINT + .addField( + "binary_field", + Schema.FieldType.logicalType(VariableBytes.of("BINARY", 10))) // from BINARY + .addField("bit_field", Schema.FieldType.BOOLEAN) // from BIT + .addField("boolean_field", Schema.FieldType.BOOLEAN) // from BOOLEAN + .addField( + "char_field", Schema.FieldType.logicalType(FixedString.of("CHAR", 10))) // from CHAR + .addField("date_field", Schema.FieldType.logicalType(SqlTypes.DATE)) // from DATE + .addField("decimal_field", Schema.FieldType.DECIMAL) // from DECIMAL + .addField("double_field", Schema.FieldType.DOUBLE) // from DOUBLE + .addField("float_field", Schema.FieldType.DOUBLE) // from FLOAT + .addField("integer_field", Schema.FieldType.INT32) // from INTEGER + .addField( + "longnvarchar_field", + Schema.FieldType.logicalType( + VariableString.of("LONGNVARCHAR", 100))) // from LONGNVARCHAR + .addField( + "longvarbinary_field", + Schema.FieldType.logicalType( + VariableBytes.of("LONGVARBINARY", 100))) // from LONGVARBINARY + .addField( + "longvarchar_field", + Schema.FieldType.logicalType( + VariableString.of("LONGVARCHAR", 100))) // from LONGVARCHAR + .addField( + "nchar_field", + Schema.FieldType.logicalType(FixedString.of("NCHAR", 10))) // from NCHAR + .addField( + "numeric_field", + Schema.FieldType.logicalType(FixedPrecisionNumeric.of(10, 5))) // from NUMERIC + .addField( + "nvarchar_field", + Schema.FieldType.logicalType(VariableString.of("NVARCHAR", 100))) // from NVARCHAR + .addField("real_field", Schema.FieldType.FLOAT) // from REAL + .addField("smallint_field", Schema.FieldType.INT16) // from SMALLINT + .addField("time_field", Schema.FieldType.logicalType(SqlTypes.TIME)) // from TIME + .addField( + "timestamp_field", + Schema.FieldType.logicalType(SqlTypes.DATETIME)) // from TIMESTAMP + .addField( + "timestamp_with_timezone_field", + Schema.FieldType.DATETIME) // from TIMESTAMP_WITH_TIMEZONE + .addField("tinyint_field", Schema.FieldType.BYTE) // from TINYINT + .addField( + "varbinary_field", + Schema.FieldType.logicalType(VariableBytes.of("VARBINARY", 100))) // from VARBINARY + .addField( + "varchar_field", + Schema.FieldType.logicalType(VariableString.of("VARCHAR", 100))) // from VARCHAR + .addField("blob_field", Schema.FieldType.BYTES) // from BLOB + .addField("clob_field", Schema.FieldType.STRING) // from CLOB + .addField( + "uuid_field", Schema.FieldType.logicalType(new UuidLogicalType())) // from UUID + .build(); + + static final org.apache.iceberg.Schema ICEBERG_SCHEMA_JDBC_ALL_TYPES = + new org.apache.iceberg.Schema( + required(1, "array_field", Types.ListType.ofRequired(29, Types.StringType.get())), + required(2, "bigint_field", Types.LongType.get()), + required(3, "binary_field", Types.BinaryType.get()), + required(4, "bit_field", Types.BooleanType.get()), + required(5, "boolean_field", Types.BooleanType.get()), + required(6, "char_field", Types.StringType.get()), + required(7, "date_field", Types.DateType.get()), + required(8, "decimal_field", Types.StringType.get()), + required(9, "double_field", Types.DoubleType.get()), + required(10, "float_field", Types.DoubleType.get()), + required(11, "integer_field", Types.IntegerType.get()), + required(12, "longnvarchar_field", Types.StringType.get()), + required(13, "longvarbinary_field", Types.BinaryType.get()), + required(14, "longvarchar_field", Types.StringType.get()), + required(15, "nchar_field", Types.StringType.get()), + required(16, "numeric_field", Types.DecimalType.of(10, 5)), + required(17, "nvarchar_field", Types.StringType.get()), + required(18, "real_field", Types.FloatType.get()), + required(19, "smallint_field", Types.StringType.get()), + required(20, "time_field", Types.TimeType.get()), + required(21, "timestamp_field", Types.TimestampType.withoutZone()), + required(22, "timestamp_with_timezone_field", Types.TimestampType.withZone()), + required(23, "tinyint_field", Types.StringType.get()), + required(24, "varbinary_field", Types.BinaryType.get()), + required(25, "varchar_field", Types.StringType.get()), + required(26, "blob_field", Types.BinaryType.get()), + required(27, "clob_field", Types.StringType.get()), + required(28, "uuid_field", Types.UUIDType.get())); + + @Test + public void testJdbcBeamSchemaToIcebergSchema() { + org.apache.iceberg.Schema convertedIcebergSchema = + IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_JDBC_ALL_TYPES); + + assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_JDBC_ALL_TYPES)); + } } }