From 5b3d85b3e65ecea8a51b057e5ed6bbbd0b9b29d2 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 13 Dec 2025 10:27:14 -0500 Subject: [PATCH 1/8] Beam row. --- .../bigquery/BeamRowToStorageApiProto.java | 47 ++++++++++++-- .../BeamRowToStorageApiProtoTest.java | 62 ++++++++++++++++++- 2 files changed, 103 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java index d7ca787feea3..80d8eccd84e3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java @@ -17,12 +17,15 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.cloud.bigquery.storage.v1.TableFieldSchema; import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Int64Value; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.time.Instant; @@ -44,6 +47,7 @@ import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions; @@ -243,9 +247,21 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) { if (logicalType == null) { throw new RuntimeException("Unexpected null logical type " + field.getType()); } - @Nullable TableFieldSchema.Type type = LOGICAL_TYPES.get(logicalType.getIdentifier()); - if (type == null) { - throw new RuntimeException("Unsupported logical type " + field.getType()); + @Nullable TableFieldSchema.Type type; + if (logicalType.getIdentifier().equals(Timestamp.IDENTIFIER)) { + int precision = checkNotNull(logicalType.getArgument()); + if (precision != 9) { + throw new RuntimeException( + "Unsupported precision for Timestamp logical type " + precision); + } + // Map Timestamp.NANOS logical type to BigQuery TIMESTAMP(12) for nanosecond precision + type = TableFieldSchema.Type.TIMESTAMP; + builder.setTimestampPrecision(Int64Value.newBuilder().setValue(12L).build()); + } else { + type = LOGICAL_TYPES.get(logicalType.getIdentifier()); + if (type == null) { + throw new RuntimeException("Unsupported logical type " + field.getType()); + } } builder = builder.setType(type); break; @@ -341,17 +357,38 @@ private static Object toProtoValue( fieldDescriptor.getMessageType(), keyType, valueType, entry)) .collect(Collectors.toList()); default: - return scalarToProtoValue(beamFieldType, value); + return scalarToProtoValue(fieldDescriptor, beamFieldType, value); } } + private static DynamicMessage buildTimestampPicosMessage( + Descriptor timestampPicosDescriptor, Instant instant) { + long seconds = instant.getEpochSecond(); + long picoseconds = instant.getNano() * 1000L; // nanos → picos + + return DynamicMessage.newBuilder(timestampPicosDescriptor) + .setField( + Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("seconds")), + seconds) + .setField( + Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("picoseconds")), + picoseconds) + .build(); + } + @VisibleForTesting - static Object scalarToProtoValue(FieldType beamFieldType, Object value) { + static Object scalarToProtoValue( + @Nullable FieldDescriptor fieldDescriptor, FieldType beamFieldType, Object value) { if (beamFieldType.getTypeName() == TypeName.LOGICAL_TYPE) { @Nullable LogicalType logicalType = beamFieldType.getLogicalType(); if (logicalType == null) { throw new RuntimeException("Unexpectedly null logical type " + beamFieldType); } + if (logicalType.getIdentifier().equals(Timestamp.IDENTIFIER)) { + Instant instant = (Instant) value; + Descriptor timestampPicosDescriptor = checkNotNull(fieldDescriptor).getMessageType(); + return buildTimestampPicosMessage(timestampPicosDescriptor, instant); + } @Nullable BiFunction, Object, Object> logicalTypeEncoder = LOGICAL_TYPE_ENCODERS.get(logicalType.getIdentifier()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java index 94e33015a627..d7a88615a50b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos.DescriptorProto; import com.google.protobuf.DescriptorProtos.FieldDescriptorProto; @@ -47,6 +48,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -60,6 +62,13 @@ /** Unit tests form {@link BeamRowToStorageApiProto}. */ @RunWith(JUnit4.class) public class BeamRowToStorageApiProtoTest { + private static final java.time.Instant TEST_INSTANT_NANOS = + java.time.Instant.parse("2024-01-15T12:30:45.123456789Z"); + + private static final Schema TIMESTAMP_NANOS_SCHEMA = + Schema.builder() + .addField("timestampNanos", FieldType.logicalType(Timestamp.NANOS).withNullable(true)) + .build(); private static final EnumerationType TEST_ENUM = EnumerationType.create("ONE", "TWO", "RED", "BLUE"); private static final Schema BASE_SCHEMA = @@ -589,8 +598,59 @@ public void testScalarToProtoValue() { p -> { assertEquals( p.getValue(), - BeamRowToStorageApiProto.scalarToProtoValue(entry.getKey(), p.getKey())); + BeamRowToStorageApiProto.scalarToProtoValue(null, entry.getKey(), p.getKey())); }); } } + + @Test + public void testTimestampNanosSchema() { + com.google.cloud.bigquery.storage.v1.TableSchema protoSchema = + BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_SCHEMA); + + assertEquals(1, protoSchema.getFieldsCount()); + TableFieldSchema field = protoSchema.getFields(0); + assertEquals(TableFieldSchema.Type.TIMESTAMP, field.getType()); + assertEquals(12L, field.getTimestampPrecision().getValue()); + } + + @Test + public void testTimestampNanosDescriptor() throws Exception { + DescriptorProto descriptor = + TableRowToStorageApiProto.descriptorSchemaFromTableSchema( + BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_SCHEMA), + true, + false); + + FieldDescriptorProto field = descriptor.getField(0); + assertEquals("timestampnanos", field.getName()); + assertEquals(Type.TYPE_MESSAGE, field.getType()); + assertEquals("TimestampPicos", field.getTypeName()); + } + + @Test + public void testTimestampNanosMessage() throws Exception { + Row row = + Row.withSchema(TIMESTAMP_NANOS_SCHEMA) + .withFieldValue("timestampNanos", TEST_INSTANT_NANOS) + .build(); + + Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_SCHEMA), + true, + false); + + DynamicMessage msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, row, null, -1); + + FieldDescriptor field = descriptor.findFieldByName("timestampnanos"); + DynamicMessage picos = (DynamicMessage) msg.getField(field); + Descriptor picosDesc = field.getMessageType(); + + assertEquals( + TEST_INSTANT_NANOS.getEpochSecond(), picos.getField(picosDesc.findFieldByName("seconds"))); + assertEquals( + TEST_INSTANT_NANOS.getNano() * 1000L, + picos.getField(picosDesc.findFieldByName("picoseconds"))); + } } From 62df257a6c88f4653f4cf3f53476a870e2150803 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 13 Dec 2025 10:52:20 -0500 Subject: [PATCH 2/8] Avro --- .../AvroGenericRecordToStorageApiProto.java | 52 +++++++- ...vroGenericRecordToStorageApiProtoTest.java | 120 ++++++++++++++++++ 2 files changed, 170 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java index fd9dffc260e7..cfb4672abd1f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -23,6 +23,7 @@ import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Int64Value; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.Map; @@ -55,6 +56,8 @@ public class AvroGenericRecordToStorageApiProto { private static final org.joda.time.LocalDate EPOCH_DATE = new org.joda.time.LocalDate(1970, 1, 1); + private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos"; + static final Map PRIMITIVE_TYPES = ImmutableMap.builder() .put(Schema.Type.INT, TableFieldSchema.Type.INT64) @@ -383,6 +386,11 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch break; default: elementType = TypeWithNullability.create(schema).getType(); + if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(elementType.getProp("logicalType"))) { + builder = builder.setType(TableFieldSchema.Type.TIMESTAMP); + builder.setTimestampPrecision(Int64Value.newBuilder().setValue(12L).build()); + break; + } Optional logicalType = Optional.ofNullable(LogicalTypes.fromSchema(elementType)); @Nullable @@ -476,7 +484,7 @@ private static Object toProtoValue( mapEntryToProtoValue(fieldDescriptor.getMessageType(), valueType, entry)) .collect(Collectors.toList()); default: - return scalarToProtoValue(avroSchema, value); + return scalarToProtoValue(fieldDescriptor, avroSchema, value); } } @@ -502,10 +510,50 @@ static Object mapEntryToProtoValue( return builder.build(); } + /** Returns true if this schema represents a timestamp-nanos logical type. */ + private static boolean isTimestampNanos(Schema schema) { + return TIMESTAMP_NANOS_LOGICAL_TYPE.equals(schema.getProp("logicalType")); + } + + private static DynamicMessage buildTimestampPicosMessage( + Descriptor timestampPicosDescriptor, long seconds, long picoseconds) { + return DynamicMessage.newBuilder(timestampPicosDescriptor) + .setField( + Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("seconds")), + seconds) + .setField( + Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("picoseconds")), + picoseconds) + .build(); + } + @VisibleForTesting - static Object scalarToProtoValue(Schema fieldSchema, Object value) { + static Object scalarToProtoValue( + @Nullable FieldDescriptor descriptor, Schema fieldSchema, Object value) { TypeWithNullability type = TypeWithNullability.create(fieldSchema); + if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getType().getProp("logicalType"))) { + Preconditions.checkArgument( + value instanceof Long, "Expecting a value as Long type (timestamp-nanos)."); + long nanos = (Long) value; + + // Convert nanos since epoch to seconds + picoseconds + long seconds = nanos / 1_000_000_000L; + int nanoAdjustment = (int) (nanos % 1_000_000_000L); + + // Handle negative timestamps (before epoch) + if (nanos < 0 && nanoAdjustment != 0) { + seconds -= 1; + nanoAdjustment += 1_000_000_000; + } + + // Convert nanos to picos (multiply by 1000) + long picoseconds = nanoAdjustment * 1000L; + + return buildTimestampPicosMessage( + Preconditions.checkNotNull(descriptor).getMessageType(), seconds, picoseconds); + } LogicalType logicalType = LogicalTypes.fromSchema(type.getType()); + if (logicalType != null) { @Nullable BiFunction logicalTypeEncoder = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java index ecf49cd6d8bb..deabb1dd05fc 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder; import com.google.protobuf.ByteString; @@ -336,6 +337,19 @@ enum TestEnum { .noDefault() .endRecord(); + private static Schema createTimestampNanosSchema() { + Schema longSchema = Schema.create(Schema.Type.LONG); + longSchema.addProp("logicalType", "timestamp-nanos"); + return SchemaBuilder.record("TimestampNanosRecord") + .fields() + .name("timestampNanosValue") + .type(longSchema) + .noDefault() + .endRecord(); + } + + private static final Schema TIMESTAMP_NANOS_SCHEMA = createTimestampNanosSchema(); + private static GenericRecord baseRecord; private static GenericRecord rawLogicalTypesRecord; private static GenericRecord jodaTimeLogicalTypesRecord; @@ -765,4 +779,110 @@ public void testMessageFromGenericRecordWithNullableArrayWithNullValue() throws List list = (List) msg.getField(fieldDescriptors.get("anullablearray")); assertEquals(Collections.emptyList(), list); } + + @Test + public void testDescriptorFromSchemaTimestampNanos() { + DescriptorProto descriptor = + TableRowToStorageApiProto.descriptorSchemaFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema( + TIMESTAMP_NANOS_SCHEMA), + true, + false); + + assertEquals(1, descriptor.getFieldCount()); + FieldDescriptorProto field = descriptor.getField(0); + assertEquals("timestampnanosvalue", field.getName()); + assertEquals(Type.TYPE_MESSAGE, field.getType()); + assertEquals("TimestampPicos", field.getTypeName()); + + // Verify nested TimestampPicos type exists + assertEquals(1, descriptor.getNestedTypeCount()); + DescriptorProto nestedType = descriptor.getNestedType(0); + assertEquals("TimestampPicos", nestedType.getName()); + assertEquals(2, nestedType.getFieldCount()); + } + + @Test + public void testMessageFromGenericRecordTimestampNanos() throws Exception { + // 2024-01-15 12:30:45.123456789 → nanoseconds since epoch + // Seconds: 1705321845 + // Nanos: 123456789 + // Total nanos = 1705321845 * 1_000_000_000 + 123456789 = 1705321845123456789L + long nanosValue = 1705321845123456789L; + + GenericRecord record = + new GenericRecordBuilder(TIMESTAMP_NANOS_SCHEMA) + .set("timestampNanosValue", nanosValue) + .build(); + + Descriptors.Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema( + TIMESTAMP_NANOS_SCHEMA), + true, + false); + + DynamicMessage msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, record, null, -1); + + assertEquals(1, msg.getAllFields().size()); + + // Get the TimestampPicos field + Descriptors.FieldDescriptor timestampField = descriptor.findFieldByName("timestampnanosvalue"); + DynamicMessage timestampPicos = (DynamicMessage) msg.getField(timestampField); + + // Verify seconds and picoseconds + Descriptors.Descriptor picosDesc = timestampField.getMessageType(); + long seconds = (Long) timestampPicos.getField(picosDesc.findFieldByName("seconds")); + long picoseconds = (Long) timestampPicos.getField(picosDesc.findFieldByName("picoseconds")); + + assertEquals(1705321845L, seconds); + assertEquals(123456789L * 1000L, picoseconds); // 123456789000 picos + } + + @Test + public void testMessageFromGenericRecordTimestampNanosNegative() throws Exception { + // -0.5 seconds = -500_000_000 nanoseconds + long nanosValue = -500_000_000L; + + GenericRecord record = + new GenericRecordBuilder(TIMESTAMP_NANOS_SCHEMA) + .set("timestampNanosValue", nanosValue) + .build(); + + Descriptors.Descriptor descriptor = + TableRowToStorageApiProto.getDescriptorFromTableSchema( + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema( + TIMESTAMP_NANOS_SCHEMA), + true, + false); + + DynamicMessage msg = + AvroGenericRecordToStorageApiProto.messageFromGenericRecord(descriptor, record, null, -1); + + Descriptors.FieldDescriptor timestampField = descriptor.findFieldByName("timestampnanosvalue"); + DynamicMessage timestampPicos = (DynamicMessage) msg.getField(timestampField); + + Descriptors.Descriptor picosDesc = timestampField.getMessageType(); + long seconds = (Long) timestampPicos.getField(picosDesc.findFieldByName("seconds")); + long picoseconds = (Long) timestampPicos.getField(picosDesc.findFieldByName("picoseconds")); + + // -0.5s should be represented as {seconds: -1, picoseconds: 500_000_000_000} + assertEquals(-1L, seconds); + assertEquals(500_000_000_000L, picoseconds); // 500 million picos + } + + @Test + public void testProtoTableSchemaFromAvroSchemaTimestampNanos() { + com.google.cloud.bigquery.storage.v1.TableSchema protoSchema = + AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(TIMESTAMP_NANOS_SCHEMA); + + assertEquals(1, protoSchema.getFieldsCount()); + com.google.cloud.bigquery.storage.v1.TableFieldSchema field = protoSchema.getFields(0); + assertEquals("timestampnanosvalue", field.getName()); + assertEquals( + com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP, field.getType()); + assertTrue(field.hasTimestampPrecision()); + assertEquals(12L, field.getTimestampPrecision().getValue()); + } } From 04cc3c46a01743d0c82002e1482ebe46e01c2d3b Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Dec 2025 05:40:34 +0000 Subject: [PATCH 3/8] fixes. --- .../AvroGenericRecordToStorageApiProto.java | 81 ++++++++++++------- .../io/gcp/bigquery/BigQueryAvroUtils.java | 2 +- ...geApiDynamicDestinationsGenericRecord.java | 2 + 3 files changed, 57 insertions(+), 28 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java index cfb4672abd1f..628e444d5c79 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -260,6 +260,7 @@ public static TableSchema protoTableSchemaFromAvroSchema(Schema schema) { for (Schema.Field field : schema.getFields()) { builder.addFields(fieldDescriptorFromAvroField(field)); } + System.out.println("CLAUDE protoTableSchemaFromAvroSchema " + builder.build()); return builder.build(); } @@ -317,6 +318,8 @@ public static DynamicMessage messageFromGenericRecord( @SuppressWarnings("nullness") private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) { @Nullable Schema schema = field.schema(); + System.out.println("CLANDE fieldDescriptorFromAvroField schema " + schema); + Preconditions.checkNotNull(schema, "Unexpected null schema!"); if (StorageApiCDC.COLUMNS.contains(field.name())) { throw new RuntimeException("Reserved field name " + field.name() + " in user schema."); @@ -383,39 +386,45 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch .setType(unionFieldSchema.getType()) .setMode(unionFieldSchema.getMode()) .addAllFields(unionFieldSchema.getFieldsList()); + + if (unionFieldSchema.hasTimestampPrecision()) { + builder.setTimestampPrecision(unionFieldSchema.getTimestampPrecision()); + } break; default: elementType = TypeWithNullability.create(schema).getType(); + System.out.println("CLAUDE fieldDescriptorFromAvroField elementType " + elementType); if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(elementType.getProp("logicalType"))) { builder = builder.setType(TableFieldSchema.Type.TIMESTAMP); builder.setTimestampPrecision(Int64Value.newBuilder().setValue(12L).build()); break; - } - Optional logicalType = - Optional.ofNullable(LogicalTypes.fromSchema(elementType)); - @Nullable - TableFieldSchema.Type primitiveType = - logicalType - .flatMap(AvroGenericRecordToStorageApiProto::logicalTypes) - .orElse(PRIMITIVE_TYPES.get(elementType.getType())); - if (primitiveType == null) { - throw new RuntimeException("Unsupported type " + elementType.getType()); - } - // a scalar will be required by default, if defined as part of union then - // caller will set nullability requirements - builder = builder.setType(primitiveType); - // parametrized types - if (logicalType.isPresent() && logicalType.get().getName().equals("decimal")) { - LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType.get(); - int precision = decimal.getPrecision(); - int scale = decimal.getScale(); - if (!(precision == 38 && scale == 9) // NUMERIC - && !(precision == 77 && scale == 38) // BIGNUMERIC - ) { - // parametrized type - builder = builder.setPrecision(precision); - if (scale != 0) { - builder = builder.setScale(scale); + } else { + Optional logicalType = + Optional.ofNullable(LogicalTypes.fromSchema(elementType)); + @Nullable + TableFieldSchema.Type primitiveType = + logicalType + .flatMap(AvroGenericRecordToStorageApiProto::logicalTypes) + .orElse(PRIMITIVE_TYPES.get(elementType.getType())); + if (primitiveType == null) { + throw new RuntimeException("Unsupported type " + elementType.getType()); + } + // a scalar will be required by default, if defined as part of union then + // caller will set nullability requirements + builder = builder.setType(primitiveType); + // parametrized types + if (logicalType.isPresent() && logicalType.get().getName().equals("decimal")) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType.get(); + int precision = decimal.getPrecision(); + int scale = decimal.getScale(); + if (!(precision == 38 && scale == 9) // NUMERIC + && !(precision == 77 && scale == 38) // BIGNUMERIC + ) { + // parametrized type + builder = builder.setPrecision(precision); + if (scale != 0) { + builder = builder.setScale(scale); + } } } } @@ -430,6 +439,7 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch if (field.doc() != null) { builder = builder.setDescription(field.doc()); } + System.out.println("CLAUDE builder.build " + builder.build()); return builder.build(); } @@ -531,7 +541,17 @@ private static DynamicMessage buildTimestampPicosMessage( static Object scalarToProtoValue( @Nullable FieldDescriptor descriptor, Schema fieldSchema, Object value) { TypeWithNullability type = TypeWithNullability.create(fieldSchema); + System.out.println( + "CLAUDE fieldSchema " + + fieldSchema + + " value " + + value + + " type.getType() " + + type.getType() + + " " + + TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getType().getProp("logicalType"))); if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getType().getProp("logicalType"))) { + System.out.println("CLAUDE in TIMESTAMP_NANOS_LOGICAL_TYPE"); Preconditions.checkArgument( value instanceof Long, "Expecting a value as Long type (timestamp-nanos)."); long nanos = (Long) value; @@ -548,7 +568,14 @@ static Object scalarToProtoValue( // Convert nanos to picos (multiply by 1000) long picoseconds = nanoAdjustment * 1000L; - + System.out.println("CLAUDE before returning descriptor " + descriptor); + System.out.println( + "CLAUDE before returning descriptor " + + Preconditions.checkNotNull(descriptor).getMessageType().getName()); + System.out.println( + "CLAUDE Returning " + + buildTimestampPicosMessage( + Preconditions.checkNotNull(descriptor).getMessageType(), seconds, picoseconds)); return buildTimestampPicosMessage( Preconditions.checkNotNull(descriptor).getMessageType(), seconds, picoseconds); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index b5243a8110b7..e1ff0f58f148 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -644,7 +644,7 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAv // TODO: Use LogicalTypes.TimestampNanos once avro version is updated. if (useAvroLogicalTypes && (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) { - return fieldSchema.setType("TIMESTAMP"); + return fieldSchema.setType("TIMESTAMP").setTimestampPrecision(12L); } if (logicalType instanceof LogicalTypes.TimeMicros) { return fieldSchema.setType("TIME"); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java index 56b4be4a1a1f..df9f7a405978 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java @@ -69,8 +69,10 @@ class GenericRecordConverter implements MessageConverter { avroSchema = schemaFactory.apply(getSchema(destination)); protoTableSchema = AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(avroSchema); + System.out.println("CLAUDE protoTableSchema " + protoTableSchema); descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(protoTableSchema, true, false); + // System.out.println("CLAUDE descriptor " + descriptor); if (usesCdc) { cdcDescriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(protoTableSchema, true, true); From 86e834ebe1e494edaef14bdbc30dc8d391bdbdac Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 6 Jan 2026 08:25:58 -0500 Subject: [PATCH 4/8] tests. --- .../AvroGenericRecordToStorageApiProto.java | 29 ----- .../bigquery/BeamRowToStorageApiProto.java | 10 +- ...geApiDynamicDestinationsGenericRecord.java | 2 - .../bigquery/BigQueryTimestampPicosIT.java | 113 ++++++++++++++++++ 4 files changed, 119 insertions(+), 35 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java index 628e444d5c79..671dd480ff5d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -260,7 +260,6 @@ public static TableSchema protoTableSchemaFromAvroSchema(Schema schema) { for (Schema.Field field : schema.getFields()) { builder.addFields(fieldDescriptorFromAvroField(field)); } - System.out.println("CLAUDE protoTableSchemaFromAvroSchema " + builder.build()); return builder.build(); } @@ -318,7 +317,6 @@ public static DynamicMessage messageFromGenericRecord( @SuppressWarnings("nullness") private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) { @Nullable Schema schema = field.schema(); - System.out.println("CLANDE fieldDescriptorFromAvroField schema " + schema); Preconditions.checkNotNull(schema, "Unexpected null schema!"); if (StorageApiCDC.COLUMNS.contains(field.name())) { @@ -393,7 +391,6 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch break; default: elementType = TypeWithNullability.create(schema).getType(); - System.out.println("CLAUDE fieldDescriptorFromAvroField elementType " + elementType); if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(elementType.getProp("logicalType"))) { builder = builder.setType(TableFieldSchema.Type.TIMESTAMP); builder.setTimestampPrecision(Int64Value.newBuilder().setValue(12L).build()); @@ -439,7 +436,6 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch if (field.doc() != null) { builder = builder.setDescription(field.doc()); } - System.out.println("CLAUDE builder.build " + builder.build()); return builder.build(); } @@ -520,11 +516,6 @@ static Object mapEntryToProtoValue( return builder.build(); } - /** Returns true if this schema represents a timestamp-nanos logical type. */ - private static boolean isTimestampNanos(Schema schema) { - return TIMESTAMP_NANOS_LOGICAL_TYPE.equals(schema.getProp("logicalType")); - } - private static DynamicMessage buildTimestampPicosMessage( Descriptor timestampPicosDescriptor, long seconds, long picoseconds) { return DynamicMessage.newBuilder(timestampPicosDescriptor) @@ -541,22 +532,11 @@ private static DynamicMessage buildTimestampPicosMessage( static Object scalarToProtoValue( @Nullable FieldDescriptor descriptor, Schema fieldSchema, Object value) { TypeWithNullability type = TypeWithNullability.create(fieldSchema); - System.out.println( - "CLAUDE fieldSchema " - + fieldSchema - + " value " - + value - + " type.getType() " - + type.getType() - + " " - + TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getType().getProp("logicalType"))); if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getType().getProp("logicalType"))) { - System.out.println("CLAUDE in TIMESTAMP_NANOS_LOGICAL_TYPE"); Preconditions.checkArgument( value instanceof Long, "Expecting a value as Long type (timestamp-nanos)."); long nanos = (Long) value; - // Convert nanos since epoch to seconds + picoseconds long seconds = nanos / 1_000_000_000L; int nanoAdjustment = (int) (nanos % 1_000_000_000L); @@ -566,16 +546,7 @@ static Object scalarToProtoValue( nanoAdjustment += 1_000_000_000; } - // Convert nanos to picos (multiply by 1000) long picoseconds = nanoAdjustment * 1000L; - System.out.println("CLAUDE before returning descriptor " + descriptor); - System.out.println( - "CLAUDE before returning descriptor " - + Preconditions.checkNotNull(descriptor).getMessageType().getName()); - System.out.println( - "CLAUDE Returning " - + buildTimestampPicosMessage( - Preconditions.checkNotNull(descriptor).getMessageType(), seconds, picoseconds)); return buildTimestampPicosMessage( Preconditions.checkNotNull(descriptor).getMessageType(), seconds, picoseconds); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java index 80d8eccd84e3..adb8e4468c00 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static com.google.common.base.Preconditions.checkNotNull; - import com.google.cloud.bigquery.storage.v1.TableFieldSchema; import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.protobuf.ByteString; @@ -249,7 +247,10 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) { } @Nullable TableFieldSchema.Type type; if (logicalType.getIdentifier().equals(Timestamp.IDENTIFIER)) { - int precision = checkNotNull(logicalType.getArgument()); + int precision = + Preconditions.checkNotNull( + logicalType.getArgument(), + "Expected logical type argument for timestamp precision."); if (precision != 9) { throw new RuntimeException( "Unsupported precision for Timestamp logical type " + precision); @@ -386,7 +387,8 @@ static Object scalarToProtoValue( } if (logicalType.getIdentifier().equals(Timestamp.IDENTIFIER)) { Instant instant = (Instant) value; - Descriptor timestampPicosDescriptor = checkNotNull(fieldDescriptor).getMessageType(); + Descriptor timestampPicosDescriptor = + Preconditions.checkNotNull(fieldDescriptor).getMessageType(); return buildTimestampPicosMessage(timestampPicosDescriptor, instant); } @Nullable diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java index df9f7a405978..56b4be4a1a1f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java @@ -69,10 +69,8 @@ class GenericRecordConverter implements MessageConverter { avroSchema = schemaFactory.apply(getSchema(destination)); protoTableSchema = AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(avroSchema); - System.out.println("CLAUDE protoTableSchema " + protoTableSchema); descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(protoTableSchema, true, false); - // System.out.println("CLAUDE descriptor " + descriptor); if (usesCdc) { cdcDescriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(protoTableSchema, true, true); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java index 5deffd4028f9..3b4a0788d482 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java @@ -23,13 +23,19 @@ import com.google.cloud.bigquery.storage.v1.DataFormat; import java.security.SecureRandom; import java.util.List; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -392,6 +398,113 @@ public void testReadWithNanosPrecision_Arrow() { runReadTest(TimestampPrecision.NANOS, DataFormat.ARROW, expectedOutput, simpleTableSpec); } + // Schema with custom timestamp-nanos logical type + private static org.apache.avro.Schema createTimestampNanosAvroSchema() { + org.apache.avro.Schema longSchema = + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG); + longSchema.addProp("logicalType", "timestamp-nanos"); + return org.apache.avro.SchemaBuilder.record("TimestampNanosRecord") + .fields() + .name("ts_nanos") + .type(longSchema) + .noDefault() + .endRecord(); + } + + private static final org.apache.avro.Schema TIMESTAMP_NANOS_AVRO_SCHEMA = + createTimestampNanosAvroSchema(); + + // Nanoseconds since epoch for 2024-01-15T10:30:45.123456789Z + private static final long TEST_NANOS_VALUE = 1705321845123456789L; + + @Test + public void testWriteGenericRecordTimestampNanos() throws Exception { + String tableSpec = + String.format("%s:%s.%s", project, DATASET_ID, "generic_record_ts_nanos_test"); + + // Create GenericRecord with timestamp-nanos value + GenericRecord record = + new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA) + .set("ts_nanos", TEST_NANOS_VALUE) + .build(); + + // Write using Storage Write API with Avro format + Pipeline writePipeline = Pipeline.create(bqOptions); + writePipeline + .apply("CreateData", Create.of(record).withCoder(AvroCoder.of(TIMESTAMP_NANOS_AVRO_SCHEMA))) + .apply( + "WriteGenericRecords", + BigQueryIO.writeGenericRecords() + .to(tableSpec) + .withAvroSchemaFactory(tableSchema -> TIMESTAMP_NANOS_AVRO_SCHEMA) + .useAvroLogicalTypes() + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); + writePipeline.run().waitUntilFinish(); + + // Read back and verify + Pipeline readPipeline = Pipeline.create(bqOptions); + PCollection result = + readPipeline.apply( + "Read", + BigQueryIO.readTableRows() + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) + .withFormat(DataFormat.AVRO) + .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS) + .from(tableSpec)); + + PAssert.that(result) + .containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")); + readPipeline.run().waitUntilFinish(); + } + + private static final java.time.Instant TEST_INSTANT = + java.time.Instant.parse("2024-01-15T10:30:45.123456789Z"); + + private static final Schema BEAM_TIMESTAMP_NANOS_SCHEMA = + Schema.builder().addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)).build(); + + @Test + public void testWriteBeamRowTimestampNanos() throws Exception { + String tableSpec = String.format("%s:%s.%s", project, DATASET_ID, "beam_row_ts_nanos_test"); + + // Create Beam Row with Timestamp.NANOS + Row row = + Row.withSchema(BEAM_TIMESTAMP_NANOS_SCHEMA) + .withFieldValue("ts_nanos", TEST_INSTANT) + .build(); + + // Write using Storage Write API with Beam Schema + Pipeline writePipeline = Pipeline.create(bqOptions); + writePipeline + .apply("CreateData", Create.of(row).withRowSchema(BEAM_TIMESTAMP_NANOS_SCHEMA)) + .apply( + "WriteBeamRows", + BigQueryIO.write() + .to(tableSpec) + .useBeamSchema() // Key method for Beam Row! + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); + writePipeline.run().waitUntilFinish(); + + // Read back and verify + Pipeline readPipeline = Pipeline.create(bqOptions); + PCollection result = + readPipeline.apply( + "Read", + BigQueryIO.readTableRows() + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) + .withFormat(DataFormat.AVRO) + .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS) + .from(tableSpec)); + + PAssert.that(result) + .containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")); + readPipeline.run().waitUntilFinish(); + } + private void runReadTest( TimestampPrecision precision, DataFormat format, From bf22f82bf5c48ffb9609a268854b425a9a2439cc Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 6 Jan 2026 14:26:36 +0000 Subject: [PATCH 5/8] fix tests. --- .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 16 +++++++++++++++- .../gcp/bigquery/BigQueryTimestampPicosIT.java | 16 +++++++--------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 948987871273..30b7cdc6d120 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -587,7 +587,21 @@ private static List toTableFieldSchema(Schema schema) { field.setFields(toTableFieldSchema(mapSchema)); field.setMode(Mode.REPEATED.toString()); } - field.setType(toStandardSQLTypeName(type).toString()); + if (type.getTypeName().isLogicalType() + && Preconditions.checkArgumentNotNull(type.getLogicalType()) + .getIdentifier() + .equals(Timestamp.IDENTIFIER)) { + Schema.LogicalType logicalType = + Preconditions.checkArgumentNotNull(type.getLogicalType()); + int precision = Preconditions.checkArgumentNotNull(logicalType.getArgument()); + if (precision != 9) { + throw new IllegalArgumentException( + "Unsupported precision for Timestamp logical type " + precision); + } + field.setType(StandardSQLTypeName.TIMESTAMP.toString()).setTimestampPrecision(12L); + } else { + field.setType(toStandardSQLTypeName(type).toString()); + } fields.add(field); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java index 3b4a0788d482..ac9e3537092a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java @@ -411,12 +411,12 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() { .endRecord(); } + private static final java.time.Instant TEST_INSTANT = + java.time.Instant.parse("2024-01-15T10:30:45.123456789Z"); + private static final org.apache.avro.Schema TIMESTAMP_NANOS_AVRO_SCHEMA = createTimestampNanosAvroSchema(); - // Nanoseconds since epoch for 2024-01-15T10:30:45.123456789Z - private static final long TEST_NANOS_VALUE = 1705321845123456789L; - @Test public void testWriteGenericRecordTimestampNanos() throws Exception { String tableSpec = @@ -425,7 +425,7 @@ public void testWriteGenericRecordTimestampNanos() throws Exception { // Create GenericRecord with timestamp-nanos value GenericRecord record = new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA) - .set("ts_nanos", TEST_NANOS_VALUE) + .set("ts_nanos", TEST_INSTANT.getEpochSecond() * 1_000_000_000 + TEST_INSTANT.getNano()) .build(); // Write using Storage Write API with Avro format @@ -437,10 +437,11 @@ public void testWriteGenericRecordTimestampNanos() throws Exception { BigQueryIO.writeGenericRecords() .to(tableSpec) .withAvroSchemaFactory(tableSchema -> TIMESTAMP_NANOS_AVRO_SCHEMA) + .withSchema(BigQueryUtils.fromGenericAvroSchema(TIMESTAMP_NANOS_AVRO_SCHEMA, true)) .useAvroLogicalTypes() .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); writePipeline.run().waitUntilFinish(); // Read back and verify @@ -459,9 +460,6 @@ public void testWriteGenericRecordTimestampNanos() throws Exception { readPipeline.run().waitUntilFinish(); } - private static final java.time.Instant TEST_INSTANT = - java.time.Instant.parse("2024-01-15T10:30:45.123456789Z"); - private static final Schema BEAM_TIMESTAMP_NANOS_SCHEMA = Schema.builder().addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)).build(); @@ -486,7 +484,7 @@ public void testWriteBeamRowTimestampNanos() throws Exception { .useBeamSchema() // Key method for Beam Row! .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); writePipeline.run().waitUntilFinish(); // Read back and verify From 36c0e0219c433f3a4885c4806ca61dfb32395731 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 6 Jan 2026 16:08:37 +0000 Subject: [PATCH 6/8] Update tests. --- .../io/gcp/bigquery/BigQueryUtilsTest.java | 48 +++++++++++++------ 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index 76a492bebd2c..b50e8448698a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -69,6 +69,12 @@ /** Tests for {@link BigQueryUtils}. */ @RunWith(JUnit4.class) public class BigQueryUtilsTest { + private static final TableFieldSchema TIMESTAMP_NANOS = + new TableFieldSchema() + .setName("timestamp_nanos") + .setType(StandardSQLTypeName.TIMESTAMP.toString()) + .setTimestampPrecision(12L); + private static final Schema FLAT_TYPE = Schema.builder() .addNullableField("id", Schema.FieldType.INT64) @@ -98,6 +104,7 @@ public class BigQueryUtilsTest { .addNullableField("boolean", Schema.FieldType.BOOLEAN) .addNullableField("long", Schema.FieldType.INT64) .addNullableField("double", Schema.FieldType.DOUBLE) + .addNullableField("timestamp_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)) .build(); private static final Schema ENUM_TYPE = @@ -280,7 +287,8 @@ public class BigQueryUtilsTest { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); private static final TableFieldSchema ROWS = new TableFieldSchema() @@ -315,7 +323,8 @@ public class BigQueryUtilsTest { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); private static final TableFieldSchema MAP = new TableFieldSchema() @@ -368,7 +377,8 @@ public class BigQueryUtilsTest { new BigDecimal("123.456").setScale(3, RoundingMode.HALF_UP), true, 123L, - 123.456d) + 123.456d, + java.time.Instant.parse("2024-08-10T16:52:07.123456789Z")) .build(); private static final TableRow BQ_FLAT_ROW = @@ -404,13 +414,14 @@ public class BigQueryUtilsTest { .set("numeric", "123.456") .set("boolean", true) .set("long", 123L) - .set("double", 123.456d); + .set("double", 123.456d) + .set("timestamp_nanos", "2024-08-10 16:52:07.123456789 UTC"); private static final Row NULL_FLAT_ROW = Row.withSchema(FLAT_TYPE) .addValues( null, null, null, null, null, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, null, null, null, null, null, null, null, null) + null, null, null, null, null, null, null, null, null, null, null, null, null, null) .build(); private static final TableRow BQ_NULL_FLAT_ROW = @@ -441,7 +452,8 @@ public class BigQueryUtilsTest { .set("numeric", null) .set("boolean", null) .set("long", null) - .set("double", null); + .set("double", null) + .set("timestamp_nanos", null); private static final Row ENUM_ROW = Row.withSchema(ENUM_TYPE).addValues(new EnumerationType.Value(1)).build(); @@ -533,7 +545,8 @@ public class BigQueryUtilsTest { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); private static final TableSchema BQ_ENUM_TYPE = new TableSchema().setFields(Arrays.asList(COLOR)); @@ -593,7 +606,8 @@ public void testToTableSchema_flat() { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); } @Test @@ -648,7 +662,8 @@ public void testToTableSchema_row() { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); } @Test @@ -689,7 +704,8 @@ public void testToTableSchema_array_row() { NUMERIC, BOOLEAN, LONG, - DOUBLE)); + DOUBLE, + TIMESTAMP_NANOS)); } @Test @@ -720,7 +736,7 @@ public void testToTableSchema_map_array() { public void testToTableRow_flat() { TableRow row = toTableRow().apply(FLAT_ROW); - assertThat(row.size(), equalTo(27)); + assertThat(row.size(), equalTo(28)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("timestamp_variant1", "2019-08-16 13:52:07.000 UTC")); @@ -748,6 +764,7 @@ public void testToTableRow_flat() { assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); assertThat(row, hasEntry("double", "123.456")); + assertThat(row, hasEntry("timestamp_nanos", "2024-08-10 16:52:07.123456789 UTC")); } @Test @@ -783,7 +800,7 @@ public void testToTableRow_row() { assertThat(row.size(), equalTo(1)); row = (TableRow) row.get("row"); - assertThat(row.size(), equalTo(27)); + assertThat(row.size(), equalTo(28)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("timestamp_variant1", "2019-08-16 13:52:07.000 UTC")); @@ -811,6 +828,7 @@ public void testToTableRow_row() { assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); assertThat(row, hasEntry("double", "123.456")); + assertThat(row, hasEntry("timestamp_nanos", "2024-08-10 16:52:07.123456789 UTC")); } @Test @@ -819,7 +837,7 @@ public void testToTableRow_array_row() { assertThat(row.size(), equalTo(1)); row = ((List) row.get("rows")).get(0); - assertThat(row.size(), equalTo(27)); + assertThat(row.size(), equalTo(28)); assertThat(row, hasEntry("id", "123")); assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("timestamp_variant1", "2019-08-16 13:52:07.000 UTC")); @@ -847,13 +865,14 @@ public void testToTableRow_array_row() { assertThat(row, hasEntry("boolean", "true")); assertThat(row, hasEntry("long", "123")); assertThat(row, hasEntry("double", "123.456")); + assertThat(row, hasEntry("timestamp_nanos", "2024-08-10 16:52:07.123456789 UTC")); } @Test public void testToTableRow_null_row() { TableRow row = toTableRow().apply(NULL_FLAT_ROW); - assertThat(row.size(), equalTo(27)); + assertThat(row.size(), equalTo(28)); assertThat(row, hasEntry("id", null)); assertThat(row, hasEntry("value", null)); assertThat(row, hasEntry("name", null)); @@ -881,6 +900,7 @@ public void testToTableRow_null_row() { assertThat(row, hasEntry("boolean", null)); assertThat(row, hasEntry("long", null)); assertThat(row, hasEntry("double", null)); + assertThat(row, hasEntry("timestamp_nanos", null)); } private static final BigQueryUtils.ConversionOptions TRUNCATE_OPTIONS = From f6913ad7e3cccc8d11063d1d516108fe231a0607 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 6 Jan 2026 16:40:41 +0000 Subject: [PATCH 7/8] comments. --- .../AvroGenericRecordToStorageApiProto.java | 14 +++++++++----- .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 8 ++------ .../io/gcp/bigquery/BigQueryTimestampPicosIT.java | 3 ++- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java index 671dd480ff5d..76174ac3d04d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -57,6 +57,9 @@ public class AvroGenericRecordToStorageApiProto { private static final org.joda.time.LocalDate EPOCH_DATE = new org.joda.time.LocalDate(1970, 1, 1); private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos"; + private static final long PICOSECOND_PRECISION = 12L; + private static final long NANOS_PER_SECOND = 1_000_000_000L; + private static final long PICOS_PER_NANO = 1000L; static final Map PRIMITIVE_TYPES = ImmutableMap.builder() @@ -393,7 +396,8 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch elementType = TypeWithNullability.create(schema).getType(); if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(elementType.getProp("logicalType"))) { builder = builder.setType(TableFieldSchema.Type.TIMESTAMP); - builder.setTimestampPrecision(Int64Value.newBuilder().setValue(12L).build()); + builder.setTimestampPrecision( + Int64Value.newBuilder().setValue(PICOSECOND_PRECISION).build()); break; } else { Optional logicalType = @@ -537,16 +541,16 @@ static Object scalarToProtoValue( value instanceof Long, "Expecting a value as Long type (timestamp-nanos)."); long nanos = (Long) value; - long seconds = nanos / 1_000_000_000L; - int nanoAdjustment = (int) (nanos % 1_000_000_000L); + long seconds = nanos / NANOS_PER_SECOND; + long nanoAdjustment = nanos % NANOS_PER_SECOND; // Handle negative timestamps (before epoch) if (nanos < 0 && nanoAdjustment != 0) { seconds -= 1; - nanoAdjustment += 1_000_000_000; + nanoAdjustment += NANOS_PER_SECOND; } - long picoseconds = nanoAdjustment * 1000L; + long picoseconds = nanoAdjustment * PICOS_PER_NANO; return buildTimestampPicosMessage( Preconditions.checkNotNull(descriptor).getMessageType(), seconds, picoseconds); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index 30b7cdc6d120..16dbc2b5f186 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -587,12 +587,8 @@ private static List toTableFieldSchema(Schema schema) { field.setFields(toTableFieldSchema(mapSchema)); field.setMode(Mode.REPEATED.toString()); } - if (type.getTypeName().isLogicalType() - && Preconditions.checkArgumentNotNull(type.getLogicalType()) - .getIdentifier() - .equals(Timestamp.IDENTIFIER)) { - Schema.LogicalType logicalType = - Preconditions.checkArgumentNotNull(type.getLogicalType()); + Schema.LogicalType logicalType = type.getLogicalType(); + if (logicalType != null && Timestamp.IDENTIFIER.equals(logicalType.getIdentifier())) { int precision = Preconditions.checkArgumentNotNull(logicalType.getArgument()); if (precision != 9) { throw new IllegalArgumentException( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java index ac9e3537092a..6d155185ee62 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java @@ -425,7 +425,8 @@ public void testWriteGenericRecordTimestampNanos() throws Exception { // Create GenericRecord with timestamp-nanos value GenericRecord record = new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA) - .set("ts_nanos", TEST_INSTANT.getEpochSecond() * 1_000_000_000 + TEST_INSTANT.getNano()) + .set( + "ts_nanos", TEST_INSTANT.getEpochSecond() * 1_000_000_000L + TEST_INSTANT.getNano()) .build(); // Write using Storage Write API with Avro format From cd3ccf62121a3b035a27a09cd5112b09aa71c72a Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 6 Jan 2026 19:13:02 +0000 Subject: [PATCH 8/8] fix test. --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index e95e15465966..ce6d53af4003 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -953,7 +953,8 @@ public void testConvertAvroSchemaToBigQuerySchema() { String timestampNanosJson = "{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"; Schema timestampType = new Schema.Parser().parse(timestampNanosJson); Schema avroSchema = avroSchema(f -> f.type(timestampType).noDefault()); - TableSchema expected = tableSchema(f -> f.setType("TIMESTAMP").setMode("REQUIRED")); + TableSchema expected = + tableSchema(f -> f.setType("TIMESTAMP").setMode("REQUIRED").setTimestampPrecision(12L)); TableSchema expectedRaw = tableSchema(f -> f.setType("INTEGER").setMode("REQUIRED")); assertEquals(expected, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema));