Commit b75e8f0

chore: Add comments and refactor constants
1 parent 50c329e commit b75e8f0

File tree

3 files changed: +75 -166 lines changed

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java

Lines changed: 17 additions & 106 deletions
@@ -81,7 +81,6 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.google.protobuf.Int64Value;
-import com.google.protobuf.Timestamp;
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.sdk.OpenTelemetrySdk;
@@ -114,7 +113,6 @@
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.util.Utf8;
 import org.json.JSONArray;
 import org.json.JSONObject;
@@ -530,6 +528,7 @@ public static void beforeClass()

   private static void setupTimestampTable()
       throws DescriptorValidationException, IOException, InterruptedException {
+    // Schema to create a BQ table
     com.google.cloud.bigquery.Schema timestampSchema =
         com.google.cloud.bigquery.Schema.of(
             Field.newBuilder(TIMESTAMP_COLUMN_NAME, StandardSQLTypeName.TIMESTAMP)
@@ -540,6 +539,13 @@ private static void setupTimestampTable()
                 .setMode(Mode.NULLABLE)
                 .build());

+    // Create BQ table with timestamps
+    TableId tableId = TableId.of(DATASET, BQSTORAGE_TIMESTAMP_READ_TABLE);
+    bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(timestampSchema)));
+
+    TableName parentTable = TableName.of(projectName, DATASET, BQSTORAGE_TIMESTAMP_READ_TABLE);
+
+    // Define the BQStorage schema to write to
     TableSchema timestampTableSchema =
         TableSchema.newBuilder()
             .addFields(
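The hunk above cuts off mid-builder; for context, a minimal sketch of how the truncated storage TableSchema presumably continues, using TableFieldSchema builder calls visible in the write-client diff further down. The field names are the test's constants; the exact type and mode modifiers here are an assumption, not the committed code:

    // Hypothetical completion of the truncated builder chain.
    TableSchema timestampTableSchema =
        TableSchema.newBuilder()
            .addFields(
                TableFieldSchema.newBuilder()
                    .setName(TIMESTAMP_COLUMN_NAME)
                    .setType(TableFieldSchema.Type.TIMESTAMP)
                    .setMode(TableFieldSchema.Mode.NULLABLE)
                    .build())
            .addFields(
                TableFieldSchema.newBuilder()
                    .setName(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME)
                    .setType(TableFieldSchema.Type.TIMESTAMP)
                    .setMode(TableFieldSchema.Mode.NULLABLE)
                    .build())
            .build();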
@@ -557,23 +563,8 @@ private static void setupTimestampTable()
                 .build())
             .build();

-    // Create table with Range fields.
-    TableId tableId = TableId.of(DATASET, BQSTORAGE_TIMESTAMP_READ_TABLE);
-    bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(timestampSchema)));
-
-    TableName parentTable = TableName.of(projectName, DATASET, BQSTORAGE_TIMESTAMP_READ_TABLE);
-    RetrySettings retrySettings =
-        RetrySettings.newBuilder()
-            .setInitialRetryDelayDuration(Duration.ofMillis(500))
-            .setRetryDelayMultiplier(1.1)
-            .setMaxAttempts(5)
-            .setMaxRetryDelayDuration(Duration.ofSeconds(10))
-            .build();
-
     try (JsonStreamWriter writer =
-        JsonStreamWriter.newBuilder(parentTable.toString(), timestampTableSchema)
-            .setRetrySettings(retrySettings)
-            .build()) {
+        JsonStreamWriter.newBuilder(parentTable.toString(), timestampTableSchema).build()) {
       JSONArray data = new JSONArray();
       for (Object[] timestampData : Helper.INPUT_TIMESTAMPS) {
         JSONObject row = new JSONObject();
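The hunk ends inside the write loop; a sketch of how the remainder of the method presumably goes. JsonStreamWriter.append(JSONArray) is the real API; the row keys and the blocking get() are assumptions based on the columns used elsewhere in these tests:

    // Hypothetical remainder of setupTimestampTable(), not the committed code.
        row.put(TIMESTAMP_COLUMN_NAME, timestampData[0]);
        row.put(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME, timestampData[1]);
        data.put(row);
      }
      // Append all rows in one request and block until the server acknowledges them.
      // get() also throws ExecutionException; handling is elided in this sketch.
      AppendRowsResponse response = writer.append(data).get();
    }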
@@ -874,6 +865,7 @@ public void testRangeTypeWrite()
     }
   }

+  // Tests that inputs for micros and picos can be read properly via Arrow
   @Test
   public void timestamp_readArrow() throws IOException {
     String table =
@@ -925,13 +917,15 @@ public void timestamp_readArrow() throws IOException {
         Preconditions.checkState(response.hasArrowRecordBatch());
         reader.processRows(
             response.getArrowRecordBatch(),
-            new SimpleRowReaderArrow.ArrowTimestampBatchConsumer(Helper.INPUT_TIMESTAMPS));
+            new SimpleRowReaderArrow.ArrowTimestampBatchConsumer(
+                Helper.EXPECTED_TIMESTAMPS_HIGHER_PRECISION_ISO_OUTPUT));
         rowCount += response.getRowCount();
       }
-      assertEquals(Helper.EXPECTED_TIMESTAMPS_ISO_HIGHER_PRECISION.length, rowCount);
+      assertEquals(Helper.EXPECTED_TIMESTAMPS_HIGHER_PRECISION_ISO_OUTPUT.length, rowCount);
     }
   }

+  // Tests that inputs for micros and picos can be read properly via Avro
   @Test
   public void timestamp_readAvro() throws IOException {
     String table =
@@ -944,11 +938,12 @@ public void timestamp_readAvro() throws IOException {
             .map(x -> x.get(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME).toString())
             .collect(Collectors.toList());
     for (int i = 0; i < timestamps.size(); i++) {
-      assertEquals(Helper.EXPECTED_TIMESTAMPS_ISO_HIGHER_PRECISION[i][0], timestamps.get(i));
+      assertEquals(Helper.EXPECTED_TIMESTAMPS_HIGHER_PRECISION_ISO_OUTPUT[i][0], timestamps.get(i));
     }
     for (int i = 0; i < timestampHigherPrecision.size(); i++) {
       assertEquals(
-          Helper.EXPECTED_TIMESTAMPS_ISO_HIGHER_PRECISION[i][1], timestampHigherPrecision.get(i));
+          Helper.EXPECTED_TIMESTAMPS_HIGHER_PRECISION_ISO_OUTPUT[i][1],
+          timestampHigherPrecision.get(i));
     }
   }

@@ -1887,90 +1882,6 @@ private long readStreamToOffset(ReadStream readStream, long rowOffset) {
     return rowCount;
   }

-  /**
-   * Reads all the rows from the specified table.
-   *
-   * <p>For every row, the consumer is called for processing.
-   *
-   * @param table
-   * @param snapshotInMillis Optional. If specified, all rows up to timestamp will be returned.
-   * @param filter Optional. If specified, it will be used to restrict returned data.
-   * @param consumer that receives all Avro rows.
-   * @throws IOException
-   */
-  private void processRowsAtSnapshot(
-      String table, Long snapshotInMillis, String filter, AvroRowConsumer consumer)
-      throws IOException {
-    Preconditions.checkNotNull(table);
-    Preconditions.checkNotNull(consumer);
-
-    CreateReadSessionRequest.Builder createSessionRequestBuilder =
-        CreateReadSessionRequest.newBuilder()
-            .setParent(parentProjectId)
-            .setMaxStreamCount(1)
-            .setReadSession(
-                ReadSession.newBuilder().setTable(table).setDataFormat(DataFormat.AVRO).build());
-
-    if (snapshotInMillis != null) {
-      Timestamp snapshotTimestamp =
-          Timestamp.newBuilder()
-              .setSeconds(snapshotInMillis / 1_000)
-              .setNanos((int) ((snapshotInMillis % 1000) * 1000000))
-              .build();
-      createSessionRequestBuilder
-          .getReadSessionBuilder()
-          .setTableModifiers(
-              ReadSession.TableModifiers.newBuilder().setSnapshotTime(snapshotTimestamp).build());
-    }
-
-    if (filter != null && !filter.isEmpty()) {
-      createSessionRequestBuilder
-          .getReadSessionBuilder()
-          .setReadOptions(TableReadOptions.newBuilder().setRowRestriction(filter).build());
-    }
-
-    ReadSession session = readClient.createReadSession(createSessionRequestBuilder.build());
-    assertEquals(
-        String.format(
-            "Did not receive expected number of streams for table '%s' CreateReadSession"
-                + " response:%n%s",
-            table, session.toString()),
-        1,
-        session.getStreamsCount());
-
-    ReadRowsRequest readRowsRequest =
-        ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build();
-
-    SimpleRowReaderAvro reader =
-        new SimpleRowReaderAvro(new Schema.Parser().parse(session.getAvroSchema().getSchema()));
-
-    ServerStream<ReadRowsResponse> stream = readClient.readRowsCallable().call(readRowsRequest);
-    for (ReadRowsResponse response : stream) {
-      reader.processRows(response.getAvroRows(), consumer);
-    }
-  }
-
-  /**
-   * Reads all the rows from the specified table and returns a list as generic Avro records.
-   *
-   * @param table
-   * @param filter Optional. If specified, it will be used to restrict returned data.
-   * @return
-   */
-  List<GenericData.Record> readAllRows(String table, String filter) throws IOException {
-    final List<GenericData.Record> rows = new ArrayList<>();
-    processRowsAtSnapshot(
-        /* table= */ table,
-        /* snapshotInMillis= */ null,
-        /* filter= */ filter,
-        (AvroRowConsumer)
-            record -> {
-              // clone the record since that reference will be reused by the reader.
-              rows.add(new GenericRecordBuilder(record).build());
-            });
-    return rows;
-  }

   /**
    * Runs a query job with WRITE_APPEND disposition to the destination table and returns the
    * successfully completed job.
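Both deleted helpers appear to be superseded by a shared utility in it.util.Helper: the write-client test below calls Helper.readAllRows(readClient, parentProjectId, table, null). A sketch of the replacement call site as it appears there, assuming the fourth argument is the optional row-restriction filter:

    // Call into the shared helper instead of the deleted private copies.
    String table =
        BigQueryResource.formatTableResource(
            ServiceOptions.getDefaultProjectId(), DATASET, tableName);
    List<GenericData.Record> rows =
        Helper.readAllRows(readClient, parentProjectId, table, /* filter= */ null);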

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java

Lines changed: 55 additions & 57 deletions
@@ -16,6 +16,7 @@

 package com.google.cloud.bigquery.storage.v1.it;

+import static com.google.cloud.bigquery.storage.v1.it.util.Helper.EXPECTED_TIMESTAMPS_HIGHER_PRECISION_ISO_OUTPUT;
 import static com.google.cloud.bigquery.storage.v1.it.util.Helper.TIMESTAMP_COLUMN_NAME;
 import static com.google.cloud.bigquery.storage.v1.it.util.Helper.TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME;
 import static com.google.common.truth.Truth.assertThat;
@@ -131,8 +132,9 @@ public class ITBigQueryStorageWriteClientTest {
     {165846896123456L /* 1975-04-04T12:34:56.123456Z */, 165846896123456789L}
   };

-  // Arrow's higher precision column is padded with extra 0's.
-  public static final Object[][] EXPECTED_ARROW_WRITE_TIMESTAMPS =
+  // Arrow's higher-precision column is padded with extra 0's when configured to return
+  // ISO output for any picosecond-enabled column.
+  public static final Object[][] EXPECTED_ARROW_WRITE_TIMESTAMPS_ISO_OUTPUT =
       new Object[][] {
         {1735734896123456L /* 2025-01-01T12:34:56.123456Z */, "2025-01-01T12:34:56.123456789000Z"},
         {1580646896123456L /* 2020-02-02T12:34:56.123456Z */, "2020-02-02T12:34:56.123456789000Z"},
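To make the padding concrete: the Arrow input carries nanosecond precision (9 fractional digits) while the ISO output column renders 12 (picoseconds), so the last three digits are always zero. A standalone check of that relationship using only java.time; an illustration, not the test's code:

    import java.time.Instant;

    public class PaddingDemo {
      public static void main(String[] args) {
        // Nanoseconds since epoch, taken from the Arrow write input above.
        long inputNanos = 1735734896123456789L;
        Instant instant =
            Instant.ofEpochSecond(inputNanos / 1_000_000_000L, inputNanos % 1_000_000_000L);
        // Re-render the 9-digit fraction at 12-digit (picosecond) width: three padded zeros.
        String isoPicos =
            String.format("%s.%09d000Z", instant.toString().substring(0, 19), instant.getNano());
        System.out.println(isoPicos); // 2025-01-01T12:34:56.123456789000Z
      }
    }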
@@ -2296,25 +2298,18 @@ public void testLargeRequest() throws IOException, InterruptedException, Executi
     }
   }

+  // Tests that micro and pico inputs can be written to BQ via Arrow
   @Test
   public void timestamp_arrowWrite() throws IOException {
-    com.google.cloud.bigquery.Schema bigqueryTableSchema =
-        com.google.cloud.bigquery.Schema.of(
-            Field.newBuilder(TIMESTAMP_COLUMN_NAME, StandardSQLTypeName.TIMESTAMP)
-                .setMode(Mode.NULLABLE)
-                .build(),
-            Field.newBuilder(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME, StandardSQLTypeName.TIMESTAMP)
-                .setMode(Mode.NULLABLE)
-                .setTimestampPrecision(12L)
-                .build());
-
     String tableName = "bqstorage_timestamp_write_arrow";
-    TableId testTableId = TableId.of(DATASET, tableName);
-    bigquery.create(
-        TableInfo.of(
-            testTableId,
-            StandardTableDefinition.newBuilder().setSchema(bigqueryTableSchema).build()));
+    // Opt to create a new table instead of re-using one, so the test cannot fail
+    // due to issues with deleting data afterwards. This increases the test duration
+    // but is more resilient to transient failures.
+    createTimestampTable(tableName);

+    // Define the fields as Arrow types that are compatible with BQ Schema types
     List<org.apache.arrow.vector.types.pojo.Field> fields =
         ImmutableList.of(
             new org.apache.arrow.vector.types.pojo.Field(
@@ -2332,7 +2327,7 @@ public void timestamp_arrowWrite() throws IOException {
     org.apache.arrow.vector.types.pojo.Schema arrowSchema =
         new org.apache.arrow.vector.types.pojo.Schema(fields, null);

-    int numRows = Helper.INPUT_TIMESTAMPS.length;
+    int numRows = INPUT_ARROW_WRITE_TIMESTAMPS.length;
     TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
     try (StreamWriter streamWriter =
         StreamWriter.newBuilder(parent.toString() + "/_default")
@@ -2365,44 +2360,23 @@ public void timestamp_arrowWrite() throws IOException {
             future, new Helper.AppendCompleteCallback(), MoreExecutors.directExecutor());
       }
     }
-    String table =
-        BigQueryResource.formatTableResource(
-            ServiceOptions.getDefaultProjectId(), DATASET, tableName);
-    List<GenericData.Record> rows = Helper.readAllRows(readClient, parentProjectId, table, null);
-    List<Long> timestamps =
-        rows.stream().map(x -> (Long) x.get(TIMESTAMP_COLUMN_NAME)).collect(Collectors.toList());
-    List<String> timestampHigherPrecision =
-        rows.stream()
-            .map(x -> x.get(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME).toString())
-            .collect(Collectors.toList());
-    assertEquals(timestamps.size(), Helper.EXPECTED_TIMESTAMPS_ISO_HIGHER_PRECISION.length);
-    assertEquals(
-        timestampHigherPrecision.size(), Helper.EXPECTED_TIMESTAMPS_ISO_HIGHER_PRECISION.length);
-    for (int i = 0; i < timestampHigherPrecision.size(); i++) {
-      assertEquals(timestamps.get(i), Helper.EXPECTED_TIMESTAMPS_ISO_HIGHER_PRECISION[i][0]);
-      assertEquals(timestampHigherPrecision.get(i), EXPECTED_ARROW_WRITE_TIMESTAMPS[i][1]);
-    }
+    assertTimestamps(tableName, EXPECTED_ARROW_WRITE_TIMESTAMPS_ISO_OUTPUT);
   }

+  // Tests that micro and pico inputs can be converted to protobuf
+  // and written to BQ
   @Test
   public void timestamp_protobufWrite()
       throws IOException, DescriptorValidationException, InterruptedException {
-    com.google.cloud.bigquery.Schema bqTableSchema =
-        com.google.cloud.bigquery.Schema.of(
-            Field.newBuilder(TIMESTAMP_COLUMN_NAME, StandardSQLTypeName.TIMESTAMP)
-                .setMode(Mode.NULLABLE)
-                .build(),
-            Field.newBuilder(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME, StandardSQLTypeName.TIMESTAMP)
-                .setMode(Mode.NULLABLE)
-                .setTimestampPrecision(12L)
-                .build());
-
     String tableName = "bqstorage_timestamp_write_protobuf";
-    TableId testTableId = TableId.of(DATASET, tableName);
-    bigquery.create(
-        TableInfo.of(
-            testTableId, StandardTableDefinition.newBuilder().setSchema(bqTableSchema).build()));
-
+    // Opt to create a new table instead of re-using one, so the test cannot fail
+    // due to issues with deleting data afterwards. This increases the test duration
+    // but is more resilient to transient failures.
+    createTimestampTable(tableName);
+
+    // Define the table schema so that the automatic converter can determine
+    // how to convert from JSON -> Protobuf
     TableFieldSchema testTimestamp =
         TableFieldSchema.newBuilder()
             .setName(TIMESTAMP_COLUMN_NAME)
@@ -2436,24 +2410,48 @@ public void timestamp_protobufWrite()
       ApiFutures.addCallback(
           future, new Helper.AppendCompleteCallback(), MoreExecutors.directExecutor());
     }
+    assertTimestamps(tableName, EXPECTED_TIMESTAMPS_HIGHER_PRECISION_ISO_OUTPUT);
+  }

+  private void createTimestampTable(String tableName) {
+    Schema bqTableSchema =
+        Schema.of(
+            Field.newBuilder(TIMESTAMP_COLUMN_NAME, StandardSQLTypeName.TIMESTAMP)
+                .setMode(Mode.NULLABLE)
+                .build(),
+            Field.newBuilder(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME, StandardSQLTypeName.TIMESTAMP)
+                .setMode(Mode.NULLABLE)
+                .setTimestampPrecision(12L)
+                .build());
+
+    TableId testTableId = TableId.of(DATASET, tableName);
+    bigquery.create(
+        TableInfo.of(
+            testTableId, StandardTableDefinition.newBuilder().setSchema(bqTableSchema).build()));
+  }
+
+  private void assertTimestamps(String tableName, Object[][] expected) throws IOException {
     String table =
         BigQueryResource.formatTableResource(
             ServiceOptions.getDefaultProjectId(), DATASET, tableName);
+
+    // Read all the data back as Avro GenericRecords
     List<GenericData.Record> rows = Helper.readAllRows(readClient, parentProjectId, table, null);
+
+    // Each timestamp row is expected to contain two fields:
+    // 1. the timestamp's micros as a Long, and 2. an ISO8601 instant with picosecond precision
     List<Long> timestamps =
         rows.stream().map(x -> (Long) x.get(TIMESTAMP_COLUMN_NAME)).collect(Collectors.toList());
     List<String> timestampHigherPrecision =
         rows.stream()
             .map(x -> x.get(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME).toString())
             .collect(Collectors.toList());
-    assertEquals(timestamps.size(), Helper.EXPECTED_TIMESTAMPS_ISO_HIGHER_PRECISION.length);
-    assertEquals(
-        timestampHigherPrecision.size(), Helper.EXPECTED_TIMESTAMPS_ISO_HIGHER_PRECISION.length);
-    for (int i = 0; i < timestamps.size(); i++) {
-      assertEquals(timestamps.get(i), Helper.EXPECTED_TIMESTAMPS_ISO_HIGHER_PRECISION[i][0]);
-      assertEquals(
-          timestampHigherPrecision.get(i), Helper.EXPECTED_TIMESTAMPS_ISO_HIGHER_PRECISION[i][1]);
+
+    assertEquals(timestamps.size(), expected.length);
+    assertEquals(timestampHigherPrecision.size(), expected.length);
+    for (int i = 0; i < timestampHigherPrecision.size(); i++) {
+      assertEquals(timestamps.get(i), expected[i][0]);
+      assertEquals(timestampHigherPrecision.get(i), expected[i][1]);
     }
   }
 }
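With the shared helpers factored out, a new timestamp write test reduces to three steps: table setup, the format-specific write, and one assertion call. A sketch of the resulting shape (hypothetical test; writeRowsViaSomeFormat and the table name stand in for the Arrow or protobuf write path):

    @Test
    public void timestamp_someFormatWrite() throws IOException {
      // Fresh table per test, trading runtime for isolation from cleanup failures.
      String tableName = "bqstorage_timestamp_write_someformat"; // hypothetical name
      createTimestampTable(tableName);

      writeRowsViaSomeFormat(tableName); // stand-in for the format-specific append logic

      // Read back via Helper.readAllRows and compare the micros/ISO-picos pairs.
      assertTimestamps(tableName, EXPECTED_TIMESTAMPS_HIGHER_PRECISION_ISO_OUTPUT);
    }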

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/Helper.java

Lines changed: 3 additions & 3 deletions
@@ -56,9 +56,9 @@ public class Helper {
     {165846896123456L /* 1975-04-04T12:34:56.123456Z */, "1975-04-04T12:34:56.123456789123Z"}
   };

-  // Expected response for timestamps from the input. The output is configured to return ISO8601
-  // format for any picosecond enabled column.
-  public static final Object[][] EXPECTED_TIMESTAMPS_ISO_HIGHER_PRECISION =
+  // Expected response for timestamps from the input. If ISO output is enabled, any
+  // picosecond-enabled column is returned in ISO8601 format.
+  public static final Object[][] EXPECTED_TIMESTAMPS_HIGHER_PRECISION_ISO_OUTPUT =
       new Object[][] {
         {1735734896123456L /* 2025-01-01T12:34:56.123456Z */, "2025-01-01T12:34:56.123456789123Z"},
         {1580646896123456L /* 2020-02-02T12:34:56.123456Z */, "2020-02-02T12:34:56.123456789123Z"},
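The inline /* … */ annotations on the micros values can be sanity-checked with plain java.time; a standalone snippet confirming the first pair (illustration only, not part of Helper):

    import java.time.Instant;

    public class MicrosCheck {
      public static void main(String[] args) {
        long micros = 1735734896123456L; // first expected value in the table above
        Instant instant =
            Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
        System.out.println(instant); // prints 2025-01-01T12:34:56.123456Z
      }
    }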
