@@ -105,11 +105,11 @@ public static void exportToOpenTelemetry(String projectId, String datasetName, S

// Final cleanup for the stream during worker teardown.
writer.cleanup();
-verifyExpectedRowCount(parentTable, 12);
+verifyExpectedRowCount(parentTable, 12L);
System.out.println("Appended records successfully.");
}

-private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
+private static void verifyExpectedRowCount(TableName parentTable, long expectedRowCount)
throws InterruptedException {
String queryRowCount =
"SELECT COUNT(*) FROM `"
@@ -122,8 +122,8 @@ private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build();
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableResult results = bigquery.query(queryConfig);
-int countRowsActual =
-    Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());
+long countRowsActual =
+    Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue());
if (countRowsActual != expectedRowCount) {
throw new RuntimeException(
"Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual);
@@ -175,7 +175,7 @@ private JsonStreamWriter createStreamWriter(String tableName)
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
return JsonStreamWriter.newBuilder(tableName, client)
-.setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
+.setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10)))
.setChannelProvider(
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
.setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
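For reference, a minimal sketch of how a writer built by createStreamWriter might be used to append one batch of rows; the method name, column, and values are illustrative, not part of this change:

// Sketch only. Assumes org.json.JSONArray/JSONObject and the BigQuery Storage
// v1 types (JsonStreamWriter, AppendRowsResponse) are imported elsewhere.
private static void appendOneBatch(JsonStreamWriter writer) throws Exception {
  JSONArray rows = new JSONArray();
  rows.put(new JSONObject().put("test_string", "hello")); // hypothetical column
  // append() is asynchronous; get() blocks until the server acknowledges the rows.
  AppendRowsResponse response = writer.append(rows).get();
  System.out.println("Append committed at offset "
      + response.getAppendResult().getOffset().getValue());
}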
@@ -0,0 +1,185 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.bigquerystorage;

// [START bigquerystorage_read_timestamp_arrow]
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

/**
* Depending on the JDK version, you may need to include this into your VM options: {@code
* --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED}
*
* <p>See the <a
* href="https://arrow.apache.org/docs/java/install.html#java-compatibility">documentation</a> for
* more information.
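*
* <p>For example, an invocation with that flag might look like this (the jar path is
* illustrative, not part of the sample):
*
* <pre>
* java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED \
*     -cp target/samples.jar com.example.bigquerystorage.ReadTimestampArrow MY_PROJECT_ID
* </pre>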
*/
public class ReadTimestampArrow {
/*
* SimpleRowReader handles deserialization of the Apache Arrow-encoded row batches
* transmitted from the storage API into reusable Arrow vectors.
*/
private static class SimpleRowReader implements AutoCloseable {

BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

// The VectorSchemaRoot and VectorLoader are reused across batches to avoid
// re-allocation and excessive garbage collection.
private final VectorSchemaRoot root;
private final VectorLoader loader;

public SimpleRowReader(ArrowSchema arrowSchema) throws IOException {
Schema schema =
MessageSerializer.deserializeSchema(
new ReadChannel(
new ByteArrayReadableSeekableByteChannel(
arrowSchema.getSerializedSchema().toByteArray())));
Preconditions.checkNotNull(schema);
List<FieldVector> vectors = new ArrayList<>();
for (Field field : schema.getFields()) {
vectors.add(field.createVector(allocator));
}
root = new VectorSchemaRoot(vectors);
loader = new VectorLoader(root);
}

/**
* Sample method for processing Arrow data which only validates decoding.
*
* @param batch object returned from the ReadRowsResponse.
*/
public void processRows(ArrowRecordBatch batch) throws IOException {
org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
MessageSerializer.deserializeRecordBatch(
new ReadChannel(
new ByteArrayReadableSeekableByteChannel(
batch.getSerializedRecordBatch().toByteArray())),
allocator);

loader.load(deserializedBatch);
// Release buffers from batch (they are still held in the vectors in root).
deserializedBatch.close();
System.out.println(root.contentToTSVString());
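// Values can also be read from the vectors directly instead of dumping TSV.
// For a BigQuery TIMESTAMP column the Read API serves microsecond-precision
// Arrow timestamps; the concrete vector class below is an assumption:
//   TimeStampMicroTZVector ts = (TimeStampMicroTZVector) root.getVector("last_reported");
//   long epochMicros = ts.get(0); // first row of this batch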
// Release buffers from vectors in root.
root.clear();
}

@Override
public void close() {
root.close();
allocator.close();
}
}

public static void main(String... args) throws Exception {
// Sets your Google Cloud Platform project ID.
String projectId = args[0];
Long snapshotMillis = null;
if (args.length > 1) {
snapshotMillis = Long.parseLong(args[1]);
}

try (BigQueryReadClient client = BigQueryReadClient.create()) {
String parent = String.format("projects/%s", projectId);

// This example uses citibike data from the public datasets.
String srcTable =
String.format(
"projects/%s/datasets/%s/tables/%s",
"bigquery-public-data", "new_york_citibike", "citibike_stations");

// We specify the columns to be projected by adding them to the selected fields.
ReadSession.TableReadOptions options =
ReadSession.TableReadOptions.newBuilder().addSelectedFields("last_reported").build();

// Start specifying the read session we want created.
ReadSession.Builder sessionBuilder =
ReadSession.newBuilder()
.setTable(srcTable)
// This API can also deliver data serialized in Apache Avro format.
// This example leverages Apache Arrow.
.setDataFormat(DataFormat.ARROW)
.setReadOptions(options);

// Optionally specify the snapshot time. When unspecified, snapshot time is "now".
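// For example, snapshotMillis = 1_700_000_000_123L yields
// setSeconds(1_700_000_000) and setNanos(123_000_000).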
if (snapshotMillis != null) {
Timestamp t =
Timestamp.newBuilder()
.setSeconds(snapshotMillis / 1000)
.setNanos((int) ((snapshotMillis % 1000) * 1000000))
.build();
TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
sessionBuilder.setTableModifiers(modifiers);
}

// Begin building the session creation request.
CreateReadSessionRequest.Builder builder =
CreateReadSessionRequest.newBuilder()
.setParent(parent)
.setReadSession(sessionBuilder)
.setMaxStreamCount(1);

ReadSession session = client.createReadSession(builder.build());
// Set up a simple reader for the session's Arrow schema.
try (ReadTimestampArrow.SimpleRowReader reader =
new ReadTimestampArrow.SimpleRowReader(session.getArrowSchema())) {

// Assert that there are streams available in the session. An empty table may not have
// data available. If no sessions are available for an anonymous (cached) table, consider
// writing results of a query to a named table rather than consuming cached results
// directly.
Preconditions.checkState(session.getStreamsCount() > 0);

// Use the first stream to perform reading.
String streamName = session.getStreams(0).getName();

ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadStream(streamName).build();

// Process each block of rows as they arrive and decode using our simple row reader.
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
Preconditions.checkState(response.hasArrowRecordBatch());
reader.processRows(response.getArrowRecordBatch());
}
}
}
}
}
// [END bigquerystorage_read_timestamp_arrow]
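Either read sample expects a project ID as the first argument and, optionally, a snapshot time in epoch milliseconds as the second. One way to run it with Maven (assuming an exec-maven-plugin setup, which this PR does not show):

mvn exec:java -Dexec.mainClass=com.example.bigquerystorage.ReadTimestampArrow \
    -Dexec.args="MY_PROJECT_ID 1700000000000"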
@@ -0,0 +1,151 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.bigquerystorage;

// [START bigquerystorage_read_timestamp_avro]
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.AvroRows;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

public class ReadTimestampAvro {
/*
* SimpleRowReader handles deserialization of the Avro-encoded row blocks transmitted
* from the storage API using a generic datum decoder.
*/
private static class SimpleRowReader {

private final DatumReader<GenericRecord> datumReader;

// Decoder object will be reused to avoid re-allocation and too much garbage collection.
private BinaryDecoder decoder = null;

// GenericRecord object will be reused.
private GenericRecord row = null;

public SimpleRowReader(Schema schema) {
Preconditions.checkNotNull(schema);
datumReader = new GenericDatumReader<>(schema);
}

/**
* Sample method for processing Avro rows which only validates decoding.
*
* @param avroRows object returned from the ReadRowsResponse.
*/
public void processRows(AvroRows avroRows) throws IOException {
decoder =
DecoderFactory.get()
.binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), decoder);

while (!decoder.isEnd()) {
// Reusing object row
row = datumReader.read(row, decoder);
System.out.println(row.toString());
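// Individual fields can also be read from the reused record; "last_reported"
// is the only column this sample selects:
//   Object lastReported = row.get("last_reported");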
}
}
}

public static void main(String... args) throws Exception {
// Sets your Google Cloud Platform project ID.
String projectId = args[0];
Long snapshotMillis = null;
if (args.length > 1) {
snapshotMillis = Long.parseLong(args[1]);
}

try (BigQueryReadClient client = BigQueryReadClient.create()) {
String parent = String.format("projects/%s", projectId);

// This example uses citibike data from the public datasets.
String srcTable =
String.format(
"projects/%s/datasets/%s/tables/%s",
"bigquery-public-data", "new_york_citibike", "citibike_stations");

// We specify the columns to be projected by adding them to the selected fields.
ReadSession.TableReadOptions options =
ReadSession.TableReadOptions.newBuilder().addSelectedFields("last_reported").build();

// Start specifying the read session we want created.
ReadSession.Builder sessionBuilder =
ReadSession.newBuilder()
.setTable(srcTable)
// This API can also deliver data serialized in Apache Arrow format.
// This example leverages Apache Avro.
.setDataFormat(DataFormat.AVRO)
.setReadOptions(options);

// Optionally specify the snapshot time. When unspecified, snapshot time is "now".
if (snapshotMillis != null) {
Timestamp t =
Timestamp.newBuilder()
.setSeconds(snapshotMillis / 1000)
.setNanos((int) ((snapshotMillis % 1000) * 1000000))
.build();
TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
sessionBuilder.setTableModifiers(modifiers);
}

// Begin building the session creation request.
CreateReadSessionRequest.Builder builder =
CreateReadSessionRequest.newBuilder()
.setParent(parent)
.setReadSession(sessionBuilder)
.setMaxStreamCount(1);
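// A single stream keeps the sample simple; readers that want parallelism
// can request more streams and consume them concurrently.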

// Request the session creation.
ReadSession session = client.createReadSession(builder.build());

SimpleRowReader reader =
new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema()));

// Assert that there are streams available in the session. An empty table may not have
// data available. If no sessions are available for an anonymous (cached) table, consider
// writing results of a query to a named table rather than consuming cached results directly.
Preconditions.checkState(session.getStreamsCount() > 0);

// Use the first stream to perform reading.
String streamName = session.getStreams(0).getName();

ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadStream(streamName).build();

// Process each block of rows as they arrive and decode using our simple row reader.
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
Preconditions.checkState(response.hasAvroRows());
reader.processRows(response.getAvroRows());
}
}
}
}
// [END bigquerystorage_read_timestamp_avro]
@@ -70,4 +70,4 @@ public static void runWriteNestedProto(String projectId, String datasetName, Str
}
}
}
-// [END bigquerystorage_writenestedproto]
\ No newline at end of file
+// [END bigquerystorage_writenestedproto]