import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Int64Value;
+import com.google.protobuf.Message;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.ByteArrayOutputStream;
@@ -124,7 +127,7 @@ public class ITBigQueryStorageWriteClientTest {

  // Arrow is a bit special in that timestamps are limited to nanoseconds precision.
  // The data will be padded to fit into the higher precision columns.
-  public static final Object[][] INPUT_ARROW_WRITE_TIMESTAMPS =
+  private static final Object[][] INPUT_ARROW_WRITE_TIMESTAMPS =
      new Object[][] {
        {1735734896123456L /* 2025-01-01T12:34:56.123456Z */, 1735734896123456789L},
        {1580646896123456L /* 2020-02-02T12:34:56.123456Z */, 1580646896123456789L},
@@ -134,14 +137,35 @@ public class ITBigQueryStorageWriteClientTest {

  // Arrow's higher precision column is padded with extra 0's if configured to return
  // ISO as output for any picosecond enabled column.
-  public static final Object[][] EXPECTED_ARROW_WRITE_TIMESTAMPS_ISO_OUTPUT =
+  private static final Object[][] EXPECTED_ARROW_WRITE_TIMESTAMPS_ISO_OUTPUT =
      new Object[][] {
        {1735734896123456L /* 2025-01-01T12:34:56.123456Z */, "2025-01-01T12:34:56.123456789000Z"},
        {1580646896123456L /* 2020-02-02T12:34:56.123456Z */, "2020-02-02T12:34:56.123456789000Z"},
        {636467696123456L /* 1990-03-03T12:34:56.123456Z */, "1990-03-03T12:34:56.123456789000Z"},
        {165846896123456L /* 1975-04-04T12:34:56.123456Z */, "1975-04-04T12:34:56.123456789000Z"}
      };

+  // Special case where users can use the Write API with Protobuf messages.
+  // The format is two fields: 1. seconds from the epoch and 2. the sub-second fraction
+  // (millis, micros, nanos, or picos). This test case uses a picosecond sub-second fraction.
+  private static final Long[][] INPUT_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS =
+      new Long[][] {
+        {1735734896L, 123456789123L}, /* 2025-01-01T12:34:56.123456789123Z */
+        {1580646896L, 123456789123L}, /* 2020-02-02T12:34:56.123456789123Z */
+        {636467696L, 123456789123L}, /* 1990-03-03T12:34:56.123456789123Z */
+        {165846896L, 123456789123L} /* 1975-04-04T12:34:56.123456789123Z */
+      };
+
+  // Expected ISO 8601 output when using proto descriptors to write to BQ with pico precision
+  private static final String[]
+      EXPECTED_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS_HIGH_PRECISION_ISO_OUTPUT =
+          new String[] {
+            "2025-01-01T12:34:56.123456789123Z",
+            "2020-02-02T12:34:56.123456789123Z",
+            "1990-03-03T12:34:56.123456789123Z",
+            "1975-04-04T12:34:56.123456789123Z"
+          };
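+  // Note: each picoseconds value above is the full 12-digit sub-second fraction
+  // (123456789123 picos = 0.123456789123 s), which appears verbatim as the fractional
+  // part of the expected ISO output.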
+
  public static class StringWithSecondsNanos {
    public String foo;
    public long seconds;
@@ -2368,7 +2392,7 @@ public void timestamp_arrowWrite() throws IOException {
  @Test
  public void timestamp_protobufWrite()
      throws IOException, DescriptorValidationException, InterruptedException {
-    String tableName = "bqstorage_timestamp_write_protobuf";
+    String tableName = "bqstorage_timestamp_write_protobuf_schema_aware";
    // Opt to create a new table to write to instead of re-using table to prevent
    // the test from failing due to any issues with deleting data after test.
    // Increases the test time duration, but would be more resilient to transient
@@ -2417,6 +2441,130 @@ public void timestamp_protobufWrite()
    assertTimestamps(tableName, EXPECTED_TIMESTAMPS_HIGHER_PRECISION_ISO_OUTPUT);
  }

+  // Tests that users can write a Protobuf message that contains a seconds part and a
+  // sub-second fractional part (picos) to BQ
+  @Test
+  public void timestamp_protobufWrite_customMessage_higherPrecision()
+      throws IOException, DescriptorValidationException {
+    String tableName = "bqstorage_timestamp_write_protobuf_custom_descriptor";
+    // Opt to create a new table to write to instead of re-using table to prevent
+    // the test from failing due to any issues with deleting data after test.
+    // Increases the test time duration, but would be more resilient to transient
+    // failures
+    createTimestampTable(tableName);
+
+    /*
+    A sample protobuf format:
+    message Wrapper {
+      message TimestampPicos {
+        int64 seconds = 1;
+        int64 picoseconds = 2;
+      }
+      TimestampPicos timestampHigherPrecision = 3;
+      // ...
+    }
+    */
+    String wrapperProtoName = "Wrapper";
+    String timestampPicosProtoName = "TimestampPicos";
+    String secondsProtoName = "seconds";
+    String picosProtoName = "picoseconds";
+    DescriptorProto timestampPicosDescriptor =
+        DescriptorProto.newBuilder()
+            .setName(timestampPicosProtoName)
+            .addField(
+                DescriptorProtos.FieldDescriptorProto.newBuilder()
+                    .setName(secondsProtoName)
+                    .setNumber(1)
+                    .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
+                    .build())
+            .addField(
+                DescriptorProtos.FieldDescriptorProto.newBuilder()
+                    .setName(picosProtoName)
+                    .setNumber(2)
+                    .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
+                    .build())
+            .build();
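+    // The wrapper descriptor below references the TimestampPicos descriptor as a nested
+    // type and points its timestamp field at that type by name.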
+    DescriptorProto wrapperDescriptor =
+        DescriptorProto.newBuilder()
+            .setName(wrapperProtoName) // arbitrary message name
+            .addField(
+                DescriptorProtos.FieldDescriptorProto.newBuilder()
+                    .setName(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME)
+                    .setNumber(3)
+                    .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE)
+                    .setTypeName(timestampPicosDescriptor.getName())
+                    .build())
+            .addNestedType(timestampPicosDescriptor)
+            .build();
+    ProtoSchema protoSchema =
+        ProtoSchema.newBuilder().setProtoDescriptor(wrapperDescriptor).build();
+
+    TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+    try (StreamWriter streamWriter =
+        StreamWriter.newBuilder(parent.toString() + "/_default", writeClient)
+            .setWriterSchema(protoSchema)
+            .build()) {
+      DescriptorProtos.FileDescriptorProto fileProto =
+          DescriptorProtos.FileDescriptorProto.newBuilder()
+              .setName("test.proto") // dummy proto file
+              .addMessageType(wrapperDescriptor)
+              .build();
+
+      // Build the runtime descriptor (resolves types and names)
+      Descriptors.FileDescriptor file =
+          Descriptors.FileDescriptor.buildFrom(fileProto, new Descriptors.FileDescriptor[] {});
+
+      // Get the handle to the "wrapper" message type
+      Descriptors.Descriptor descriptor = file.findMessageTypeByName(wrapperProtoName);
+
+      ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
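+      // Build each row as a DynamicMessage against the runtime descriptor: the outer Wrapper
+      // message holds a nested TimestampPicos message carrying the seconds and picoseconds parts.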
+      for (Long[] timestampParts : INPUT_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS) {
+        Message message =
+            DynamicMessage.newBuilder(descriptor)
+                .setField(
+                    descriptor.findFieldByName(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME),
+                    DynamicMessage.newBuilder(
+                            descriptor.findNestedTypeByName(timestampPicosProtoName))
+                        .setField(
+                            descriptor
+                                .findNestedTypeByName(timestampPicosProtoName)
+                                .findFieldByName(secondsProtoName),
+                            timestampParts[0])
+                        .setField(
+                            descriptor
+                                .findNestedTypeByName(timestampPicosProtoName)
+                                .findFieldByName(picosProtoName),
+                            timestampParts[1])
+                        .build())
+                .build();
+        rowsBuilder.addSerializedRows(message.toByteString());
+      }
+      ApiFuture<AppendRowsResponse> future = streamWriter.append(rowsBuilder.build());
+      ApiFutures.addCallback(
+          future, new Helper.AppendCompleteCallback(), MoreExecutors.directExecutor());
+    }
+    String table =
+        BigQueryResource.formatTableResource(
+            ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+
+    // Read all the data as Avro GenericRecords
+    List<GenericData.Record> rows = Helper.readAllRows(readClient, parentProjectId, table, null);
+    List<String> timestampHigherPrecision =
+        rows.stream()
+            .map(x -> x.get(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME).toString())
+            .collect(Collectors.toList());
+    assertEquals(
+        EXPECTED_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS_HIGH_PRECISION_ISO_OUTPUT.length,
+        timestampHigherPrecision.size());
+    for (int i = 0;
+        i < EXPECTED_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS_HIGH_PRECISION_ISO_OUTPUT.length;
+        i++) {
+      assertEquals(
+          EXPECTED_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS_HIGH_PRECISION_ISO_OUTPUT[i],
+          timestampHigherPrecision.get(i));
+    }
+  }
+
  private void createTimestampTable(String tableName) {
    Schema bqTableSchema =
        Schema.of(