Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Days;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Bytes;

/**
* Utility methods for converting Avro {@link GenericRecord} objects to dynamic protocol message,
* for use with the Storage write API.
*/
public class AvroGenericRecordToStorageApiProto {

private static final org.joda.time.LocalDate EPOCH_DATE = new org.joda.time.LocalDate(1970, 1, 1);

static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
.put(Schema.Type.INT, TableFieldSchema.Type.INT64)
Expand All @@ -67,14 +68,37 @@ public class AvroGenericRecordToStorageApiProto {
.build();

// A map of supported logical types to the protobuf field type.
// Keys are the logical type names, read from instances produced by the LogicalTypes factories
// (decimal(1) is instantiated only so that its name can be read). Every decimal maps to
// BIGNUMERIC here, and both timestamp resolutions map to TIMESTAMP.
// NOTE(review): this table looks like the removed side of the diff hunk; the logicalTypes(...)
// method that follows handles the same names plus additional ones - confirm which one is live.
static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
ImmutableMap.<String, TableFieldSchema.Type>builder()
.put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
.put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC)
.put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP)
.put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP)
.put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
.build();
/**
 * Resolves the {@link TableFieldSchema.Type} used to represent an Avro logical type.
 *
 * <p>The lookup is driven by the logical type name. Decimals are the only case that inspects the
 * type's parameters: a decimal with at most 9 fractional digits and at most 29 integer digits
 * (precision minus scale) resolves to NUMERIC, any other decimal to BIGNUMERIC.
 *
 * @param logicalType the Avro logical type to translate
 * @return the matching field type, or {@link Optional#empty()} when the name is not handled
 */
static Optional<TableFieldSchema.Type> logicalTypes(LogicalType logicalType) {
  TableFieldSchema.Type mapped;
  switch (logicalType.getName()) {
    case "date":
      mapped = TableFieldSchema.Type.DATE;
      break;
    case "time-micros":
    case "time-millis":
      // both resolutions share the same field type
      mapped = TableFieldSchema.Type.TIME;
      break;
    case "decimal":
      {
        LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
        int fractionalDigits = decimalType.getScale();
        int integerDigits = decimalType.getPrecision() - fractionalDigits;
        boolean fitsNumeric = fractionalDigits <= 9 && integerDigits <= 29;
        mapped = fitsNumeric ? TableFieldSchema.Type.NUMERIC : TableFieldSchema.Type.BIGNUMERIC;
        break;
      }
    case "timestamp-micros":
    case "timestamp-millis":
      mapped = TableFieldSchema.Type.TIMESTAMP;
      break;
    case "local-timestamp-micros":
    case "local-timestamp-millis":
      mapped = TableFieldSchema.Type.DATETIME;
      break;
    case "uuid":
      mapped = TableFieldSchema.Type.STRING;
      break;
    default:
      // unknown logical type: let the caller fall back to the underlying primitive type
      return Optional.empty();
  }
  return Optional.of(mapped);
}

