@@ -23,6 +23,7 @@
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Int64Value;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Map;
@@ -55,6 +56,11 @@ public class AvroGenericRecordToStorageApiProto {

private static final org.joda.time.LocalDate EPOCH_DATE = new org.joda.time.LocalDate(1970, 1, 1);

private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";
private static final long PICOSECOND_PRECISION = 12L;
private static final long NANOS_PER_SECOND = 1_000_000_000L;
private static final long PICOS_PER_NANO = 1000L;

static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
.put(Schema.Type.INT, TableFieldSchema.Type.INT64)
@@ -314,6 +320,7 @@ public static DynamicMessage messageFromGenericRecord(
@SuppressWarnings("nullness")
private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) {
@Nullable Schema schema = field.schema();

Preconditions.checkNotNull(schema, "Unexpected null schema!");
if (StorageApiCDC.COLUMNS.contains(field.name())) {
throw new RuntimeException("Reserved field name " + field.name() + " in user schema.");
@@ -380,34 +387,45 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) {
.setType(unionFieldSchema.getType())
.setMode(unionFieldSchema.getMode())
.addAllFields(unionFieldSchema.getFieldsList());

if (unionFieldSchema.hasTimestampPrecision()) {
builder.setTimestampPrecision(unionFieldSchema.getTimestampPrecision());
}
break;
default:
elementType = TypeWithNullability.create(schema).getType();
Optional<LogicalType> logicalType =
Optional.ofNullable(LogicalTypes.fromSchema(elementType));
@Nullable
TableFieldSchema.Type primitiveType =
logicalType
.flatMap(AvroGenericRecordToStorageApiProto::logicalTypes)
.orElse(PRIMITIVE_TYPES.get(elementType.getType()));
if (primitiveType == null) {
throw new RuntimeException("Unsupported type " + elementType.getType());
}
// A scalar is REQUIRED by default; if it is defined as part of a union, the
// caller will set the nullability requirements
builder = builder.setType(primitiveType);
// parametrized types
if (logicalType.isPresent() && logicalType.get().getName().equals("decimal")) {
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType.get();
int precision = decimal.getPrecision();
int scale = decimal.getScale();
if (!(precision == 38 && scale == 9) // NUMERIC
&& !(precision == 77 && scale == 38) // BIGNUMERIC
) {
// parametrized type
builder = builder.setPrecision(precision);
if (scale != 0) {
builder = builder.setScale(scale);
if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(elementType.getProp("logicalType"))) {
builder = builder.setType(TableFieldSchema.Type.TIMESTAMP);
builder.setTimestampPrecision(
Int64Value.newBuilder().setValue(PICOSECOND_PRECISION).build());
break;
} else {
Optional<LogicalType> logicalType =
Optional.ofNullable(LogicalTypes.fromSchema(elementType));
@Nullable
TableFieldSchema.Type primitiveType =
logicalType
.flatMap(AvroGenericRecordToStorageApiProto::logicalTypes)
.orElse(PRIMITIVE_TYPES.get(elementType.getType()));
if (primitiveType == null) {
throw new RuntimeException("Unsupported type " + elementType.getType());
}
// A scalar is REQUIRED by default; if it is defined as part of a union, the
// caller will set the nullability requirements
builder = builder.setType(primitiveType);
// parametrized types
if (logicalType.isPresent() && logicalType.get().getName().equals("decimal")) {
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType.get();
int precision = decimal.getPrecision();
int scale = decimal.getScale();
if (!(precision == 38 && scale == 9) // NUMERIC
&& !(precision == 77 && scale == 38) // BIGNUMERIC
) {
// parametrized type
builder = builder.setPrecision(precision);
if (scale != 0) {
builder = builder.setScale(scale);
}
}
}
}
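
For reference, a minimal sketch of the Storage API field schema that the timestamp-nanos branch above is expected to produce for a scalar field; the field name "event_ts" and the REQUIRED mode are illustrative assumptions, not part of this change.

// Sketch only (not part of the change): expected Storage API schema for an Avro
// timestamp-nanos scalar. The name "event_ts" and REQUIRED mode are assumptions.
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.protobuf.Int64Value;

public class ExpectedProtoFieldSchemaSketch {
  public static void main(String[] args) {
    TableFieldSchema expected =
        TableFieldSchema.newBuilder()
            .setName("event_ts")
            .setType(TableFieldSchema.Type.TIMESTAMP)
            .setMode(TableFieldSchema.Mode.REQUIRED)
            // 12 = picosecond precision, mirroring PICOSECOND_PRECISION above.
            .setTimestampPrecision(Int64Value.newBuilder().setValue(12L).build())
            .build();
    System.out.println(expected);
  }
}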
@@ -476,7 +494,7 @@ private static Object toProtoValue(
mapEntryToProtoValue(fieldDescriptor.getMessageType(), valueType, entry))
.collect(Collectors.toList());
default:
return scalarToProtoValue(avroSchema, value);
return scalarToProtoValue(fieldDescriptor, avroSchema, value);
}
}

@@ -502,10 +520,42 @@ static Object mapEntryToProtoValue(
return builder.build();
}

private static DynamicMessage buildTimestampPicosMessage(
Descriptor timestampPicosDescriptor, long seconds, long picoseconds) {
return DynamicMessage.newBuilder(timestampPicosDescriptor)
.setField(
Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("seconds")),
seconds)
.setField(
Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("picoseconds")),
picoseconds)
.build();
}

@VisibleForTesting
static Object scalarToProtoValue(Schema fieldSchema, Object value) {
static Object scalarToProtoValue(
@Nullable FieldDescriptor descriptor, Schema fieldSchema, Object value) {
TypeWithNullability type = TypeWithNullability.create(fieldSchema);
if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getType().getProp("logicalType"))) {
Preconditions.checkArgument(
value instanceof Long, "Expecting a value as Long type (timestamp-nanos).");
long nanos = (Long) value;

long seconds = nanos / NANOS_PER_SECOND;
long nanoAdjustment = nanos % NANOS_PER_SECOND;

// Handle negative timestamps (before epoch)
if (nanos < 0 && nanoAdjustment != 0) {
seconds -= 1;
nanoAdjustment += NANOS_PER_SECOND;
}

long picoseconds = nanoAdjustment * PICOS_PER_NANO;
return buildTimestampPicosMessage(
Preconditions.checkNotNull(descriptor).getMessageType(), seconds, picoseconds);
}
LogicalType logicalType = LogicalTypes.fromSchema(type.getType());

if (logicalType != null) {
@Nullable
BiFunction<LogicalType, Object, Object> logicalTypeEncoder =
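
The new scalarToProtoValue branch splits a timestamp-nanos long into a seconds/picoseconds pair, with a floor adjustment so pre-epoch values keep a non-negative picosecond component. A self-contained sketch of that arithmetic (the class and method names here are illustrative only):

// Standalone illustration of the nanos -> (seconds, picoseconds) split used above;
// not part of the Beam API.
public class TimestampNanosSplitSketch {
  private static final long NANOS_PER_SECOND = 1_000_000_000L;
  private static final long PICOS_PER_NANO = 1000L;

  static long[] split(long nanos) {
    long seconds = nanos / NANOS_PER_SECOND;
    long nanoAdjustment = nanos % NANOS_PER_SECOND;
    // Pre-epoch values: shift so the picosecond part stays in [0, 10^12).
    if (nanos < 0 && nanoAdjustment != 0) {
      seconds -= 1;
      nanoAdjustment += NANOS_PER_SECOND;
    }
    return new long[] {seconds, nanoAdjustment * PICOS_PER_NANO};
  }

  public static void main(String[] args) {
    // 2024-01-01T00:00:00.000000001Z as nanos since the epoch.
    System.out.println(java.util.Arrays.toString(split(1_704_067_200_000_000_001L)));
    // -> [1704067200, 1000]
    // One nanosecond before the epoch.
    System.out.println(java.util.Arrays.toString(split(-1L)));
    // -> [-1, 999999999000]
  }
}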
@@ -23,6 +23,7 @@
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Int64Value;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
@@ -44,6 +45,7 @@
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions;
@@ -243,9 +245,24 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) {
if (logicalType == null) {
throw new RuntimeException("Unexpected null logical type " + field.getType());
}
@Nullable TableFieldSchema.Type type = LOGICAL_TYPES.get(logicalType.getIdentifier());
if (type == null) {
throw new RuntimeException("Unsupported logical type " + field.getType());
@Nullable TableFieldSchema.Type type;
if (logicalType.getIdentifier().equals(Timestamp.IDENTIFIER)) {
int precision =
Preconditions.checkNotNull(
logicalType.getArgument(),
"Expected logical type argument for timestamp precision.");
if (precision != 9) {
throw new RuntimeException(
"Unsupported precision for Timestamp logical type " + precision);
}
// Map Timestamp.NANOS logical type to BigQuery TIMESTAMP(12) for nanosecond precision
type = TableFieldSchema.Type.TIMESTAMP;
builder.setTimestampPrecision(Int64Value.newBuilder().setValue(12L).build());
} else {
type = LOGICAL_TYPES.get(logicalType.getIdentifier());
if (type == null) {
throw new RuntimeException("Unsupported logical type " + field.getType());
}
}
builder = builder.setType(type);
break;
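
For context, a sketch of a Beam schema that would exercise this branch. It assumes the Timestamp logical type exposes a nanosecond-precision instance named NANOS, as referenced in the comment above; that constant is an assumption to verify, not something this diff confirms.

// Sketch under an assumption: Timestamp.NANOS is taken from the comment above and is
// not verified here. A field of this logical type should map to TIMESTAMP(12).
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;

public class NanosTimestampSchemaSketch {
  public static void main(String[] args) {
    Schema schema =
        Schema.builder().addLogicalTypeField("event_ts", Timestamp.NANOS).build();
    System.out.println(schema);
  }
}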
@@ -341,17 +358,39 @@ private static Object toProtoValue(
fieldDescriptor.getMessageType(), keyType, valueType, entry))
.collect(Collectors.toList());
default:
return scalarToProtoValue(beamFieldType, value);
return scalarToProtoValue(fieldDescriptor, beamFieldType, value);
}
}

private static DynamicMessage buildTimestampPicosMessage(
Descriptor timestampPicosDescriptor, Instant instant) {
long seconds = instant.getEpochSecond();
long picoseconds = instant.getNano() * 1000L; // nanos → picos

return DynamicMessage.newBuilder(timestampPicosDescriptor)
.setField(
Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("seconds")),
seconds)
.setField(
Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("picoseconds")),
picoseconds)
.build();
}

@VisibleForTesting
static Object scalarToProtoValue(FieldType beamFieldType, Object value) {
static Object scalarToProtoValue(
@Nullable FieldDescriptor fieldDescriptor, FieldType beamFieldType, Object value) {
if (beamFieldType.getTypeName() == TypeName.LOGICAL_TYPE) {
@Nullable LogicalType<?, ?> logicalType = beamFieldType.getLogicalType();
if (logicalType == null) {
throw new RuntimeException("Unexpectedly null logical type " + beamFieldType);
}
if (logicalType.getIdentifier().equals(Timestamp.IDENTIFIER)) {
Instant instant = (Instant) value;
Descriptor timestampPicosDescriptor =
Preconditions.checkNotNull(fieldDescriptor).getMessageType();
return buildTimestampPicosMessage(timestampPicosDescriptor, instant);
}
@Nullable
BiFunction<LogicalType<?, ?>, Object, Object> logicalTypeEncoder =
LOGICAL_TYPE_ENCODERS.get(logicalType.getIdentifier());
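
A small sketch of the Instant decomposition that buildTimestampPicosMessage performs in the Beam Row path above. Unlike the raw-long Avro path, java.time.Instant already normalizes getNano() into [0, 999_999_999], so no extra pre-epoch adjustment is needed; the class name below is illustrative.

// Sketch only: how an Instant decomposes into the seconds/picoseconds pair used above.
import java.time.Instant;

public class InstantToPicosSketch {
  public static void main(String[] args) {
    Instant instant = Instant.parse("1969-12-31T23:59:59.999999999Z"); // 1 ns before the epoch
    long seconds = instant.getEpochSecond();      // -1 (floored epoch second)
    long picoseconds = instant.getNano() * 1000L; // 999_999_999 ns -> 999_999_999_000 ps
    System.out.println(seconds + " s, " + picoseconds + " ps");
  }
}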
@@ -644,7 +644,7 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAvroLogicalTypes) {
// TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
if (useAvroLogicalTypes
&& (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) {
return fieldSchema.setType("TIMESTAMP");
return fieldSchema.setType("TIMESTAMP").setTimestampPrecision(12L);
}
if (logicalType instanceof LogicalTypes.TimeMicros) {
return fieldSchema.setType("TIME");
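
To make the trigger for this branch concrete, a minimal sketch of an Avro long schema carrying timestamp-nanos as a plain schema property, which is how the check above detects it until LogicalTypes.TimestampNanos is available (per the TODO):

// Sketch only: a long schema tagged with the "timestamp-nanos" logicalType property.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class TimestampNanosAvroSchemaSketch {
  public static void main(String[] args) {
    Schema nanosTimestamp = SchemaBuilder.builder().longType();
    // The vendored Avro version predates LogicalTypes.TimestampNanos (see the TODO above),
    // so the logical type is carried as a schema property and matched by name.
    nanosTimestamp.addProp("logicalType", "timestamp-nanos");
    System.out.println(nanosTimestamp.getProp("logicalType")); // timestamp-nanos
  }
}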
@@ -587,7 +587,17 @@ private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
field.setFields(toTableFieldSchema(mapSchema));
field.setMode(Mode.REPEATED.toString());
}
field.setType(toStandardSQLTypeName(type).toString());
Schema.LogicalType<?, ?> logicalType = type.getLogicalType();
if (logicalType != null && Timestamp.IDENTIFIER.equals(logicalType.getIdentifier())) {
int precision = Preconditions.checkArgumentNotNull(logicalType.getArgument());
if (precision != 9) {
throw new IllegalArgumentException(
"Unsupported precision for Timestamp logical type " + precision);
}
field.setType(StandardSQLTypeName.TIMESTAMP.toString()).setTimestampPrecision(12L);
} else {
field.setType(toStandardSQLTypeName(type).toString());
}

fields.add(field);
}
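
Similarly, a sketch of the legacy-API TableFieldSchema this branch is expected to produce for a nanosecond Timestamp field; the field name "event_ts" is an illustrative assumption.

// Sketch only: expected com.google.api.services.bigquery.model.TableFieldSchema shape
// for the Timestamp branch above. The field name "event_ts" is an assumption.
import com.google.api.services.bigquery.model.TableFieldSchema;

public class ExpectedLegacyFieldSchemaSketch {
  public static void main(String[] args) {
    TableFieldSchema expected =
        new TableFieldSchema()
            .setName("event_ts")
            .setType("TIMESTAMP")
            .setTimestampPrecision(12L); // TIMESTAMP(12): picosecond precision
    System.out.println(expected);
  }
}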