Skip to content

Commit 332736e

Browse files
authored
docs: Add samples for using timestamps with BQStorage Read and Write API (#3167)
* chore: Add samples showcases reading a timestamp value from public dataset * docs: Add samples for using timestamps with BQStorage Read and Write API * chore: Add missing header * chore: Remove fmt plugin in samples * chore: Fix samples lint issues * chore: Add ITs for the added samples * chore: Fix commented out nested samples * chore: Address code comments * chore: Address code comments * chore: Use long for expectedRowCount * chore: Fix checkstyle issue
1 parent 4f35f1a commit 332736e

13 files changed

+1343
-27
lines changed

samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,11 @@ public static void exportToOpenTelemetry(String projectId, String datasetName, S
105105

106106
// Final cleanup for the stream during worker teardown.
107107
writer.cleanup();
108-
verifyExpectedRowCount(parentTable, 12);
108+
verifyExpectedRowCount(parentTable, 12L);
109109
System.out.println("Appended records successfully.");
110110
}
111111

112-
private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
112+
private static void verifyExpectedRowCount(TableName parentTable, long expectedRowCount)
113113
throws InterruptedException {
114114
String queryRowCount =
115115
"SELECT COUNT(*) FROM `"
@@ -122,8 +122,8 @@ private static void verifyExpectedRowCount(TableName parentTable, int expectedRo
122122
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build();
123123
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
124124
TableResult results = bigquery.query(queryConfig);
125-
int countRowsActual =
126-
Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());
125+
long countRowsActual =
126+
Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue());
127127
if (countRowsActual != expectedRowCount) {
128128
throw new RuntimeException(
129129
"Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual);
@@ -175,7 +175,7 @@ private JsonStreamWriter createStreamWriter(String tableName)
175175
// For more information about JsonStreamWriter, see:
176176
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
177177
return JsonStreamWriter.newBuilder(tableName, client)
178-
.setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
178+
.setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10)))
179179
.setChannelProvider(
180180
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
181181
.setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
 * Copyright 2026 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.bigquerystorage;

// [START bigquerystorage_read_timestamp_arrow]
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

/**
 * Depending on the JDK version, you may need to include this into your VM options: {@code
 * --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED}
 *
 * <p>See the <a
 * href="https://arrow.apache.org/docs/java/install.html#java-compatibility">documentation</a> for
 * more information.
 */
public class ReadTimestampArrow {
  /*
   * SimpleRowReader handles deserialization of the Apache Arrow-encoded row batches transmitted
   * from the storage API by loading each batch into a reusable VectorSchemaRoot.
   */
  private static class SimpleRowReader implements AutoCloseable {

    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

    // The root and loader are reused across batches to avoid re-allocation and too much
    // garbage collection.
    private final VectorSchemaRoot root;
    private final VectorLoader loader;

    public SimpleRowReader(ArrowSchema arrowSchema) throws IOException {
      Schema schema =
          MessageSerializer.deserializeSchema(
              new ReadChannel(
                  new ByteArrayReadableSeekableByteChannel(
                      arrowSchema.getSerializedSchema().toByteArray())));
      Preconditions.checkNotNull(schema);
      List<FieldVector> vectors = new ArrayList<>();
      for (Field field : schema.getFields()) {
        vectors.add(field.createVector(allocator));
      }
      root = new VectorSchemaRoot(vectors);
      loader = new VectorLoader(root);
    }

    /**
     * Sample method for processing Arrow data which only validates decoding.
     *
     * @param batch object returned from the ReadRowsResponse.
     */
    public void processRows(ArrowRecordBatch batch) throws IOException {
      org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
          MessageSerializer.deserializeRecordBatch(
              new ReadChannel(
                  new ByteArrayReadableSeekableByteChannel(
                      batch.getSerializedRecordBatch().toByteArray())),
              allocator);

      loader.load(deserializedBatch);
      // Release buffers from batch (they are still held in the vectors in root).
      deserializedBatch.close();
      System.out.println(root.contentToTSVString());
      // Release buffers from vectors in root.
      root.clear();
    }

    @Override
    public void close() {
      root.close();
      allocator.close();
    }
  }

  public static void main(String... args) throws Exception {
    // Sets your Google Cloud Platform project ID.
    String projectId = args[0];
    Long snapshotMillis = null;
    if (args.length > 1) {
      snapshotMillis = Long.parseLong(args[1]);
    }

    try (BigQueryReadClient client = BigQueryReadClient.create()) {
      String parent = String.format("projects/%s", projectId);

      // This example uses citibike data from the public datasets.
      String srcTable =
          String.format(
              "projects/%s/datasets/%s/tables/%s",
              "bigquery-public-data", "new_york_citibike", "citibike_stations");

      // We specify the columns to be projected by adding them to the selected fields.
      ReadSession.TableReadOptions options =
          ReadSession.TableReadOptions.newBuilder().addSelectedFields("last_reported").build();

      // Start specifying the read session we want created.
      ReadSession.Builder sessionBuilder =
          ReadSession.newBuilder()
              .setTable(srcTable)
              // This API can also deliver data serialized in Apache Avro format.
              // This example leverages Apache Arrow.
              .setDataFormat(DataFormat.ARROW)
              .setReadOptions(options);

      // Optionally specify the snapshot time. When unspecified, snapshot time is "now".
      if (snapshotMillis != null) {
        Timestamp t =
            Timestamp.newBuilder()
                .setSeconds(snapshotMillis / 1000)
                .setNanos((int) ((snapshotMillis % 1000) * 1000000))
                .build();
        TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
        sessionBuilder.setTableModifiers(modifiers);
      }

      // Begin building the session creation request.
      CreateReadSessionRequest.Builder builder =
          CreateReadSessionRequest.newBuilder()
              .setParent(parent)
              .setReadSession(sessionBuilder)
              .setMaxStreamCount(1);

      ReadSession session = client.createReadSession(builder.build());
      // Setup a simple reader and start a read session.
      try (ReadTimestampArrow.SimpleRowReader reader =
          new ReadTimestampArrow.SimpleRowReader(session.getArrowSchema())) {

        // Assert that there are streams available in the session. An empty table may not have
        // data available. If no sessions are available for an anonymous (cached) table, consider
        // writing results of a query to a named table rather than consuming cached results
        // directly.
        Preconditions.checkState(session.getStreamsCount() > 0);

        // Use the first stream to perform reading.
        String streamName = session.getStreams(0).getName();

        ReadRowsRequest readRowsRequest =
            ReadRowsRequest.newBuilder().setReadStream(streamName).build();

        // Process each block of rows as they arrive and decode using our simple row reader.
        ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
        for (ReadRowsResponse response : stream) {
          Preconditions.checkState(response.hasArrowRecordBatch());
          reader.processRows(response.getArrowRecordBatch());
        }
      }
    }
  }
}
// [END bigquerystorage_read_timestamp_arrow]
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
 * Copyright 2026 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.bigquerystorage;

// [START bigquerystorage_read_timestamp_avro]
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.AvroRows;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

public class ReadTimestampAvro {
  /*
   * SimpleRowReader handles deserialization of the Avro-encoded row blocks transmitted
   * from the storage API using a generic datum decoder.
   */
  private static class SimpleRowReader {

    private final DatumReader<GenericRecord> datumReader;

    // Decoder object will be reused to avoid re-allocation and too much garbage collection.
    private BinaryDecoder decoder = null;

    // GenericRecord object will be reused.
    private GenericRecord row = null;

    public SimpleRowReader(Schema schema) {
      Preconditions.checkNotNull(schema);
      datumReader = new GenericDatumReader<>(schema);
    }

    /**
     * Sample method for processing AVRO rows which only validates decoding.
     *
     * @param avroRows object returned from the ReadRowsResponse.
     */
    public void processRows(AvroRows avroRows) throws IOException {
      decoder =
          DecoderFactory.get()
              .binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), decoder);

      while (!decoder.isEnd()) {
        // Reusing object row
        row = datumReader.read(row, decoder);
        System.out.println(row.toString());
      }
    }
  }

  public static void main(String... args) throws Exception {
    // Sets your Google Cloud Platform project ID.
    String projectId = args[0];
    Long snapshotMillis = null;
    if (args.length > 1) {
      snapshotMillis = Long.parseLong(args[1]);
    }

    try (BigQueryReadClient client = BigQueryReadClient.create()) {
      String parent = String.format("projects/%s", projectId);

      // This example uses citibike data from the public datasets.
      String srcTable =
          String.format(
              "projects/%s/datasets/%s/tables/%s",
              "bigquery-public-data", "new_york_citibike", "citibike_stations");

      // We specify the columns to be projected by adding them to the selected fields.
      ReadSession.TableReadOptions options =
          ReadSession.TableReadOptions.newBuilder().addSelectedFields("last_reported").build();

      // Start specifying the read session we want created.
      ReadSession.Builder sessionBuilder =
          ReadSession.newBuilder()
              .setTable(srcTable)
              // This API can also deliver data serialized in Apache Arrow format.
              // This example leverages Apache Avro.
              .setDataFormat(DataFormat.AVRO)
              .setReadOptions(options);

      // Optionally specify the snapshot time. When unspecified, snapshot time is "now".
      if (snapshotMillis != null) {
        Timestamp t =
            Timestamp.newBuilder()
                .setSeconds(snapshotMillis / 1000)
                .setNanos((int) ((snapshotMillis % 1000) * 1000000))
                .build();
        TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
        sessionBuilder.setTableModifiers(modifiers);
      }

      // Begin building the session creation request.
      CreateReadSessionRequest.Builder builder =
          CreateReadSessionRequest.newBuilder()
              .setParent(parent)
              .setReadSession(sessionBuilder)
              .setMaxStreamCount(1);

      // Request the session creation.
      ReadSession session = client.createReadSession(builder.build());

      SimpleRowReader reader =
          new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema()));

      // Assert that there are streams available in the session. An empty table may not have
      // data available. If no sessions are available for an anonymous (cached) table, consider
      // writing results of a query to a named table rather than consuming cached results directly.
      Preconditions.checkState(session.getStreamsCount() > 0);

      // Use the first stream to perform reading.
      String streamName = session.getStreams(0).getName();

      ReadRowsRequest readRowsRequest =
          ReadRowsRequest.newBuilder().setReadStream(streamName).build();

      // Process each block of rows as they arrive and decode using our simple row reader.
      ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
      for (ReadRowsResponse response : stream) {
        Preconditions.checkState(response.hasAvroRows());
        reader.processRows(response.getAvroRows());
      }
    }
  }
}
// [END bigquerystorage_read_timestamp_avro]

samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,4 @@ public static void runWriteNestedProto(String projectId, String datasetName, Str
7070
}
7171
}
7272
}
73-
// [END bigquerystorage_writenestedproto]
73+
// [END bigquerystorage_writenestedproto]

0 commit comments

Comments
 (0)