static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
ImmutableMap.<Schema.Type, Function<Object, Object>>builder()
Expand All @@ -92,16 +116,15 @@ public class AvroGenericRecordToStorageApiProto {
// A map of supported logical types to their encoding functions.
// Each encoder is called with the LogicalType and the raw field value. Only the decimal encoder
// consumes the LogicalType argument; the others forward the value alone, and the time/timestamp
// encoders add a boolean telling the converter whether the raw value is in micros (true) or
// millis (false).
// NOTE(review): the rows keyed through LogicalTypes.x().getName() and the rows keyed by string
// literals appear to be the removed and added sides of one diff hunk - presumably only one of
// the two sets is meant to be present at a time.
static final Map<String, BiFunction<LogicalType, Object, Object>> LOGICAL_TYPE_ENCODERS =
ImmutableMap.<String, BiFunction<LogicalType, Object, Object>>builder()
.put(LogicalTypes.date().getName(), (logicalType, value) -> convertDate(value))
.put(
LogicalTypes.decimal(1).getName(), AvroGenericRecordToStorageApiProto::convertDecimal)
.put(
LogicalTypes.timestampMicros().getName(),
(logicalType, value) -> convertTimestamp(value, true))
.put(
LogicalTypes.timestampMillis().getName(),
(logicalType, value) -> convertTimestamp(value, false))
.put(LogicalTypes.uuid().getName(), (logicalType, value) -> convertUUID(value))
.put("date", (logicalType, value) -> convertDate(value))
.put("time-micros", (logicalType, value) -> convertTime(value, true))
.put("time-millis", (logicalType, value) -> convertTime(value, false))
.put("decimal", AvroGenericRecordToStorageApiProto::convertDecimal)
.put("timestamp-micros", (logicalType, value) -> convertTimestamp(value, true))
.put("timestamp-millis", (logicalType, value) -> convertTimestamp(value, false))
.put("local-timestamp-micros", (logicalType, value) -> convertDateTime(value, true))
.put("local-timestamp-millis", (logicalType, value) -> convertDateTime(value, false))
.put("uuid", (logicalType, value) -> convertUUID(value))
.build();

static String convertUUID(Object value) {
Expand All @@ -115,34 +138,96 @@ static String convertUUID(Object value) {
}

/**
 * Converts a timestamp value into a Long, used by the "timestamp-micros" and "timestamp-millis"
 * encoders and by convertDateTime.
 *
 * <p>NOTE(review): this span interleaves the removed and the added lines of a diff hunk, and
 * carries an inline review comment. According to that comment the target unit is always epoch
 * microseconds, and the {@code micros} flag only describes the unit of a raw Long input.
 *
 * @param value a joda ReadableInstant, a java.time.Instant, or a raw Long
 * @param micros whether a raw Long input is already in microseconds (otherwise milliseconds)
 * @return the converted timestamp
 */
static Long convertTimestamp(Object value, boolean micros) {
// removed variant: scaled joda millis by 1000 only when micros was requested
if (value instanceof ReadableInstant) {
return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was wrong. BQ always expects epoch microseconds. Conversion should be applied on the raw type depending if it represents millis or micros

// added variant: joda instants carry millis, so they are always scaled by 1_000
if (value instanceof org.joda.time.ReadableInstant) {
return ((org.joda.time.ReadableInstant) value).getMillis() * 1_000L;
} else if (value instanceof java.time.Instant) {
java.time.Instant instant = (java.time.Instant) value;
long seconds = instant.getEpochSecond();
int nanos = instant.getNano();

// Math.multiplyExact / Math.addExact throw ArithmeticException instead of wrapping on overflow.
if (seconds < 0 && nanos > 0) {
// Negative epoch second with a positive nano part: scale (seconds + 1) and add the negative
// complement of the sub-second part; the sum equals seconds * 1_000_000 + nanos / 1_000.
// Despite its name, `ms` holds a value scaled by 1_000_000 (microseconds).
long ms = Math.multiplyExact(seconds + 1, 1_000_000L);
long adjustment = (nanos / 1_000L) - 1_000_000L;
return Math.addExact(ms, adjustment);
} else {
long ms = Math.multiplyExact(seconds, 1_000_000L);
// integer division drops anything finer than a microsecond
return Math.addExact(ms, nanos / 1_000L);
}
} else {
Preconditions.checkArgument(
value instanceof Long, "Expecting a value as Long type (millis).");
return (Long) value;
value instanceof Long, "Expecting a value as Long type (timestamp).");
// raw Long: kept as is when it is already in micros, otherwise multiplied by 1_000
return (micros ? 1 : 1_000L) * ((Long) value);
}
}

/**
 * Converts a date value into an Integer number of days.
 *
 * <p>NOTE(review): the two leading {@code if} lines are the removed and the added sides of a diff
 * hunk; the removed one measured days from Instant.EPOCH to a ReadableInstant.
 *
 * @param value a joda LocalDate, a java.time.LocalDate, or a raw Integer day count
 * @return the day count; for joda dates it is measured from EPOCH_DATE (1970-01-01)
 */
static Integer convertDate(Object value) {
if (value instanceof ReadableInstant) {
return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
if (value instanceof org.joda.time.LocalDate) {
return org.joda.time.Days.daysBetween(EPOCH_DATE, (org.joda.time.LocalDate) value).getDays();
} else if (value instanceof java.time.LocalDate) {
// NOTE(review): toEpochDay() returns a long; the cast narrows it to int without a range check
return (int) ((java.time.LocalDate) value).toEpochDay();
} else {
// anything else must already be an Integer and is returned unchanged
Preconditions.checkArgument(
value instanceof Integer, "Expecting a value as Integer type (days).");
return (Integer) value;
}
}

/**
 * Converts a time-of-day value into a Long number of microseconds.
 *
 * <p>joda and java.time local times are converted from their own resolution (millis of day and
 * nanos of day respectively). Any other value is treated as a raw Avro value: a Long that is
 * returned unchanged when {@code micros} is true, or an Integer of milliseconds that is scaled by
 * 1_000 when {@code micros} is false.
 *
 * @param value a joda LocalTime, a java.time.LocalTime, or a raw Long / Integer
 * @param micros whether a raw value is expressed in microseconds (otherwise milliseconds)
 * @return the time of day in microseconds
 * @throws IllegalArgumentException if a raw value does not have the expected boxed type
 */
static Long convertTime(Object value, boolean micros) {
  if (value instanceof org.joda.time.LocalTime) {
    org.joda.time.LocalTime jodaTime = (org.joda.time.LocalTime) value;
    return jodaTime.getMillisOfDay() * 1_000L;
  }
  if (value instanceof java.time.LocalTime) {
    long nanoOfDay = ((java.time.LocalTime) value).toNanoOfDay();
    return java.util.concurrent.TimeUnit.NANOSECONDS.toMicros(nanoOfDay);
  }
  if (!micros) {
    // time-millis is carried as an Integer
    Preconditions.checkArgument(
        value instanceof Integer, "Expecting a value as Integer type (time).");
    Integer millisOfDay = (Integer) value;
    return millisOfDay * 1_000L;
  }
  // time-micros is carried as a Long and needs no scaling
  Preconditions.checkArgument(value instanceof Long, "Expecting a value as Long type (time).");
  return (Long) value;
}

/**
 * Converts a local (zone-less) date-time value into a Long.
 *
 * <p>joda and java.time local date-times are interpreted at UTC; the java.time case is delegated
 * to {@code convertTimestamp}. Any other value must be a raw Long, which is returned unchanged
 * when {@code micros} is true and multiplied by 1_000 otherwise.
 *
 * @param value a joda LocalDateTime, a java.time.LocalDateTime, or a raw Long
 * @param micros whether a raw Long is expressed in microseconds (otherwise milliseconds)
 * @return the converted value
 * @throws IllegalArgumentException if a raw value is not a Long
 */
static Long convertDateTime(Object value, boolean micros) {
  if (value instanceof org.joda.time.LocalDateTime) {
    // Not expected in practice: local-timestamp was added after joda was deprecated.
    // Handled anyway so that all converters accept the same families of types.
    org.joda.time.LocalDateTime jodaLocal = (org.joda.time.LocalDateTime) value;
    long epochMillis = jodaLocal.toDateTime(org.joda.time.DateTimeZone.UTC).getMillis();
    return epochMillis * 1_000L;
  }
  if (value instanceof java.time.LocalDateTime) {
    java.time.LocalDateTime javaLocal = (java.time.LocalDateTime) value;
    return convertTimestamp(javaLocal.toInstant(java.time.ZoneOffset.UTC), micros);
  }
  Preconditions.checkArgument(
      value instanceof Long, "Expecting a value as Long type (local-timestamp).");
  long raw = (Long) value;
  long factor = micros ? 1L : 1_000L;
  return factor * raw;
}

/**
 * Converts a decimal value into a ByteString.
 *
 * <p>NOTE(review): this span interleaves the removed and the added implementation of a diff hunk,
 * which is why {@code byteBuffer} is declared twice and there are two return statements.
 *
 * @param logicalType the decimal logical type, passed to Avro's DecimalConversion
 * @param value a ByteBuffer, or (in the added variant) a BigDecimal
 * @return the encoded decimal
 */
static ByteString convertDecimal(LogicalType logicalType, Object value) {
// removed variant: decode the buffer to a BigDecimal and let BeamRowToStorageApiProto serialize it
ByteBuffer byteBuffer = (ByteBuffer) value;
BigDecimal bigDecimal =
new Conversions.DecimalConversion()
.fromBytes(
byteBuffer.duplicate(),
Schema.create(Schema.Type.NULL), // dummy schema, not used
logicalType);
return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
// added variant: obtain the Avro byte encoding, then reverse its byte order
ByteBuffer byteBuffer;
if (value instanceof BigDecimal) {
// BigDecimalByteStringEncoder does not support parametrized NUMERIC/BIGNUMERIC
byteBuffer =
new Conversions.DecimalConversion()
.toBytes(
(BigDecimal) value,
Schema.create(Schema.Type.NULL), // dummy schema, not used
logicalType);
} else {
Preconditions.checkArgument(
value instanceof ByteBuffer, "Expecting a value as ByteBuffer type (decimal).");
byteBuffer = (ByteBuffer) value;
}
// copy the remaining bytes through a duplicate - presumably so the caller's buffer position is
// left untouched
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.duplicate().get(bytes);
// NOTE(review): the in-place reversal presumably adapts the byte order to what the write API
// expects - confirm against the API documentation
Bytes.reverse(bytes);
return ByteString.copyFrom(bytes);
}

/**
Expand Down Expand Up @@ -213,7 +298,7 @@ public static DynamicMessage messageFromGenericRecord(
return builder.build();
}

private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) {
@Nullable Schema schema = field.schema();
Preconditions.checkNotNull(schema, "Unexpected null schema!");
if (StorageApiCDC.COLUMNS.contains(field.name())) {
Expand Down Expand Up @@ -282,17 +367,34 @@ private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field)
break;
default:
elementType = TypeWithNullability.create(schema).getType();
Optional<LogicalType> logicalType =
Optional.ofNullable(LogicalTypes.fromSchema(elementType));
@Nullable
TableFieldSchema.Type primitiveType =
Optional.ofNullable(LogicalTypes.fromSchema(elementType))
.map(logicalType -> LOGICAL_TYPES.get(logicalType.getName()))
logicalType
.flatMap(AvroGenericRecordToStorageApiProto::logicalTypes)
.orElse(PRIMITIVE_TYPES.get(elementType.getType()));
if (primitiveType == null) {
throw new RuntimeException("Unsupported type " + elementType.getType());
}
// a scalar will be required by default, if defined as part of union then
// caller will set nullability requirements
builder = builder.setType(primitiveType);
// parametrized types
if (logicalType.isPresent() && logicalType.get().getName().equals("decimal")) {
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType.get();
int precision = decimal.getPrecision();
int scale = decimal.getScale();
if (!(precision == 38 && scale == 9) // NUMERIC
&& !(precision == 77 && scale == 38) // BIGNUMERIC
) {
// parametrized type
builder = builder.setPrecision(precision);
if (scale != 0) {
builder = builder.setScale(scale);
}
}
}
}
if (builder.getMode() != TableFieldSchema.Mode.REPEATED) {
if (TypeWithNullability.create(schema).isNullable()) {
Expand Down
Loading