Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
Expand All @@ -44,15 +46,15 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Days;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/**
* Utility methods for converting Avro {@link GenericRecord} objects to dynamic protocol message,
* for use with the Storage write API.
*/
public class AvroGenericRecordToStorageApiProto {

private static final org.joda.time.LocalDate EPOCH_DATE = new org.joda.time.LocalDate(1970, 1, 1);

static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
.put(Schema.Type.INT, TableFieldSchema.Type.INT64)
Expand All @@ -69,11 +71,15 @@ public class AvroGenericRecordToStorageApiProto {
// A map of supported logical types to the protobuf field type.
static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
ImmutableMap.<String, TableFieldSchema.Type>builder()
.put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
.put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC)
.put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP)
.put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP)
.put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
.put("date", TableFieldSchema.Type.DATE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason to change them to hard-coded names here? I understand they should be equivalent. Or keep the getName() calls while noting the resolved names in comments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are equivalent. I changed because type name for decimal is not accessible and requires creation of a 'fake' logical-type.

LogicalTypes.decimal(1).getName()

I can revert to the old style if that's preferred.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

both are fine, if choose to go with resolved names, good to add a comment these come from corresponding avro LogicalTypes implementations' getName()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code has been refactored to accommodate conversion from the Avro decimal type to BQ NUMERIC/BIGNUMERIC.

.put("time-micros", TableFieldSchema.Type.TIME)
.put("time-millis", TableFieldSchema.Type.TIME)
.put("decimal", TableFieldSchema.Type.BIGNUMERIC)
.put("timestamp-micros", TableFieldSchema.Type.TIMESTAMP)
.put("timestamp-millis", TableFieldSchema.Type.TIMESTAMP)
.put("local-timestamp-micros", TableFieldSchema.Type.DATETIME)
.put("local-timestamp-millis", TableFieldSchema.Type.DATETIME)
.put("uuid", TableFieldSchema.Type.STRING)
.build();

static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
Expand All @@ -92,16 +98,15 @@ public class AvroGenericRecordToStorageApiProto {
// A map of supported logical types to their encoding functions.
static final Map<String, BiFunction<LogicalType, Object, Object>> LOGICAL_TYPE_ENCODERS =
ImmutableMap.<String, BiFunction<LogicalType, Object, Object>>builder()
.put(LogicalTypes.date().getName(), (logicalType, value) -> convertDate(value))
.put(
LogicalTypes.decimal(1).getName(), AvroGenericRecordToStorageApiProto::convertDecimal)
.put(
LogicalTypes.timestampMicros().getName(),
(logicalType, value) -> convertTimestamp(value, true))
.put(
LogicalTypes.timestampMillis().getName(),
(logicalType, value) -> convertTimestamp(value, false))
.put(LogicalTypes.uuid().getName(), (logicalType, value) -> convertUUID(value))
.put("date", (logicalType, value) -> convertDate(value))
.put("time-micros", (logicalType, value) -> convertTime(value, true))
.put("time-millis", (logicalType, value) -> convertTime(value, false))
.put("decimal", AvroGenericRecordToStorageApiProto::convertDecimal)
.put("timestamp-micros", (logicalType, value) -> convertTimestamp(value, true))
.put("timestamp-millis", (logicalType, value) -> convertTimestamp(value, false))
.put("local-timestamp-micros", (logicalType, value) -> convertDateTime(value, true))
.put("local-timestamp-millis", (logicalType, value) -> convertDateTime(value, false))
.put("uuid", (logicalType, value) -> convertUUID(value))
.build();

static String convertUUID(Object value) {
Expand All @@ -115,34 +120,97 @@ static String convertUUID(Object value) {
}

static Long convertTimestamp(Object value, boolean micros) {
if (value instanceof ReadableInstant) {
return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was wrong. BQ always expects epoch microseconds. Conversion should be applied on the raw type depending if it represents millis or micros

if (value instanceof org.joda.time.ReadableInstant) {
return ((org.joda.time.ReadableInstant) value).getMillis() * 1_000L;
} else if (value instanceof java.time.Instant) {
java.time.Instant instant = (java.time.Instant) value;
long seconds = instant.getEpochSecond();
int nanos = instant.getNano();

if (seconds < 0 && nanos > 0) {
long ms = Math.multiplyExact(seconds + 1, 1_000_000L);
long adjustment = (nanos / 1_000L) - 1_000_000L;
return Math.addExact(ms, adjustment);
} else {
long ms = Math.multiplyExact(seconds, 1_000_000L);
return Math.addExact(ms, nanos / 1_000L);
}
} else {
Preconditions.checkArgument(
value instanceof Long, "Expecting a value as Long type (millis).");
return (Long) value;
value instanceof Long, "Expecting a value as Long type (timestamp).");
return (micros ? 1 : 1_000L) * ((Long) value);
}
}

static Integer convertDate(Object value) {
if (value instanceof ReadableInstant) {
return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
if (value instanceof org.joda.time.LocalDate) {
return org.joda.time.Days.daysBetween(EPOCH_DATE, (org.joda.time.LocalDate) value).getDays();
} else if (value instanceof java.time.LocalDate) {
return (int) ((java.time.LocalDate) value).toEpochDay();
} else {
Preconditions.checkArgument(
value instanceof Integer, "Expecting a value as Integer type (days).");
return (Integer) value;
}
}

static Long convertTime(Object value, boolean micros) {
// Normalizes an Avro time-of-day value (logical types time-millis / time-micros)
// to microseconds since midnight, which is what the Storage Write API expects.
// Accepts joda LocalTime, java.time.LocalTime, or the raw underlying type
// (Long for micros, Integer for millis) when the logical conversion is disabled.
if (value instanceof org.joda.time.LocalTime) {
org.joda.time.LocalTime jodaTime = (org.joda.time.LocalTime) value;
// joda exposes millis-of-day only; scale up to micros.
return (long) jodaTime.getMillisOfDay() * 1_000L;
}
if (value instanceof java.time.LocalTime) {
// nano-of-day is always non-negative, so plain division equals
// TimeUnit.NANOSECONDS.toMicros.
long nanoOfDay = ((java.time.LocalTime) value).toNanoOfDay();
return nanoOfDay / 1_000L;
}
if (micros) {
// Raw time-micros value: already in micros.
Preconditions.checkArgument(
value instanceof Long, "Expecting a value as Long type (time).");
return (Long) value;
}
// Raw time-millis value: scale up to micros.
Preconditions.checkArgument(
value instanceof Integer, "Expecting a value as Integer type (time).");
return ((Integer) value) * 1_000L;
}

// Converts an Avro local-timestamp-millis / local-timestamp-micros value to epoch
// microseconds (interpreted at UTC), the encoding BigQuery DATETIME expects here.
// Accepts joda LocalDateTime, java.time.LocalDateTime, or the raw Long when the
// logical conversion is disabled on the GenericData used for serialization.
static Long convertDateTime(Object value, boolean micros) {
if (value instanceof org.joda.time.LocalDateTime) {
// we should never come here as local-timestamp has been added after joda deprecation
// implement nonetheless for consistency
org.joda.time.DateTime dateTime =
((org.joda.time.LocalDateTime) value).toDateTime(org.joda.time.DateTimeZone.UTC);
// joda only carries millisecond precision; scale millis up to micros.
return 1_000L * dateTime.getMillis();
} else if (value instanceof java.time.LocalDateTime) {
// Anchor the local date-time at UTC, then reuse the Instant path of
// convertTimestamp (which emits epoch micros regardless of the flag).
java.time.Instant instant =
((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.UTC);
return convertTimestamp(instant, micros);
} else {
// Raw underlying type: Long holding millis or micros depending on the logical type.
Preconditions.checkArgument(
value instanceof Long, "Expecting a value as Long type (local-timestamp).");
return (micros ? 1 : 1_000L) * ((Long) value);
}
}

static ByteString convertDecimal(LogicalType logicalType, Object value) {
ByteBuffer byteBuffer = (ByteBuffer) value;
BigDecimal bigDecimal =
new Conversions.DecimalConversion()
.fromBytes(
byteBuffer.duplicate(),
Schema.create(Schema.Type.NULL), // dummy schema, not used
logicalType);
return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
if (value instanceof BigDecimal) {
LogicalTypes.Decimal type = (LogicalTypes.Decimal) logicalType;
BigDecimal bigDecimal =
((BigDecimal) value)
.setScale(type.getScale(), RoundingMode.DOWN)
.round(new MathContext(type.getPrecision(), RoundingMode.DOWN));
Copy link
Contributor Author

@RustedBones RustedBones Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this seem legitimate to round? We might also fail if the BigDecimal precision and scale do not match the expected logical type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand it previously only accepted a ByteBuffer, and now it's adding support for java.math.BigDecimal? If it previously would have failed, or it does not lose precision compared to the existing behavior, I think it's fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All other conversions accept the logical-type as well as the underlying-type.

This is mandatory to support both: even if the logical-type is present on the schema, it can be discarded when the GenericData used for serialization does not have the feature enabled.

Concerning the rounding, The doc states to use BigDecimalByteStringEncoder. BeamRowToStorageApiProto is a copy of that.

It's however not supporting parameterized NUMERIC and BIGNUMERIC types.

return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
} else {
Preconditions.checkArgument(
value instanceof ByteBuffer, "Expecting a value as ByteBuffer type (decimal).");
ByteBuffer byteBuffer = (ByteBuffer) value;
BigDecimal bigDecimal =
new Conversions.DecimalConversion()
.fromBytes(
byteBuffer.duplicate(),
Schema.create(Schema.Type.NULL), // dummy schema, not used
logicalType);
return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
}
}

/**
Expand Down Expand Up @@ -213,7 +281,7 @@ public static DynamicMessage messageFromGenericRecord(
return builder.build();
}

private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) {
@Nullable Schema schema = field.schema();
Preconditions.checkNotNull(schema, "Unexpected null schema!");
if (StorageApiCDC.COLUMNS.contains(field.name())) {
Expand Down Expand Up @@ -309,7 +377,10 @@ private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field)

@Nullable
private static Object messageValueFromGenericRecordValue(
FieldDescriptor fieldDescriptor, Schema.Field avroField, String name, GenericRecord record) {
FieldDescriptor fieldDescriptor,
org.apache.avro.Schema.Field avroField,
String name,
GenericRecord record) {
@Nullable Object value = record.get(name);
if (value == null) {
if (fieldDescriptor.isOptional()
Expand All @@ -325,7 +396,7 @@ private static Object messageValueFromGenericRecordValue(
}

private static Object toProtoValue(
FieldDescriptor fieldDescriptor, Schema avroSchema, Object value) {
FieldDescriptor fieldDescriptor, org.apache.avro.Schema avroSchema, Object value) {
switch (avroSchema.getType()) {
case RECORD:
return messageFromGenericRecord(
Expand Down
Loading