Skip to content

Commit 7f49978

Browse files
authored
Add Iceberg Schema Support for PassThroughLogicalType (#36870)
* Add support for more schema types in Iceberg IO * clean up test * Add SQL type * Apply spotless
1 parent 4970cba commit 7f49978

File tree

3 files changed

+121
-2
lines changed

3 files changed

+121
-2
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.time.LocalDate;
2222
import java.time.LocalDateTime;
2323
import java.time.LocalTime;
24+
import java.util.UUID;
2425
import org.apache.beam.sdk.schemas.Schema.LogicalType;
2526
import org.apache.beam.sdk.values.Row;
2627

@@ -40,4 +41,7 @@ private SqlTypes() {}
4041

4142
/** Beam LogicalType corresponding to TIMESTAMP type. */
4243
public static final LogicalType<Instant, Row> TIMESTAMP = new MicrosInstant();
44+
45+
/** Beam LogicalType corresponding to UUID type. */
46+
public static final LogicalType<UUID, Row> UUID = new UuidLogicalType();
4347
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.util.UUID;
3535
import java.util.stream.Collectors;
3636
import org.apache.beam.sdk.schemas.Schema;
37+
import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
38+
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
3739
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
3840
import org.apache.beam.sdk.util.Preconditions;
3941
import org.apache.beam.sdk.values.Row;
@@ -71,6 +73,7 @@ private IcebergUtils() {}
7173
.put(SqlTypes.DATE.getIdentifier(), Types.DateType.get())
7274
.put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get())
7375
.put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone())
76+
.put(SqlTypes.UUID.getIdentifier(), Types.UUIDType.get())
7477
.build();
7578

