diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java index fd9dffc260e7..76174ac3d04d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -23,6 +23,7 @@ import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Int64Value; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.Map; @@ -55,6 +56,11 @@ public class AvroGenericRecordToStorageApiProto { private static final org.joda.time.LocalDate EPOCH_DATE = new org.joda.time.LocalDate(1970, 1, 1); + private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos"; + private static final long PICOSECOND_PRECISION = 12L; + private static final long NANOS_PER_SECOND = 1_000_000_000L; + private static final long PICOS_PER_NANO = 1000L; + static final Map PRIMITIVE_TYPES = ImmutableMap.builder() .put(Schema.Type.INT, TableFieldSchema.Type.INT64) @@ -314,6 +320,7 @@ public static DynamicMessage messageFromGenericRecord( @SuppressWarnings("nullness") private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) { @Nullable Schema schema = field.schema(); + Preconditions.checkNotNull(schema, "Unexpected null schema!"); if (StorageApiCDC.COLUMNS.contains(field.name())) { throw new RuntimeException("Reserved field name " + field.name() + " in user schema."); @@ -380,34 +387,45 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch .setType(unionFieldSchema.getType()) 
.setMode(unionFieldSchema.getMode()) .addAllFields(unionFieldSchema.getFieldsList()); + + if (unionFieldSchema.hasTimestampPrecision()) { + builder.setTimestampPrecision(unionFieldSchema.getTimestampPrecision()); + } break; default: elementType = TypeWithNullability.create(schema).getType(); - Optional logicalType = - Optional.ofNullable(LogicalTypes.fromSchema(elementType)); - @Nullable - TableFieldSchema.Type primitiveType = - logicalType - .flatMap(AvroGenericRecordToStorageApiProto::logicalTypes) - .orElse(PRIMITIVE_TYPES.get(elementType.getType())); - if (primitiveType == null) { - throw new RuntimeException("Unsupported type " + elementType.getType()); - } - // a scalar will be required by default, if defined as part of union then - // caller will set nullability requirements - builder = builder.setType(primitiveType); - // parametrized types - if (logicalType.isPresent() && logicalType.get().getName().equals("decimal")) { - LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType.get(); - int precision = decimal.getPrecision(); - int scale = decimal.getScale(); - if (!(precision == 38 && scale == 9) // NUMERIC - && !(precision == 77 && scale == 38) // BIGNUMERIC - ) { - // parametrized type - builder = builder.setPrecision(precision); - if (scale != 0) { - builder = builder.setScale(scale); + if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(elementType.getProp("logicalType"))) { + builder = builder.setType(TableFieldSchema.Type.TIMESTAMP); + builder.setTimestampPrecision( + Int64Value.newBuilder().setValue(PICOSECOND_PRECISION).build()); + break; + } else { + Optional logicalType = + Optional.ofNullable(LogicalTypes.fromSchema(elementType)); + @Nullable + TableFieldSchema.Type primitiveType = + logicalType + .flatMap(AvroGenericRecordToStorageApiProto::logicalTypes) + .orElse(PRIMITIVE_TYPES.get(elementType.getType())); + if (primitiveType == null) { + throw new RuntimeException("Unsupported type " + elementType.getType()); + } + // a scalar will be required 
by default, if defined as part of union then + // caller will set nullability requirements + builder = builder.setType(primitiveType); + // parametrized types + if (logicalType.isPresent() && logicalType.get().getName().equals("decimal")) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType.get(); + int precision = decimal.getPrecision(); + int scale = decimal.getScale(); + if (!(precision == 38 && scale == 9) // NUMERIC + && !(precision == 77 && scale == 38) // BIGNUMERIC + ) { + // parametrized type + builder = builder.setPrecision(precision); + if (scale != 0) { + builder = builder.setScale(scale); + } } } } @@ -476,7 +494,7 @@ private static Object toProtoValue( mapEntryToProtoValue(fieldDescriptor.getMessageType(), valueType, entry)) .collect(Collectors.toList()); default: - return scalarToProtoValue(avroSchema, value); + return scalarToProtoValue(fieldDescriptor, avroSchema, value); } } @@ -502,10 +520,42 @@ static Object mapEntryToProtoValue( return builder.build(); } + private static DynamicMessage buildTimestampPicosMessage( + Descriptor timestampPicosDescriptor, long seconds, long picoseconds) { + return DynamicMessage.newBuilder(timestampPicosDescriptor) + .setField( + Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("seconds")), + seconds) + .setField( + Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("picoseconds")), + picoseconds) + .build(); + } + @VisibleForTesting - static Object scalarToProtoValue(Schema fieldSchema, Object value) { + static Object scalarToProtoValue( + @Nullable FieldDescriptor descriptor, Schema fieldSchema, Object value) { TypeWithNullability type = TypeWithNullability.create(fieldSchema); + if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getType().getProp("logicalType"))) { + Preconditions.checkArgument( + value instanceof Long, "Expecting a value as Long type (timestamp-nanos)."); + long nanos = (Long) value; + + long seconds = nanos / NANOS_PER_SECOND; + long 
nanoAdjustment = nanos % NANOS_PER_SECOND; + + // Handle negative timestamps (before epoch) + if (nanos < 0 && nanoAdjustment != 0) { + seconds -= 1; + nanoAdjustment += NANOS_PER_SECOND; + } + + long picoseconds = nanoAdjustment * PICOS_PER_NANO; + return buildTimestampPicosMessage( + Preconditions.checkNotNull(descriptor).getMessageType(), seconds, picoseconds); + } LogicalType logicalType = LogicalTypes.fromSchema(type.getType()); + if (logicalType != null) { @Nullable BiFunction logicalTypeEncoder = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java index d7ca787feea3..adb8e4468c00 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java @@ -23,6 +23,7 @@ import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Int64Value; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.time.Instant; @@ -44,6 +45,7 @@ import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions; @@ -243,9 +245,24 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) { if (logicalType == null) { throw new RuntimeException("Unexpected null logical type " + 
field.getType()); } - @Nullable TableFieldSchema.Type type = LOGICAL_TYPES.get(logicalType.getIdentifier()); - if (type == null) { - throw new RuntimeException("Unsupported logical type " + field.getType()); + @Nullable TableFieldSchema.Type type; + if (logicalType.getIdentifier().equals(Timestamp.IDENTIFIER)) { + int precision = + Preconditions.checkNotNull( + logicalType.getArgument(), + "Expected logical type argument for timestamp precision."); + if (precision != 9) { + throw new RuntimeException( + "Unsupported precision for Timestamp logical type " + precision); + } + // Map Timestamp.NANOS logical type to BigQuery TIMESTAMP(12) for nanosecond precision + type = TableFieldSchema.Type.TIMESTAMP; + builder.setTimestampPrecision(Int64Value.newBuilder().setValue(12L).build()); + } else { + type = LOGICAL_TYPES.get(logicalType.getIdentifier()); + if (type == null) { + throw new RuntimeException("Unsupported logical type " + field.getType()); + } } builder = builder.setType(type); break; @@ -341,17 +358,39 @@ private static Object toProtoValue( fieldDescriptor.getMessageType(), keyType, valueType, entry)) .collect(Collectors.toList()); default: - return scalarToProtoValue(beamFieldType, value); + return scalarToProtoValue(fieldDescriptor, beamFieldType, value); } } + private static DynamicMessage buildTimestampPicosMessage( + Descriptor timestampPicosDescriptor, Instant instant) { + long seconds = instant.getEpochSecond(); + long picoseconds = instant.getNano() * 1000L; // nanos → picos + + return DynamicMessage.newBuilder(timestampPicosDescriptor) + .setField( + Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("seconds")), + seconds) + .setField( + Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("picoseconds")), + picoseconds) + .build(); + } + @VisibleForTesting - static Object scalarToProtoValue(FieldType beamFieldType, Object value) { + static Object scalarToProtoValue( + @Nullable FieldDescriptor fieldDescriptor, 
FieldType beamFieldType, Object value) { if (beamFieldType.getTypeName() == TypeName.LOGICAL_TYPE) { @Nullable LogicalType logicalType = beamFieldType.getLogicalType(); if (logicalType == null) { throw new RuntimeException("Unexpectedly null logical type " + beamFieldType); } + if (logicalType.getIdentifier().equals(Timestamp.IDENTIFIER)) { + Instant instant = (Instant) value; + Descriptor timestampPicosDescriptor = + Preconditions.checkNotNull(fieldDescriptor).getMessageType(); + return buildTimestampPicosMessage(timestampPicosDescriptor, instant); + } @Nullable BiFunction, Object, Object> logicalTypeEncoder = LOGICAL_TYPE_ENCODERS.get(logicalType.getIdentifier()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index b5243a8110b7..e1ff0f58f148 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -644,7 +644,7 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAv // TODO: Use LogicalTypes.TimestampNanos once avro version is updated. 
if (useAvroLogicalTypes && (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) { - return fieldSchema.setType("TIMESTAMP"); + return fieldSchema.setType("TIMESTAMP").setTimestampPrecision(12L); } if (logicalType instanceof LogicalTypes.TimeMicros) { return fieldSchema.setType("TIME"); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 948987871273..16dbc2b5f186 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -587,7 +587,17 @@ private static List toTableFieldSchema(Schema schema) { field.setFields(toTableFieldSchema(mapSchema)); field.setMode(Mode.REPEATED.toString()); } - field.setType(toStandardSQLTypeName(type).toString()); + Schema.LogicalType logicalType = type.getLogicalType(); + if (logicalType != null && Timestamp.IDENTIFIER.equals(logicalType.getIdentifier())) { + int precision = Preconditions.checkArgumentNotNull(logicalType.getArgument()); + if (precision != 9) { + throw new IllegalArgumentException( + "Unsupported precision for Timestamp logical type " + precision); + } + field.setType(StandardSQLTypeName.TIMESTAMP.toString()).setTimestampPrecision(12L); + } else { + field.setType(toStandardSQLTypeName(type).toString()); + } fields.add(field); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java index ecf49cd6d8bb..deabb1dd05fc 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java 
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder; import com.google.protobuf.ByteString; @@ -336,6 +337,19 @@ enum TestEnum { .noDefault() .endRecord(); + private static Schema createTimestampNanosSchema() { + Schema longSchema = Schema.create(Schema.Type.LONG); + longSchema.addProp("logicalType", "timestamp-nanos"); + return SchemaBuilder.record("TimestampNanosRecord") + .fields() + .name("timestampNanosValue") + .type(longSchema) + .noDefault() + .endRecord(); + } + + private static final Schema TIMESTAMP_NANOS_SCHEMA = createTimestampNanosSchema(); + private static GenericRecord baseRecord; private static GenericRecord rawLogicalTypesRecord; private static GenericRecord jodaTimeLogicalTypesRecord; @@ -765,4 +779,110 @@ public void testMessageFromGenericRecordWithNullableArrayWithNullValue() throws List list = (List) msg.getField(fieldDescriptors.get("anullablearray")); assertEquals(Collections.emptyList(), list); } + + @Test + public void testDescriptorFromSchemaTimestampNanos() { + DescriptorProto descriptor = + TableRowToStorageApiProto.descriptorSchemaFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema( + TIMESTAMP_NANOS_SCHEMA), + true, + false); + + assertEquals(1, descriptor.getFieldCount()); + FieldDescriptorProto field = descriptor.getField(0); + assertEquals("timestampnanosvalue", field.getName()); + assertEquals(Type.TYPE_MESSAGE, field.getType()); + assertEquals("TimestampPicos", field.getTypeName()); + + // Verify nested TimestampPicos type exists + assertEquals(1, descriptor.getNestedTypeCount()); + DescriptorProto nestedType = descriptor.getNestedType(0); + assertEquals("TimestampPicos", nestedType.getName()); 
+ assertEquals(2, nestedType.getFieldCount()); + } + + @Test + public void testMessageFromGenericRecordTimestampNanos() throws Exception { + // 2024-01-15 12:30:45.123456789 → nanoseconds since epoch + // Seconds: 1705321845 + // Nanos: 123456789 + // Total nanos = 1705321845 * 1_000_000_000 + 123456789 = 1705321845123456789L + long nanosValue = 1705321845123456789L; + + GenericRecord record = + new GenericRecordBuilder(TIMESTAMP_NANOS_SCHEMA) + .set("timestampNanosValue", nanosValue) + .build(); + + Descriptors.Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema( + TIMESTAMP_NANOS_SCHEMA), + true, + false); + + DynamicMessage msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, record, null, -1); + + assertEquals(1, msg.getAllFields().size()); + + // Get the TimestampPicos field + Descriptors.FieldDescriptor timestampField = descriptor.findFieldByName("timestampnanosvalue"); + DynamicMessage timestampPicos = (DynamicMessage) msg.getField(timestampField); + + // Verify seconds and picoseconds + Descriptors.Descriptor picosDesc = timestampField.getMessageType(); + long seconds = (Long) timestampPicos.getField(picosDesc.findFieldByName("seconds")); + long picoseconds = (Long) timestampPicos.getField(picosDesc.findFieldByName("picoseconds")); + + assertEquals(1705321845L, seconds); + assertEquals(123456789L * 1000L, picoseconds); // 123456789000 picos + } + + @Test + public void testMessageFromGenericRecordTimestampNanosNegative() throws Exception { + // -0.5 seconds = -500_000_000 nanoseconds + long nanosValue = -500_000_000L; + + GenericRecord record = + new GenericRecordBuilder(TIMESTAMP_NANOS_SCHEMA) + .set("timestampNanosValue", nanosValue) + .build(); + + Descriptors.Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema( + 
TIMESTAMP_NANOS_SCHEMA), + true, + false); + + DynamicMessage msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, record, null, -1); + + Descriptors.FieldDescriptor timestampField = descriptor.findFieldByName("timestampnanosvalue"); + DynamicMessage timestampPicos = (DynamicMessage) msg.getField(timestampField); + + Descriptors.Descriptor picosDesc = timestampField.getMessageType(); + long seconds = (Long) timestampPicos.getField(picosDesc.findFieldByName("seconds")); + long picoseconds = (Long) timestampPicos.getField(picosDesc.findFieldByName("picoseconds")); + + // -0.5s should be represented as {seconds: -1, picoseconds: 500_000_000_000} + assertEquals(-1L, seconds); + assertEquals(500_000_000_000L, picoseconds); // 0.5 s = 500 billion picoseconds + } + + @Test + public void testProtoTableSchemaFromAvroSchemaTimestampNanos() { + com.google.cloud.bigquery.storage.v1.TableSchema protoSchema = + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(TIMESTAMP_NANOS_SCHEMA); + + assertEquals(1, protoSchema.getFieldsCount()); + com.google.cloud.bigquery.storage.v1.TableFieldSchema field = protoSchema.getFields(0); + assertEquals("timestampnanosvalue", field.getName()); + assertEquals( + com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP, field.getType()); + assertTrue(field.hasTimestampPrecision()); + assertEquals(12L, field.getTimestampPrecision().getValue()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java index 94e33015a627..d7a88615a50b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java @@ -21,6 +21,7 @@ import
static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos.DescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; @@ -47,6 +48,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -60,6 +62,13 @@ /** Unit tests form {@link BeamRowToStorageApiProto}. */ @RunWith(JUnit4.class) public class BeamRowToStorageApiProtoTest { + private static final java.time.Instant TEST_INSTANT_NANOS = + java.time.Instant.parse("2024-01-15T12:30:45.123456789Z"); + + private static final Schema TIMESTAMP_NANOS_SCHEMA = + Schema.builder() + .addField("timestampNanos", FieldType.logicalType(Timestamp.NANOS).withNullable(true)) + .build(); private static final EnumerationType TEST_ENUM = EnumerationType.create("ONE", "TWO", "RED", "BLUE"); private static final Schema BASE_SCHEMA = @@ -589,8 +598,59 @@ public void testScalarToProtoValue() { p -> { assertEquals( p.getValue(), - BeamRowToStorageApiProto.scalarToProtoValue(entry.getKey(), p.getKey())); + BeamRowToStorageApiProto.scalarToProtoValue(null, entry.getKey(), p.getKey())); }); } } + + @Test + public void testTimestampNanosSchema() { + com.google.cloud.bigquery.storage.v1.TableSchema protoSchema = + BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_SCHEMA); + + assertEquals(1, protoSchema.getFieldsCount()); + TableFieldSchema field = protoSchema.getFields(0); + assertEquals(TableFieldSchema.Type.TIMESTAMP, field.getType()); + 
assertEquals(12L, field.getTimestampPrecision().getValue()); + } + + @Test + public void testTimestampNanosDescriptor() throws Exception { + DescriptorProto descriptor = + TableRowToStorageApiProto.descriptorSchemaFromTableSchema( + BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_SCHEMA), + true, + false); + + FieldDescriptorProto field = descriptor.getField(0); + assertEquals("timestampnanos", field.getName()); + assertEquals(Type.TYPE_MESSAGE, field.getType()); + assertEquals("TimestampPicos", field.getTypeName()); + } + + @Test + public void testTimestampNanosMessage() throws Exception { + Row row = + Row.withSchema(TIMESTAMP_NANOS_SCHEMA) + .withFieldValue("timestampNanos", TEST_INSTANT_NANOS) + .build(); + + Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_SCHEMA), + true, + false); + + DynamicMessage msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, row, null, -1); + + FieldDescriptor field = descriptor.findFieldByName("timestampnanos"); + DynamicMessage picos = (DynamicMessage) msg.getField(field); + Descriptor picosDesc = field.getMessageType(); + + assertEquals( + TEST_INSTANT_NANOS.getEpochSecond(), picos.getField(picosDesc.findFieldByName("seconds"))); + assertEquals( + TEST_INSTANT_NANOS.getNano() * 1000L, + picos.getField(picosDesc.findFieldByName("picoseconds"))); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index e95e15465966..ce6d53af4003 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -953,7 +953,8 @@ public void 
testConvertAvroSchemaToBigQuerySchema() { String timestampNanosJson = "{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"; Schema timestampType = new Schema.Parser().parse(timestampNanosJson); Schema avroSchema = avroSchema(f -> f.type(timestampType).noDefault()); - TableSchema expected = tableSchema(f -> f.setType("TIMESTAMP").setMode("REQUIRED")); + TableSchema expected = + tableSchema(f -> f.setType("TIMESTAMP").setMode("REQUIRED").setTimestampPrecision(12L)); TableSchema expectedRaw = tableSchema(f -> f.setType("INTEGER").setMode("REQUIRED")); assertEquals(expected, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java index 5deffd4028f9..6d155185ee62 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java @@ -23,13 +23,19 @@ import com.google.cloud.bigquery.storage.v1.DataFormat; import java.security.SecureRandom; import java.util.List; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -392,6 +398,112 @@ public void testReadWithNanosPrecision_Arrow() { runReadTest(TimestampPrecision.NANOS, DataFormat.ARROW, expectedOutput, simpleTableSpec); } + // Schema with custom timestamp-nanos logical type + private static org.apache.avro.Schema createTimestampNanosAvroSchema() { + org.apache.avro.Schema longSchema = + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG); + longSchema.addProp("logicalType", "timestamp-nanos"); + return org.apache.avro.SchemaBuilder.record("TimestampNanosRecord") + .fields() + .name("ts_nanos") + .type(longSchema) + .noDefault() + .endRecord(); + } + + private static final java.time.Instant TEST_INSTANT = + java.time.Instant.parse("2024-01-15T10:30:45.123456789Z"); + + private static final org.apache.avro.Schema TIMESTAMP_NANOS_AVRO_SCHEMA = + createTimestampNanosAvroSchema(); + + @Test + public void testWriteGenericRecordTimestampNanos() throws Exception { + String tableSpec = + String.format("%s:%s.%s", project, DATASET_ID, "generic_record_ts_nanos_test"); + + // Create GenericRecord with timestamp-nanos value + GenericRecord record = + new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA) + .set( + "ts_nanos", TEST_INSTANT.getEpochSecond() * 1_000_000_000L + TEST_INSTANT.getNano()) + .build(); + + // Write using Storage Write API with Avro format + Pipeline writePipeline = Pipeline.create(bqOptions); + writePipeline + .apply("CreateData", Create.of(record).withCoder(AvroCoder.of(TIMESTAMP_NANOS_AVRO_SCHEMA))) + .apply( + "WriteGenericRecords", + BigQueryIO.writeGenericRecords() + .to(tableSpec) + .withAvroSchemaFactory(tableSchema -> TIMESTAMP_NANOS_AVRO_SCHEMA) + .withSchema(BigQueryUtils.fromGenericAvroSchema(TIMESTAMP_NANOS_AVRO_SCHEMA, true)) + .useAvroLogicalTypes() + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + 
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); + writePipeline.run().waitUntilFinish(); + + // Read back and verify + Pipeline readPipeline = Pipeline.create(bqOptions); + PCollection result = + readPipeline.apply( + "Read", + BigQueryIO.readTableRows() + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) + .withFormat(DataFormat.AVRO) + .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS) + .from(tableSpec)); + + PAssert.that(result) + .containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")); + readPipeline.run().waitUntilFinish(); + } + + private static final Schema BEAM_TIMESTAMP_NANOS_SCHEMA = + Schema.builder().addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)).build(); + + @Test + public void testWriteBeamRowTimestampNanos() throws Exception { + String tableSpec = String.format("%s:%s.%s", project, DATASET_ID, "beam_row_ts_nanos_test"); + + // Create Beam Row with Timestamp.NANOS + Row row = + Row.withSchema(BEAM_TIMESTAMP_NANOS_SCHEMA) + .withFieldValue("ts_nanos", TEST_INSTANT) + .build(); + + // Write using Storage Write API with Beam Schema + Pipeline writePipeline = Pipeline.create(bqOptions); + writePipeline + .apply("CreateData", Create.of(row).withRowSchema(BEAM_TIMESTAMP_NANOS_SCHEMA)) + .apply( + "WriteBeamRows", + BigQueryIO.write() + .to(tableSpec) + .useBeamSchema() // Key method for Beam Row! 
+ .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); + writePipeline.run().waitUntilFinish(); + + // Read back and verify + Pipeline readPipeline = Pipeline.create(bqOptions); + PCollection result = + readPipeline.apply( + "Read", + BigQueryIO.readTableRows() + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) + .withFormat(DataFormat.AVRO) + .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS) + .from(tableSpec)); + + PAssert.that(result) + .containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")); + readPipeline.run().waitUntilFinish(); + } + private void runReadTest( TimestampPrecision precision, DataFormat format, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index 76a492bebd2c..b50e8448698a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -69,6 +69,12 @@ /** Tests for {@link BigQueryUtils}. 
*/ @RunWith(JUnit4.class) public class BigQueryUtilsTest { + private static final TableFieldSchema TIMESTAMP_NANOS = + new TableFieldSchema() + .setName("timestamp_nanos") + .setType(StandardSQLTypeName.TIMESTAMP.toString()) + .setTimestampPrecision(12L); + private static final Schema FLAT_TYPE = Schema.builder() .addNullableField("id", Schema.FieldType.INT64) @@ -98,6 +104,7 @@ public class BigQueryUtilsTest { .addNullableField("boolean", Schema.FieldType.BOOLEAN) .addNullableField("long", Schema.FieldType.INT64) .addNullableField("double", Schema.FieldType.DOUBLE) + .addNullableField("timestamp_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)) .build(); private static final Schema ENUM_TYPE = @@ -280,7 +287,8 @@ public class BigQueryUtilsTest { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); private static final TableFieldSchema ROWS = new TableFieldSchema() @@ -315,7 +323,8 @@ public class BigQueryUtilsTest { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); private static final TableFieldSchema MAP = new TableFieldSchema() @@ -368,7 +377,8 @@ public class BigQueryUtilsTest { new BigDecimal("123.456").setScale(3, RoundingMode.HALF_UP), true, 123L, - 123.456d) + 123.456d, + java.time.Instant.parse("2024-08-10T16:52:07.123456789Z")) .build(); private static final TableRow BQ_FLAT_ROW = @@ -404,13 +414,14 @@ public class BigQueryUtilsTest { .set("numeric", "123.456") .set("boolean", true) .set("long", 123L) - .set("double", 123.456d); + .set("double", 123.456d) + .set("timestamp_nanos", "2024-08-10 16:52:07.123456789 UTC"); private static final Row NULL_FLAT_ROW = Row.withSchema(FLAT_TYPE) .addValues( null, null, null, null, null, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, null, null, null, null, null, null, null, null) + null, null, null, null, null, null, null, null, null, null, null, null, null, null) .build(); private static final TableRow BQ_NULL_FLAT_ROW = @@ -441,7 +452,8 @@ 
public class BigQueryUtilsTest { .set("numeric", null) .set("boolean", null) .set("long", null) - .set("double", null); + .set("double", null) + .set("timestamp_nanos", null); private static final Row ENUM_ROW = Row.withSchema(ENUM_TYPE).addValues(new EnumerationType.Value(1)).build(); @@ -533,7 +545,8 @@ public class BigQueryUtilsTest { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); private static final TableSchema BQ_ENUM_TYPE = new TableSchema().setFields(Arrays.asList(COLOR)); @@ -593,7 +606,8 @@ public void testToTableSchema_flat() { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); } @Test @@ -648,7 +662,8 @@ public void testToTableSchema_row() { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); } @Test @@ -689,7 +704,8 @@ public void testToTableSchema_array_row() { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); } @Test @@ -720,7 +736,7 @@ public void testToTableSchema_map_array() { public void testToTableRow_flat() { TableRow row = toTableRow().apply(FLAT_ROW); - assertThat(row.size(), equalTo(27)); + assertThat(row.size(), equalTo(28)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("timestamp_variant1", "2019-08-16 13:52:07.000 UTC")); @@ -748,6 +764,7 @@ public void testToTableRow_flat() { assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); assertThat(row, hasEntry("double", "123.456")); + assertThat(row, hasEntry("timestamp_nanos", "2024-08-10 16:52:07.123456789 UTC")); } @Test @@ -783,7 +800,7 @@ public void testToTableRow_row() { assertThat(row.size(), equalTo(1)); row = (TableRow) row.get("row"); - assertThat(row.size(), equalTo(27)); + assertThat(row.size(), equalTo(28)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("timestamp_variant1", "2019-08-16 13:52:07.000 UTC")); @@ -811,6 +828,7 @@ public void 
testToTableRow_row() { assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); assertThat(row, hasEntry("double", "123.456")); + assertThat(row, hasEntry("timestamp_nanos", "2024-08-10 16:52:07.123456789 UTC")); } @Test @@ -819,7 +837,7 @@ public void testToTableRow_array_row() { assertThat(row.size(), equalTo(1)); row = ((List) row.get("rows")).get(0); - assertThat(row.size(), equalTo(27)); + assertThat(row.size(), equalTo(28)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("timestamp_variant1", "2019-08-16 13:52:07.000 UTC")); @@ -847,13 +865,14 @@ public void testToTableRow_array_row() { assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); assertThat(row, hasEntry("double", "123.456")); + assertThat(row, hasEntry("timestamp_nanos", "2024-08-10 16:52:07.123456789 UTC")); } @Test public void testToTableRow_null_row() { TableRow row = toTableRow().apply(NULL_FLAT_ROW); - assertThat(row.size(), equalTo(27)); + assertThat(row.size(), equalTo(28)); assertThat(row, hasEntry("id", null)); assertThat(row, hasEntry("value", null)); assertThat(row, hasEntry("name", null)); @@ -881,6 +900,7 @@ public void testToTableRow_null_row() { assertThat(row, hasEntry("boolean", null)); assertThat(row, hasEntry("long", null)); assertThat(row, hasEntry("double", null)); + assertThat(row, hasEntry("timestamp_nanos", null)); } private static final BigQueryUtils.ConversionOptions TRUNCATE_OPTIONS =