Skip to content

Commit cad6513

Browse files
authored
Merge pull request #24925: [release-2.44.0] Cherry pick: Attempt deserialize all non-standard portable logical types from proto
2 parents 342e3bb + 190997e commit cad6513

File tree

2 files changed

+78
-21
lines changed

2 files changed

+78
-21
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,16 @@
5656
import org.apache.beam.sdk.util.SerializableUtils;
5757
import org.apache.beam.sdk.values.Row;
5858
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
59+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
5960
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
6061
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
6162
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
6263
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
6364
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
6465
import org.apache.commons.lang3.ClassUtils;
6566
import org.checkerframework.checker.nullness.qual.Nullable;
67+
import org.slf4j.Logger;
68+
import org.slf4j.LoggerFactory;
6669

6770
/** Utility methods for translating schemas. */
6871
@Experimental(Kind.SCHEMAS)
@@ -71,6 +74,7 @@
7174
"rawtypes"
7275
})
7376
public class SchemaTranslation {
77+
private static final Logger LOG = LoggerFactory.getLogger(SchemaTranslation.class);
7478

7579
private static final String URN_BEAM_LOGICAL_DECIMAL = FixedPrecisionNumeric.BASE_IDENTIFIER;
7680
private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1";
@@ -124,8 +128,8 @@ private static SchemaApi.Field fieldToProto(
124128
.build();
125129
}
126130

127-
private static SchemaApi.FieldType fieldTypeToProto(
128-
FieldType fieldType, boolean serializeLogicalType) {
131+
@VisibleForTesting
132+
static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean serializeLogicalType) {
129133
SchemaApi.FieldType.Builder builder = SchemaApi.FieldType.newBuilder();
130134
switch (fieldType.getTypeName()) {
131135
case ROW:
@@ -297,7 +301,8 @@ private static Field fieldFromProto(SchemaApi.Field protoField) {
297301
.withDescription(protoField.getDescription());
298302
}
299303

300-
private static FieldType fieldTypeFromProto(SchemaApi.FieldType protoFieldType) {
304+
@VisibleForTesting
305+
static FieldType fieldTypeFromProto(SchemaApi.FieldType protoFieldType) {
301306
FieldType fieldType = fieldTypeFromProtoWithoutNullable(protoFieldType);
302307

303308
if (protoFieldType.getNullable()) {
@@ -426,26 +431,32 @@ private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType p
426431
return FieldType.DATETIME;
427432
} else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) {
428433
return FieldType.DECIMAL;
429-
} else if (urn.equals(URN_BEAM_LOGICAL_JAVASDK)) {
430-
return FieldType.logicalType(
431-
(LogicalType)
432-
SerializableUtils.deserializeFromByteArray(
433-
logicalType.getPayload().toByteArray(), "logicalType"));
434-
} else {
435-
@Nullable FieldType argumentType = null;
436-
@Nullable Object argumentValue = null;
437-
if (logicalType.hasArgumentType()) {
438-
argumentType = fieldTypeFromProto(logicalType.getArgumentType());
439-
argumentValue = fieldValueFromProto(argumentType, logicalType.getArgument());
434+
} else if (urn.startsWith("beam:logical_type:")) {
435+
try {
436+
return FieldType.logicalType(
437+
(LogicalType)
438+
SerializableUtils.deserializeFromByteArray(
439+
logicalType.getPayload().toByteArray(), "logicalType"));
440+
} catch (IllegalArgumentException e) {
441+
LOG.warn(
442+
"Unable to deserialize the logical type {} from proto. Mark as UnknownLogicalType.",
443+
urn);
440444
}
441-
return FieldType.logicalType(
442-
new UnknownLogicalType(
443-
urn,
444-
logicalType.getPayload().toByteArray(),
445-
argumentType,
446-
argumentValue,
447-
fieldTypeFromProto(logicalType.getRepresentation())));
448445
}
446+
// assemble an UnknownLogicalType
447+
@Nullable FieldType argumentType = null;
448+
@Nullable Object argumentValue = null;
449+
if (logicalType.hasArgumentType()) {
450+
argumentType = fieldTypeFromProto(logicalType.getArgumentType());
451+
argumentValue = fieldValueFromProto(argumentType, logicalType.getArgument());
452+
}
453+
return FieldType.logicalType(
454+
new UnknownLogicalType(
455+
urn,
456+
logicalType.getPayload().toByteArray(),
457+
argumentType,
458+
argumentValue,
459+
fieldTypeFromProto(logicalType.getRepresentation())));
449460
default:
450461
throw new IllegalArgumentException(
451462
"Unexpected type_info: " + protoFieldType.getTypeInfoCase());

sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,16 @@
4040
import org.apache.beam.sdk.schemas.Schema.FieldType;
4141
import org.apache.beam.sdk.schemas.logicaltypes.DateTime;
4242
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
43+
import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric;
44+
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
4345
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
46+
import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration;
47+
import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
4448
import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable;
4549
import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType;
50+
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
51+
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
52+
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
4653
import org.apache.beam.sdk.values.Row;
4754
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
4855
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
@@ -395,6 +402,45 @@ public void typeInfoNotSet() {
395402
}
396403
}
397404

405+
/**
 * Tests that logical types round-trip through their proto representation, preserving the
 * logical type class, argument type, and argument.
 */
406+
@RunWith(Parameterized.class)
407+
public static class LogicalTypesTest {
408+
@Parameters(name = "{index}: {0}")
409+
public static Iterable<Schema.FieldType> data() {
410+
return ImmutableList.<Schema.FieldType>builder()
411+
.add(FieldType.logicalType(SqlTypes.DATE))
412+
.add(FieldType.logicalType(SqlTypes.TIME))
413+
.add(FieldType.logicalType(SqlTypes.DATETIME))
414+
.add(FieldType.logicalType(SqlTypes.TIMESTAMP))
415+
.add(FieldType.logicalType(new NanosInstant()))
416+
.add(FieldType.logicalType(new NanosDuration()))
417+
.add(FieldType.logicalType(FixedBytes.of(10)))
418+
.add(FieldType.logicalType(VariableBytes.of(10)))
419+
.add(FieldType.logicalType(FixedString.of(10)))
420+
.add(FieldType.logicalType(VariableString.of(10)))
421+
.add(FieldType.logicalType(FixedPrecisionNumeric.of(10)))
422+
.build();
423+
}
424+
425+
@Parameter(0)
426+
public Schema.FieldType fieldType;
427+
428+
@Test
429+
public void testPortableLogicalTypeSerializeDeserilizeCorrectly() {
430+
SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, true);
431+
Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto);
432+
433+
assertThat(
434+
translated.getLogicalType().getClass(), equalTo(fieldType.getLogicalType().getClass()));
435+
assertThat(
436+
translated.getLogicalType().getArgumentType(),
437+
equalTo(fieldType.getLogicalType().getArgumentType()));
438+
assertThat(
439+
translated.getLogicalType().getArgument(),
440+
equalTo(fieldType.getLogicalType().getArgument()));
441+
}
442+
}
443+
398444
/** A simple logical type that has no argument. */
399445
private static class NullArgumentLogicalType implements Schema.LogicalType<String, String> {
400446
public static final String IDENTIFIER = "beam:logical_type:null_argument:v1";

0 commit comments

Comments
 (0)