7679
private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
@@ -175,8 +178,17 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType(
175178
return new TypeAndMaxId(
176179
--nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
177180
} else if (beamType.getTypeName().isLogicalType()) {
178-
String logicalTypeIdentifier =
179-
checkArgumentNotNull(beamType.getLogicalType()).getIdentifier();
181+
Schema.LogicalType<?, ?> logicalType = checkArgumentNotNull(beamType.getLogicalType());
182+
if (logicalType instanceof FixedPrecisionNumeric) {
183+
Row args = Preconditions.checkArgumentNotNull(logicalType.getArgument());
184+
Integer precision = Preconditions.checkArgumentNotNull(args.getInt32("precision"));
185+
Integer scale = Preconditions.checkArgumentNotNull(args.getInt32("scale"));
186+
return new TypeAndMaxId(--nestedFieldId, Types.DecimalType.of(precision, scale));
187+
}
188+
if (logicalType instanceof PassThroughLogicalType) {
189+
return beamFieldTypeToIcebergFieldType(logicalType.getBaseType(), nestedFieldId);
190+
}
191+
String logicalTypeIdentifier = logicalType.getIdentifier();
180192
@Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier);
181193
if (type == null) {
182194
throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier);

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@
3535
import java.util.List;
3636
import java.util.Map;
3737
import org.apache.beam.sdk.schemas.Schema;
38+
import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
39+
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
3840
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
41+
import org.apache.beam.sdk.schemas.logicaltypes.UuidLogicalType;
42+
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
43+
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
3944
import org.apache.beam.sdk.values.Row;
4045
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
4146
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -937,5 +942,103 @@ public void testStructIcebergSchemaToBeamSchema() {
937942

938943
assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema);
939944
}
945+
946+
static final Schema BEAM_SCHEMA_JDBC_ALL_TYPES =
947+
Schema.builder()
948+
.addField("array_field", Schema.FieldType.array(Schema.FieldType.STRING)) // from ARRAY
949+
.addField("bigint_field", Schema.FieldType.INT64) // from BIGINT
950+
.addField(
951+
"binary_field",
952+
Schema.FieldType.logicalType(VariableBytes.of("BINARY", 10))) // from BINARY
953+
.addField("bit_field", Schema.FieldType.BOOLEAN) // from BIT
954+
.addField("boolean_field", Schema.FieldType.BOOLEAN) // from BOOLEAN
955+
.addField(
956+
"char_field", Schema.FieldType.logicalType(FixedString.of("CHAR", 10))) // from CHAR
957+
.addField("date_field", Schema.FieldType.logicalType(SqlTypes.DATE)) // from DATE
958+
.addField("decimal_field", Schema.FieldType.DECIMAL) // from DECIMAL
959+
.addField("double_field", Schema.FieldType.DOUBLE) // from DOUBLE
960+
.addField("float_field", Schema.FieldType.DOUBLE) // from FLOAT
961+
.addField("integer_field", Schema.FieldType.INT32) // from INTEGER
962+
.addField(
963+
"longnvarchar_field",
964+
Schema.FieldType.logicalType(
965+
VariableString.of("LONGNVARCHAR", 100))) // from LONGNVARCHAR
966+
.addField(
967+
"longvarbinary_field",
968+
Schema.FieldType.logicalType(
969+
VariableBytes.of("LONGVARBINARY", 100))) // from LONGVARBINARY
970+
.addField(
971+
"longvarchar_field",
972+
Schema.FieldType.logicalType(
973+
VariableString.of("LONGVARCHAR", 100))) // from LONGVARCHAR
974+
.addField(
975+
"nchar_field",
976+
Schema.FieldType.logicalType(FixedString.of("NCHAR", 10))) // from NCHAR
977+
.addField(
978+
"numeric_field",
979+
Schema.FieldType.logicalType(FixedPrecisionNumeric.of(10, 5))) // from NUMERIC
980+
.addField(
981+
"nvarchar_field",
982+
Schema.FieldType.logicalType(VariableString.of("NVARCHAR", 100))) // from NVARCHAR
983+
.addField("real_field", Schema.FieldType.FLOAT) // from REAL
984+
.addField("smallint_field", Schema.FieldType.INT16) // from SMALLINT
985+
.addField("time_field", Schema.FieldType.logicalType(SqlTypes.TIME)) // from TIME
986+
.addField(
987+
"timestamp_field",
988+
Schema.FieldType.logicalType(SqlTypes.DATETIME)) // from TIMESTAMP
989+
.addField(
990+
"timestamp_with_timezone_field",
991+
Schema.FieldType.DATETIME) // from TIMESTAMP_WITH_TIMEZONE
992+
.addField("tinyint_field", Schema.FieldType.BYTE) // from TINYINT
993+
.addField(
994+
"varbinary_field",
995+
Schema.FieldType.logicalType(VariableBytes.of("VARBINARY", 100))) // from VARBINARY
996+
.addField(
997+
"varchar_field",
998+
Schema.FieldType.logicalType(VariableString.of("VARCHAR", 100))) // from VARCHAR
999+
.addField("blob_field", Schema.FieldType.BYTES) // from BLOB
1000+
.addField("clob_field", Schema.FieldType.STRING) // from CLOB
1001+
.addField(
1002+
"uuid_field", Schema.FieldType.logicalType(new UuidLogicalType())) // from UUID
1003+
.build();
1004+
1005+
static final org.apache.iceberg.Schema ICEBERG_SCHEMA_JDBC_ALL_TYPES =
1006+
new org.apache.iceberg.Schema(
1007+
required(1, "array_field", Types.ListType.ofRequired(29, Types.StringType.get())),
1008+
required(2, "bigint_field", Types.LongType.get()),
1009+
required(3, "binary_field", Types.BinaryType.get()),
1010+
required(4, "bit_field", Types.BooleanType.get()),
1011+
required(5, "boolean_field", Types.BooleanType.get()),
1012+
required(6, "char_field", Types.StringType.get()),
1013+
required(7, "date_field", Types.DateType.get()),
1014+
required(8, "decimal_field", Types.StringType.get()),
1015+
required(9, "double_field", Types.DoubleType.get()),
1016+
required(10, "float_field", Types.DoubleType.get()),
1017+
required(11, "integer_field", Types.IntegerType.get()),
1018+
required(12, "longnvarchar_field", Types.StringType.get()),
1019+
required(13, "longvarbinary_field", Types.BinaryType.get()),
1020+
required(14, "longvarchar_field", Types.StringType.get()),
1021+
required(15, "nchar_field", Types.StringType.get()),
1022+
required(16, "numeric_field", Types.DecimalType.of(10, 5)),
1023+
required(17, "nvarchar_field", Types.StringType.get()),
1024+
required(18, "real_field", Types.FloatType.get()),
1025+
required(19, "smallint_field", Types.StringType.get()),
1026+
required(20, "time_field", Types.TimeType.get()),
1027+
required(21, "timestamp_field", Types.TimestampType.withoutZone()),
1028+
required(22, "timestamp_with_timezone_field", Types.TimestampType.withZone()),
1029+
required(23, "tinyint_field", Types.StringType.get()),
1030+
required(24, "varbinary_field", Types.BinaryType.get()),
1031+
required(25, "varchar_field", Types.StringType.get()),
1032+
required(26, "blob_field", Types.BinaryType.get()),
1033+
required(27, "clob_field", Types.StringType.get()),
1034+
required(28, "uuid_field", Types.UUIDType.get()));
1035+
1036+
@Test
1037+
public void testJdbcBeamSchemaToIcebergSchema() {
1038+
org.apache.iceberg.Schema convertedIcebergSchema =
1039+
IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_JDBC_ALL_TYPES);
1040+
1041+
assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_JDBC_ALL_TYPES));
1042+
}
9401043
}
9411044
}

0 commit comments

Comments
 (0)