Skip to content

Commit 25eb1f9

Browse files
authored
test: Add timestamp ITs for Read and Write API (#3156)
* test: Add ITs for timestamps for Read API
* chore: Add timestamp test for avro
* chore: Refactor packages into util package
* chore: Rename to ITBigQueryStorageWriteClientTest
* chore: Add header for Helper util class
* chore: Address PR comments
* chore: Add protobuf-java-util testing scope
* chore: Use Timestamps.fromMillis
1 parent 3a82884 commit 25eb1f9

12 files changed

+606
-259
lines changed

google-cloud-bigquerystorage/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,11 @@
232232
<artifactId>arrow-memory-core</artifactId>
233233
<scope>test</scope>
234234
</dependency>
235+
<dependency>
236+
<groupId>com.google.protobuf</groupId>
237+
<artifactId>protobuf-java-util</artifactId>
238+
<scope>test</scope>
239+
</dependency>
235240

236241
<dependency>
237242
<groupId>io.grpc</groupId>

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageLongRunningTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
2727
import com.google.cloud.bigquery.storage.v1.ReadSession;
2828
import com.google.cloud.bigquery.storage.v1.ReadStream;
29+
import com.google.cloud.bigquery.storage.v1.it.util.BigQueryResource;
2930
import java.io.IOException;
3031
import java.util.ArrayList;
3132
import java.util.List;

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageReadClientTest.java

Lines changed: 225 additions & 207 deletions
Large diffs are not rendered by default.

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteClientTest.java renamed to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageWriteClientTest.java

Lines changed: 163 additions & 38 deletions
Large diffs are not rendered by default.

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteNonQuotaRetryTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.google.cloud.bigquery.storage.v1.StreamWriter;
3939
import com.google.cloud.bigquery.storage.v1.TableName;
4040
import com.google.cloud.bigquery.storage.v1.WriteStream;
41+
import com.google.cloud.bigquery.storage.v1.it.util.WriteRetryTestUtil;
4142
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
4243
import com.google.protobuf.DescriptorProtos.DescriptorProto;
4344
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteQuotaRetryTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.cloud.bigquery.TableInfo;
2727
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
2828
import com.google.cloud.bigquery.storage.v1.WriteStream;
29+
import com.google.cloud.bigquery.storage.v1.it.util.WriteRetryTestUtil;
2930
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
3031
import com.google.protobuf.Descriptors.DescriptorValidationException;
3132
import java.io.IOException;

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/BigQueryResource.java renamed to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/BigQueryResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.google.cloud.bigquery.storage.v1.it;
17+
package com.google.cloud.bigquery.storage.v1.it.util;
1818

1919
/** Test helper class to generate BigQuery resource paths. */
2020
public class BigQueryResource {
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigquery.storage.v1.it.util;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.fail;
21+
22+
import com.google.api.core.ApiFutureCallback;
23+
import com.google.api.gax.rpc.ServerStream;
24+
import com.google.auth.oauth2.ServiceAccountCredentials;
25+
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
26+
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
27+
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
28+
import com.google.cloud.bigquery.storage.v1.DataFormat;
29+
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
30+
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
31+
import com.google.cloud.bigquery.storage.v1.ReadSession;
32+
import com.google.common.base.Preconditions;
33+
import com.google.protobuf.util.Timestamps;
34+
import java.io.ByteArrayInputStream;
35+
import java.io.IOException;
36+
import java.io.InputStream;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import org.apache.avro.Schema;
40+
import org.apache.avro.generic.GenericData;
41+
import org.apache.avro.generic.GenericRecordBuilder;
42+
43+
public class Helper {
44+
45+
public static final Long[] INPUT_TIMESTAMPS_MICROS =
46+
new Long[] {
47+
1735734896123456L, // 2025-01-01T12:34:56.123456Z
48+
1580646896123456L, // 2020-02-02T12:34:56.123456Z
49+
636467696123456L, // 1990-03-03T12:34:56.123456Z
50+
165846896123456L // 1975-04-04T12:34:56.123456Z
51+
};
52+
53+
public static final Long[] EXPECTED_TIMESTAMPS_MICROS =
54+
new Long[] {
55+
1735734896123456L, // 2025-01-01T12:34:56.123456Z
56+
1580646896123456L, // 2020-02-02T12:34:56.123456Z
57+
636467696123456L, // 1990-03-03T12:34:56.123456Z
58+
165846896123456L // 1975-04-04T12:34:56.123456Z
59+
};
60+
61+
public static ServiceAccountCredentials loadCredentials(String credentialFile) {
62+
try (InputStream keyStream = new ByteArrayInputStream(credentialFile.getBytes())) {
63+
return ServiceAccountCredentials.fromStream(keyStream);
64+
} catch (IOException e) {
65+
fail("Couldn't create fake JSON credentials.");
66+
}
67+
return null;
68+
}
69+
70+
public static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
71+
private final Object lock = new Object();
72+
private int batchCount = 0;
73+
74+
public void onSuccess(AppendRowsResponse response) {
75+
synchronized (lock) {
76+
if (response.hasError()) {
77+
System.out.format("Error: %s\n", response.getError());
78+
} else {
79+
++batchCount;
80+
System.out.format("Wrote batch %d\n", batchCount);
81+
}
82+
}
83+
}
84+
85+
public void onFailure(Throwable throwable) {
86+
System.out.format("Error: %s\n", throwable.toString());
87+
}
88+
}
89+
90+
/**
91+
* Reads all the rows from the specified table.
92+
*
93+
* <p>For every row, the consumer is called for processing.
94+
*
95+
* @param table
96+
* @param snapshotInMillis Optional. If specified, all rows up to timestamp will be returned.
97+
* @param filter Optional. If specified, it will be used to restrict returned data.
98+
* @param consumer that receives all Avro rows.
99+
* @throws IOException
100+
*/
101+
public static void processRowsAtSnapshot(
102+
BigQueryReadClient client,
103+
String parentProjectId,
104+
String table,
105+
Long snapshotInMillis,
106+
String filter,
107+
SimpleRowReaderAvro.AvroRowConsumer consumer)
108+
throws IOException {
109+
Preconditions.checkNotNull(table);
110+
Preconditions.checkNotNull(consumer);
111+
112+
CreateReadSessionRequest.Builder createSessionRequestBuilder =
113+
CreateReadSessionRequest.newBuilder()
114+
.setParent(parentProjectId)
115+
.setMaxStreamCount(1)
116+
.setReadSession(
117+
ReadSession.newBuilder().setTable(table).setDataFormat(DataFormat.AVRO).build());
118+
119+
if (snapshotInMillis != null) {
120+
createSessionRequestBuilder
121+
.getReadSessionBuilder()
122+
.setTableModifiers(
123+
ReadSession.TableModifiers.newBuilder()
124+
.setSnapshotTime(Timestamps.fromMillis(snapshotInMillis))
125+
.build());
126+
}
127+
128+
if (filter != null && !filter.isEmpty()) {
129+
createSessionRequestBuilder
130+
.getReadSessionBuilder()
131+
.setReadOptions(
132+
ReadSession.TableReadOptions.newBuilder().setRowRestriction(filter).build());
133+
}
134+
135+
ReadSession session = client.createReadSession(createSessionRequestBuilder.build());
136+
assertEquals(
137+
String.format(
138+
"Did not receive expected number of streams for table '%s' CreateReadSession"
139+
+ " response:%n%s",
140+
table, session.toString()),
141+
1,
142+
session.getStreamsCount());
143+
144+
ReadRowsRequest readRowsRequest =
145+
ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build();
146+
147+
SimpleRowReaderAvro reader =
148+
new SimpleRowReaderAvro(new Schema.Parser().parse(session.getAvroSchema().getSchema()));
149+
150+
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
151+
for (ReadRowsResponse response : stream) {
152+
reader.processRows(response.getAvroRows(), consumer);
153+
}
154+
}
155+
156+
/**
157+
* Reads all the rows from the specified table and returns a list as generic Avro records.
158+
*
159+
* @param table
160+
* @param filter Optional. If specified, it will be used to restrict returned data.
161+
* @return
162+
*/
163+
public static List<GenericData.Record> readAllRows(
164+
BigQueryReadClient client, String parentProjectId, String table, String filter)
165+
throws IOException {
166+
final List<GenericData.Record> rows = new ArrayList<>();
167+
processRowsAtSnapshot(
168+
client,
169+
parentProjectId,
170+
/* table= */ table,
171+
/* snapshotInMillis= */ null,
172+
/* filter= */ filter,
173+
(SimpleRowReaderAvro.AvroRowConsumer)
174+
record -> {
175+
// clone the record since that reference will be reused by the reader.
176+
rows.add(new GenericRecordBuilder(record).build());
177+
});
178+
return rows;
179+
}
180+
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java renamed to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderArrow.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.google.cloud.bigquery.storage.v1.it;
17+
package com.google.cloud.bigquery.storage.v1.it.util;
1818

1919
import static com.google.common.truth.Truth.assertThat;
2020

@@ -23,7 +23,6 @@
2323
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
2424
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
2525
import com.google.common.base.Preconditions;
26-
import com.google.common.collect.ImmutableMap;
2726
import java.io.IOException;
2827
import java.time.LocalDateTime;
2928
import java.util.ArrayList;
@@ -50,17 +49,34 @@ public interface ArrowBatchConsumer {
5049
void accept(VectorSchemaRoot root);
5150
}
5251

52+
public static class ArrowTimestampBatchConsumer implements ArrowBatchConsumer {
53+
private final List<Long> expectedTimestampValues;
54+
55+
public ArrowTimestampBatchConsumer(List<Long> expectedTimestampValues) {
56+
this.expectedTimestampValues = expectedTimestampValues;
57+
}
58+
59+
@Override
60+
public void accept(VectorSchemaRoot root) {
61+
FieldVector timestampFieldVector = root.getVector("timestamp");
62+
int count = timestampFieldVector.getValueCount();
63+
for (int i = 0; i < count; i++) {
64+
long value = (Long) timestampFieldVector.getObject(i);
65+
assertThat(value).isEqualTo(expectedTimestampValues.get(i));
66+
}
67+
}
68+
}
69+
5370
/** ArrowRangeBatchConsumer accepts batch Arrow data and validate the range values. */
5471
public static class ArrowRangeBatchConsumer implements ArrowBatchConsumer {
55-
56-
private final ImmutableMap<String, Range> expectedRangeDateValues;
57-
private final ImmutableMap<String, Range> expectedRangeDatetimeValues;
58-
private final ImmutableMap<String, Range> expectedRangeTimestampValues;
72+
private final Map<String, Range> expectedRangeDateValues;
73+
private final Map<String, Range> expectedRangeDatetimeValues;
74+
private final Map<String, Range> expectedRangeTimestampValues;
5975

6076
public ArrowRangeBatchConsumer(
61-
ImmutableMap<String, Range> expectedRangeDateValues,
62-
ImmutableMap<String, Range> expectedRangeDatetimeValues,
63-
ImmutableMap<String, Range> expectedRangeTimestampValues) {
77+
Map<String, Range> expectedRangeDateValues,
78+
Map<String, Range> expectedRangeDatetimeValues,
79+
Map<String, Range> expectedRangeTimestampValues) {
6480
this.expectedRangeDateValues = expectedRangeDateValues;
6581
this.expectedRangeDatetimeValues = expectedRangeDatetimeValues;
6682
this.expectedRangeTimestampValues = expectedRangeTimestampValues;

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderAvro.java renamed to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/util/SimpleRowReaderAvro.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.google.cloud.bigquery.storage.v1.it;
17+
package com.google.cloud.bigquery.storage.v1.it.util;
1818

1919
import com.google.cloud.bigquery.storage.v1.AvroRows;
2020
import com.google.common.base.Preconditions;

0 commit comments

Comments
 (0)