diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java index 08604d4d9d..ea06c76e17 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ExportOpenTelemetry.java @@ -105,11 +105,11 @@ public static void exportToOpenTelemetry(String projectId, String datasetName, S // Final cleanup for the stream during worker teardown. writer.cleanup(); - verifyExpectedRowCount(parentTable, 12); + verifyExpectedRowCount(parentTable, 12L); System.out.println("Appended records successfully."); } - private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount) + private static void verifyExpectedRowCount(TableName parentTable, long expectedRowCount) throws InterruptedException { String queryRowCount = "SELECT COUNT(*) FROM `" @@ -122,8 +122,8 @@ private static void verifyExpectedRowCount(TableName parentTable, int expectedRo QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); TableResult results = bigquery.query(queryConfig); - int countRowsActual = - Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue()); + long countRowsActual = + Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue()); if (countRowsActual != expectedRowCount) { throw new RuntimeException( "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); @@ -175,7 +175,7 @@ private JsonStreamWriter createStreamWriter(String tableName) // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html return JsonStreamWriter.newBuilder(tableName, client) - .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10))) .setChannelProvider( BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java new file mode 100644 index 0000000000..829bbb31e9 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampArrow.java @@ -0,0 +1,185 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.bigquerystorage; + +// [START bigquerystorage_read_timestamp_arrow] +import com.google.api.gax.rpc.ServerStream; +import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch; +import com.google.cloud.bigquery.storage.v1.ArrowSchema; +import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.DataFormat; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers; +import com.google.common.base.Preconditions; +import com.google.protobuf.Timestamp; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ReadChannel; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; + +/** + * Depending on the JDK version, you may need to include this into your VM options: {@code + * --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED} + * + *

See the documentation for + * more information. + */ +public class ReadTimestampArrow { + /* + * SimpleRowReader handles deserialization of the Apache Arrow-encoded row batches transmitted + * from the storage API using a generic datum decoder. + */ + private static class SimpleRowReader implements AutoCloseable { + + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + + // Decoder object will be reused to avoid re-allocation and too much garbage collection. + private final VectorSchemaRoot root; + private final VectorLoader loader; + + public SimpleRowReader(ArrowSchema arrowSchema) throws IOException { + Schema schema = + MessageSerializer.deserializeSchema( + new ReadChannel( + new ByteArrayReadableSeekableByteChannel( + arrowSchema.getSerializedSchema().toByteArray()))); + Preconditions.checkNotNull(schema); + List vectors = new ArrayList<>(); + for (Field field : schema.getFields()) { + vectors.add(field.createVector(allocator)); + } + root = new VectorSchemaRoot(vectors); + loader = new VectorLoader(root); + } + + /** + * Sample method for processing Arrow data which only validates decoding. + * + * @param batch object returned from the ReadRowsResponse. + */ + public void processRows(ArrowRecordBatch batch) throws IOException { + org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch = + MessageSerializer.deserializeRecordBatch( + new ReadChannel( + new ByteArrayReadableSeekableByteChannel( + batch.getSerializedRecordBatch().toByteArray())), + allocator); + + loader.load(deserializedBatch); + // Release buffers from batch (they are still held in the vectors in root). + deserializedBatch.close(); + System.out.println(root.contentToTSVString()); + // Release buffers from vectors in root. + root.clear(); + } + + @Override + public void close() { + root.close(); + allocator.close(); + } + } + + public static void main(String... args) throws Exception { + // Sets your Google Cloud Platform project ID. + String projectId = args[0]; + Long snapshotMillis = null; + if (args.length > 1) { + snapshotMillis = Long.parseLong(args[1]); + } + + try (BigQueryReadClient client = BigQueryReadClient.create()) { + String parent = String.format("projects/%s", projectId); + + // This example uses citibike data from the public datasets. + String srcTable = + String.format( + "projects/%s/datasets/%s/tables/%s", + "bigquery-public-data", "new_york_citibike", "citibike_stations"); + + // We specify the columns to be projected by adding them to the selected fields, + ReadSession.TableReadOptions options = + ReadSession.TableReadOptions.newBuilder().addSelectedFields("last_reported").build(); + + // Start specifying the read session we want created. + ReadSession.Builder sessionBuilder = + ReadSession.newBuilder() + .setTable(srcTable) + // This API can also deliver data serialized in Apache Avro format. + // This example leverages Apache Arrow. + .setDataFormat(DataFormat.ARROW) + .setReadOptions(options); + + // Optionally specify the snapshot time. When unspecified, snapshot time is "now". + if (snapshotMillis != null) { + Timestamp t = + Timestamp.newBuilder() + .setSeconds(snapshotMillis / 1000) + .setNanos((int) ((snapshotMillis % 1000) * 1000000)) + .build(); + TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build(); + sessionBuilder.setTableModifiers(modifiers); + } + + // Begin building the session creation request. 
+ CreateReadSessionRequest.Builder builder = + CreateReadSessionRequest.newBuilder() + .setParent(parent) + .setReadSession(sessionBuilder) + .setMaxStreamCount(1); + + ReadSession session = client.createReadSession(builder.build()); + // Setup a simple reader and start a read session. + try (ReadTimestampArrow.SimpleRowReader reader = + new ReadTimestampArrow.SimpleRowReader(session.getArrowSchema())) { + + // Assert that there are streams available in the session. An empty table may not have + // data available. If no sessions are available for an anonymous (cached) table, consider + // writing results of a query to a named table rather than consuming cached results + // directly. + Preconditions.checkState(session.getStreamsCount() > 0); + + // Use the first stream to perform reading. + String streamName = session.getStreams(0).getName(); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(streamName).build(); + + // Process each block of rows as they arrive and decode using our simple row reader. + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + Preconditions.checkState(response.hasArrowRecordBatch()); + reader.processRows(response.getArrowRecordBatch()); + } + } + } + } +} +// [END bigquerystorage_read_timestamp_arrow] diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java new file mode 100644 index 0000000000..6343c7739f --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ReadTimestampAvro.java @@ -0,0 +1,151 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +// [START bigquerystorage_read_timestamp_avro] +import com.google.api.gax.rpc.ServerStream; +import com.google.cloud.bigquery.storage.v1.AvroRows; +import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; +import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; +import com.google.cloud.bigquery.storage.v1.DataFormat; +import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; +import com.google.cloud.bigquery.storage.v1.ReadSession; +import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers; +import com.google.common.base.Preconditions; +import com.google.protobuf.Timestamp; +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; + +public class ReadTimestampAvro { + /* + * SimpleRowReader handles deserialization of the Avro-encoded row blocks transmitted + * from the storage API using a generic datum decoder. 
+ */ + private static class SimpleRowReader { + + private final DatumReader datumReader; + + // Decoder object will be reused to avoid re-allocation and too much garbage collection. + private BinaryDecoder decoder = null; + + // GenericRecord object will be reused. + private GenericRecord row = null; + + public SimpleRowReader(Schema schema) { + Preconditions.checkNotNull(schema); + datumReader = new GenericDatumReader<>(schema); + } + + /** + * Sample method for processing AVRO rows which only validates decoding. + * + * @param avroRows object returned from the ReadRowsResponse. + */ + public void processRows(AvroRows avroRows) throws IOException { + decoder = + DecoderFactory.get() + .binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), decoder); + + while (!decoder.isEnd()) { + // Reusing object row + row = datumReader.read(row, decoder); + System.out.println(row.toString()); + } + } + } + + public static void main(String... args) throws Exception { + // Sets your Google Cloud Platform project ID. + String projectId = args[0]; + Long snapshotMillis = null; + if (args.length > 1) { + snapshotMillis = Long.parseLong(args[1]); + } + + try (BigQueryReadClient client = BigQueryReadClient.create()) { + String parent = String.format("projects/%s", projectId); + + // This example uses citibike data from the public datasets. + String srcTable = + String.format( + "projects/%s/datasets/%s/tables/%s", + "bigquery-public-data", "new_york_citibike", "citibike_stations"); + + // We specify the columns to be projected by adding them to the selected fields, + ReadSession.TableReadOptions options = + ReadSession.TableReadOptions.newBuilder().addSelectedFields("last_reported").build(); + + // Start specifying the read session we want created. + ReadSession.Builder sessionBuilder = + ReadSession.newBuilder() + .setTable(srcTable) + // This API can also deliver data serialized in Apache Avro format. + // This example leverages Apache Avro. + .setDataFormat(DataFormat.AVRO) + .setReadOptions(options); + + // Optionally specify the snapshot time. When unspecified, snapshot time is "now". + if (snapshotMillis != null) { + Timestamp t = + Timestamp.newBuilder() + .setSeconds(snapshotMillis / 1000) + .setNanos((int) ((snapshotMillis % 1000) * 1000000)) + .build(); + TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build(); + sessionBuilder.setTableModifiers(modifiers); + } + + // Begin building the session creation request. + CreateReadSessionRequest.Builder builder = + CreateReadSessionRequest.newBuilder() + .setParent(parent) + .setReadSession(sessionBuilder) + .setMaxStreamCount(1); + + // Request the session creation. + ReadSession session = client.createReadSession(builder.build()); + + SimpleRowReader reader = + new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema())); + + // Assert that there are streams available in the session. An empty table may not have + // data available. If no sessions are available for an anonymous (cached) table, consider + // writing results of a query to a named table rather than consuming cached results directly. + Preconditions.checkState(session.getStreamsCount() > 0); + + // Use the first stream to perform reading. + String streamName = session.getStreams(0).getName(); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(streamName).build(); + + // Process each block of rows as they arrive and decode using our simple row reader. 
+      ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
+      for (ReadRowsResponse response : stream) {
+        Preconditions.checkState(response.hasAvroRows());
+        reader.processRows(response.getAvroRows());
+      }
+    }
+  }
+}
+// [END bigquerystorage_read_timestamp_avro]
diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java
index 92ab52724c..55dc96a8b6 100644
--- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteNestedProto.java
@@ -70,4 +70,4 @@ public static void runWriteNestedProto(String projectId, String datasetName, Str
       }
     }
   }
-// [END bigquerystorage_writenestedproto]
+// [END bigquerystorage_writenestedproto]
\ No newline at end of file
diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
index 483238a816..fed1493587 100644
--- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
@@ -106,11 +106,11 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
     // Final cleanup for the stream during worker teardown.
     writer.cleanup();
 
-    verifyExpectedRowCount(parentTable, 12);
+    verifyExpectedRowCount(parentTable, 12L);
     System.out.println("Appended records successfully.");
   }
 
-  private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
+  private static void verifyExpectedRowCount(TableName parentTable, long expectedRowCount)
       throws InterruptedException {
     String queryRowCount =
         "SELECT COUNT(*) FROM `"
@@ -123,8 +123,8 @@ private static void verifyExpectedRowCount(TableName parentTable, int expectedRo
     QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build();
     BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
     TableResult results = bigquery.query(queryConfig);
-    int countRowsActual =
-        Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue());
+    long countRowsActual =
+        Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue());
     if (countRowsActual != expectedRowCount) {
       throw new RuntimeException(
           "Unexpected row count. Expected: " + expectedRowCount + ". 
Actual: " + countRowsActual); @@ -176,7 +176,7 @@ private JsonStreamWriter createStreamWriter(String tableName) // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html return JsonStreamWriter.newBuilder(tableName, client) - .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10))) .setChannelProvider( BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java new file mode 100644 index 0000000000..9bcb32d764 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJson.java @@ -0,0 +1,312 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +// [START bigquerystorage_timestamp_jsonstreamwriter_default] +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableResult; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.DescriptorValidationException; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.concurrent.GuardedBy; +import org.json.JSONArray; +import org.json.JSONObject; +import org.threeten.bp.Duration; + +public class WriteToDefaultStreamTimestampJson { + + public static void runWriteToDefaultStream() + throws Descriptors.DescriptorValidationException, InterruptedException, IOException { + // TODO(developer): Replace these variables before running the sample. 
+ String projectId = "MY_PROJECT_ID"; + String datasetName = "MY_DATASET_NAME"; + String tableName = "MY_TABLE_NAME"; + writeToDefaultStream(projectId, datasetName, tableName); + } + + // Create a JSON object that is compatible with the table schema. + private static JSONObject buildRecord() { + JSONObject record = new JSONObject(); + record.put("timestampField", Instant.now().toString()); + return record; + } + + public static void writeToDefaultStream(String projectId, String datasetName, String tableName) + throws Descriptors.DescriptorValidationException, InterruptedException, IOException { + TableName parentTable = TableName.of(projectId, datasetName, tableName); + + DataWriter writer = new DataWriter(); + // One time initialization for the worker. + writer.initialize(parentTable); + + // Write two batches of fake data to the stream, each with 10 JSON records. Data may be + // batched up to the maximum request size: + // https://cloud.google.com/bigquery/quotas#write-api-limits + for (int i = 0; i < 2; i++) { + JSONArray jsonArr = new JSONArray(); + for (int j = 0; j < 10; j++) { + jsonArr.put(buildRecord()); + } + + writer.append(new AppendContext(jsonArr)); + } + + // Final cleanup for the stream during worker teardown. + writer.cleanup(); + verifyExpectedRowCount(parentTable, 20L); + System.out.println("Appended records successfully."); + } + + private static void verifyExpectedRowCount(TableName parentTable, long expectedRowCount) + throws InterruptedException { + String queryRowCount = + "SELECT COUNT(*) FROM `" + + parentTable.getProject() + + "." + + parentTable.getDataset() + + "." + + parentTable.getTable() + + "`"; + QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + TableResult results = bigquery.query(queryConfig); + long countRowsActual = + Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue()); + if (countRowsActual != expectedRowCount) { + throw new RuntimeException( + "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); + } + } + + private static class AppendContext { + JSONArray data; + + AppendContext(JSONArray data) { + this.data = data; + } + } + + private static class DataWriter { + + private static final int MAX_RECREATE_COUNT = 3; + + private BigQueryWriteClient client; + + // Track the number of in-flight requests to wait for all responses before shutting down. + private final Phaser inflightRequestCount = new Phaser(1); + private final Object lock = new Object(); + private JsonStreamWriter streamWriter; + + @GuardedBy("lock") + private RuntimeException error = null; + + private final AtomicInteger recreateCount = new AtomicInteger(0); + + private JsonStreamWriter createStreamWriter(String tableName) + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Configure in-stream automatic retry settings. + // Error codes that are immediately retried: + // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED + // Error codes that are retried with exponential backoff: + // * RESOURCE_EXHAUSTED + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + + // Use the JSON stream writer to send records in JSON format. Specify the table name to write + // to the default stream. 
+ // For more information about JsonStreamWriter, see: + // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html + return JsonStreamWriter.newBuilder(tableName, client) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10))) + .setChannelProvider( + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveWithoutCalls(true) + .build()) + .setEnableConnectionPool(true) + // This will allow connection pool to scale up better. + .setFlowControlSettings( + FlowControlSettings.newBuilder().setMaxOutstandingElementCount(100L).build()) + // If value is missing in json and there is a default value configured on bigquery + // column, apply the default value to the missing value field. + .setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) + .setRetrySettings(retrySettings) + .build(); + } + + public void initialize(TableName parentTable) + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + // Initialize client without settings, internally within stream writer a new client will be + // created with full settings. + client = BigQueryWriteClient.create(); + + streamWriter = createStreamWriter(parentTable.toString()); + } + + public void append(AppendContext appendContext) + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + synchronized (this.lock) { + if (!streamWriter.isUserClosed() + && streamWriter.isClosed() + && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) { + streamWriter = createStreamWriter(streamWriter.getStreamName()); + this.error = null; + } + // If earlier appends have failed, we need to reset before continuing. + if (this.error != null) { + throw this.error; + } + } + // Append asynchronously for increased throughput. + ApiFuture future = streamWriter.append(appendContext.data); + ApiFutures.addCallback( + future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor()); + + // Increase the count of in-flight requests. + inflightRequestCount.register(); + } + + public void cleanup() { + // Wait for all in-flight requests to complete. + inflightRequestCount.arriveAndAwaitAdvance(); + + client.close(); + // Close the connection to the server. + streamWriter.close(); + + // Verify that no error occurred in the stream. 
+ synchronized (this.lock) { + if (this.error != null) { + throw this.error; + } + } + } + + static class AppendCompleteCallback implements ApiFutureCallback { + + private final DataWriter parent; + private final AppendContext appendContext; + + public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) { + this.parent = parent; + this.appendContext = appendContext; + } + + public void onSuccess(AppendRowsResponse response) { + System.out.println("Append success"); + this.parent.recreateCount.set(0); + done(); + } + + public void onFailure(Throwable throwable) { + if (throwable instanceof Exceptions.AppendSerializationError) { + Exceptions.AppendSerializationError ase = (Exceptions.AppendSerializationError) throwable; + Map rowIndexToErrorMessage = ase.getRowIndexToErrorMessage(); + if (!rowIndexToErrorMessage.isEmpty()) { + // Omit the faulty rows + JSONArray dataNew = new JSONArray(); + for (int i = 0; i < appendContext.data.length(); i++) { + if (!rowIndexToErrorMessage.containsKey(i)) { + dataNew.put(appendContext.data.get(i)); + } else { + // process faulty rows by placing them on a dead-letter-queue, for instance + } + } + + // Retry the remaining valid rows, but using a separate thread to + // avoid potentially blocking while we are in a callback. + if (!dataNew.isEmpty()) { + try { + this.parent.append(new AppendContext(dataNew)); + } catch (DescriptorValidationException | IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + // Mark the existing attempt as done since we got a response for it + done(); + return; + } + } + + boolean resendRequest = false; + if (throwable instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { + resendRequest = true; + } else if (throwable instanceof Exceptions.StreamWriterClosedException) { + if (!parent.streamWriter.isUserClosed()) { + resendRequest = true; + } + } + if (resendRequest) { + // Retry this request. + try { + this.parent.append(new AppendContext(appendContext.data)); + } catch (Descriptors.DescriptorValidationException + | IOException + | InterruptedException e) { + throw new RuntimeException(e); + } + // Mark the existing attempt as done since we got a response for it + done(); + return; + } + + synchronized (this.parent.lock) { + if (this.parent.error == null) { + Exceptions.StorageException storageException = Exceptions.toStorageException(throwable); + this.parent.error = + (storageException != null) ? storageException : new RuntimeException(throwable); + } + } + done(); + } + + private void done() { + // Reduce the count of in-flight requests. + this.parent.inflightRequestCount.arriveAndDeregister(); + } + } + } +} +// [END bigquerystorage_timestamp_jsonstreamwriter_default] diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java new file mode 100644 index 0000000000..6797aea936 --- /dev/null +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrow.java @@ -0,0 +1,367 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +// [START bigquerystorage_timestamp_streamwriter_default_arrow] +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableResult; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.StreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; +import java.io.IOException; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.concurrent.GuardedBy; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.NoCompressionCodec; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.threeten.bp.Duration; + +/** + * This class demonstrates how to ingest data using Arrow format into BigQuery via the default + * stream. It initiates a DataWriter to establish a connection to BigQuery and reuses this + * connection to continuously ingest data. + * + *

+ * <p>Depending on the JDK version, you may need to include this in your VM options: {@code
+ * --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED}. See the Apache Arrow
+ * Java documentation for more information.
+ */
+public class WriteToDefaultStreamTimestampWithArrow {
+
+  public static final long NANOS = 1000000000L;
+
+  public static void main(String[] args) throws InterruptedException, IOException {
+    if (args.length < 3) {
+      System.out.println(
+          "Usage: WriteToDefaultStreamTimestampWithArrow <projectId> <datasetName> <tableName>");
+      return;
+    }
+    String projectId = args[0];
+    String datasetName = args[1];
+    String tableName = args[2];
+    // For this sample, the table schema should contain 1 field:
+    // ['timestampField': TIMESTAMP]
+    writeToDefaultStreamWithArrow(projectId, datasetName, tableName);
+  }
+
+  private static Schema createArrowSchema() {
+    List<Field> fields =
+        ImmutableList.of(
+            new Field(
+                "timestampField",
+                FieldType.nullable(new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC")),
+                null));
+    return new Schema(fields, null);
+  }
+
+  // Create an ArrowRecordBatch object that is compatible with the table schema.
+  private static ArrowRecordBatch buildRecordBatch(VectorSchemaRoot root, int rowCount) {
+    TimeStampNanoTZVector timestampField =
+        (TimeStampNanoTZVector) root.getVector("timestampField");
+    timestampField.allocateNew(rowCount);
+
+    Instant now = Instant.now();
+    for (int i = 0; i < rowCount; i++) {
+      timestampField.set(i, now.getEpochSecond() * NANOS + now.getNano());
+    }
+    root.setRowCount(rowCount);
+
+    CompressionCodec codec =
+        NoCompressionCodec.Factory.INSTANCE.createCodec(CompressionUtil.CodecType.NO_COMPRESSION);
+    VectorUnloader vectorUnloader =
+        new VectorUnloader(root, /* includeNullCount= */ true, codec, /* alignBuffers= */ true);
+    return vectorUnloader.getRecordBatch();
+  }
+
+  public static void writeToDefaultStreamWithArrow(
+      String projectId, String datasetName, String tableName)
+      throws InterruptedException, IOException {
+    TableName parentTable = TableName.of(projectId, datasetName, tableName);
+    Schema arrowSchema = createArrowSchema();
+    DataWriter writer = new DataWriter();
+    // One time initialization for the worker.
+    writer.initialize(parentTable, arrowSchema);
+    long initialRowCount = getRowCount(parentTable);
+    try (BufferAllocator allocator = new RootAllocator()) {
+      // A writer should be used to ingest as much data as possible before teardown.
+      // Append 100 batches.
+      for (int i = 0; i < 100; i++) {
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+          // Each batch has 10 rows.
+          ArrowRecordBatch batch = buildRecordBatch(root, 10);
+
+          // Asynchronous append.
+          writer.append(new ArrowData(arrowSchema, batch));
+        }
+      }
+    }
+    // Final cleanup for the stream during worker teardown.
+    // This blocks until responses for all append requests have been received.
+    writer.cleanup();
+
+    verifyExpectedRowCount(parentTable, initialRowCount + 1000);
+    System.out.println("Appended records successfully.");
+  }
+
+  private static long getRowCount(TableName parentTable) throws InterruptedException {
+    String queryRowCount =
+        "SELECT COUNT(*) FROM `"
+            + parentTable.getProject()
+            + "."
+            + parentTable.getDataset()
+            + "."
+ + parentTable.getTable() + + "`"; + QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); + BigQuery bigquery = + BigQueryOptions.newBuilder().setProjectId(parentTable.getProject()).build().getService(); + TableResult results = bigquery.query(queryConfig); + return Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue()); + } + + private static void verifyExpectedRowCount(TableName parentTable, long expectedRowCount) + throws InterruptedException { + String queryRowCount = + "SELECT COUNT(*) FROM `" + + parentTable.getProject() + + "." + + parentTable.getDataset() + + "." + + parentTable.getTable() + + "`"; + QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(queryRowCount).build(); + BigQuery bigquery = + BigQueryOptions.newBuilder().setProjectId(parentTable.getProject()).build().getService(); + TableResult results = bigquery.query(queryConfig); + int countRowsActual = + Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue()); + if (countRowsActual != expectedRowCount) { + throw new RuntimeException( + "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); + } + } + + private static class ArrowData { + Schema arrowSchema; + ArrowRecordBatch data; + + ArrowData(Schema arrowSchema, ArrowRecordBatch data) { + this.arrowSchema = arrowSchema; + this.data = data; + } + } + + private static class DataWriter { + + private static final int MAX_RECREATE_COUNT = 3; + + private BigQueryWriteClient client; + + // Track the number of in-flight requests to wait for all responses before shutting down. + private final Phaser inflightRequestCount = new Phaser(1); + private final Object lock = new Object(); + + private Schema arrowSchema; + private StreamWriter streamWriter; + + @GuardedBy("lock") + private RuntimeException error = null; + + private final AtomicInteger recreateCount = new AtomicInteger(0); + + private StreamWriter createStreamWriter(String streamName, Schema arrowSchema) + throws IOException { + // Configure in-stream automatic retry settings. + // Error codes that are immediately retried: + // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED + // Error codes that are retried with exponential backoff: + // * RESOURCE_EXHAUSTED + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + + // Use the Stream writer to send records in Arrow format. Specify the table name to write + // to the default stream. + // For more information about StreamWriter, see: + // https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.StreamWriter + return StreamWriter.newBuilder(streamName, client) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10))) + .setChannelProvider( + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .setKeepAliveWithoutCalls(true) + .build()) + .setEnableConnectionPool(true) + // If value is missing in ArrowRecordBatch and there is a default value configured on + // bigquery column, apply the default value to the missing value field. 
+ .setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) + .setMaxRetryDuration(java.time.Duration.ofSeconds(5)) + // Set the StreamWriter with Arrow Schema, this would only allow the StreamWriter to + // append data in Arrow format. + .setWriterSchema(arrowSchema) + .setRetrySettings(retrySettings) + .build(); + } + + public void initialize(TableName parentTable, Schema arrowSchema) throws IOException { + // Initialize client without settings, internally within stream writer a new client will be + // created with full settings. + client = BigQueryWriteClient.create(); + + streamWriter = createStreamWriter(parentTable.toString() + "/_default", arrowSchema); + } + + public void append(ArrowData arrowData) throws IOException { + synchronized (this.lock) { + if (!streamWriter.isUserClosed() + && streamWriter.isClosed() + && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) { + streamWriter = createStreamWriter(streamWriter.getStreamName(), arrowData.arrowSchema); + this.error = null; + } + // If earlier appends have failed, we need to reset before continuing. + if (this.error != null) { + throw this.error; + } + } + // Append asynchronously for increased throughput. + ApiFuture future = streamWriter.append(arrowData.data); + ApiFutures.addCallback( + future, new AppendCompleteCallback(this, arrowData), MoreExecutors.directExecutor()); + + // Increase the count of in-flight requests. + inflightRequestCount.register(); + } + + public void cleanup() { + // Wait for all in-flight requests to complete. + inflightRequestCount.arriveAndAwaitAdvance(); + + client.close(); + // Close the connection to the server. + streamWriter.close(); + + // Verify that no error occurred in the stream. + synchronized (this.lock) { + if (this.error != null) { + throw this.error; + } + } + } + + static class AppendCompleteCallback implements ApiFutureCallback { + + private final DataWriter parent; + private final ArrowData arrowData; + + public AppendCompleteCallback(DataWriter parent, ArrowData arrowData) { + this.parent = parent; + this.arrowData = arrowData; + } + + public void onSuccess(AppendRowsResponse response) { + System.out.println("Append success"); + this.parent.recreateCount.set(0); + done(); + } + + public void onFailure(Throwable throwable) { + System.out.println("Append failed: " + throwable.toString()); + if (throwable instanceof Exceptions.AppendSerializationError) { + Exceptions.AppendSerializationError ase = (Exceptions.AppendSerializationError) throwable; + Map rowIndexToErrorMessage = ase.getRowIndexToErrorMessage(); + if (!rowIndexToErrorMessage.isEmpty()) { + System.out.println("row level errors: " + rowIndexToErrorMessage); + // The append returned failure with indices for faulty rows. + // Fix the faulty rows or remove them from the appended data and retry the append. + done(); + return; + } + } + + boolean resendRequest = false; + if (throwable instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) { + resendRequest = true; + } else if (throwable instanceof Exceptions.StreamWriterClosedException) { + if (!parent.streamWriter.isUserClosed()) { + resendRequest = true; + } + } + if (resendRequest) { + // Retry this request. 
+ try { + this.parent.append(new ArrowData(arrowData.arrowSchema, arrowData.data)); + } catch (IOException e) { + throw new RuntimeException(e); + } + // Mark the existing attempt as done since we got a response for it + done(); + return; + } + + synchronized (this.parent.lock) { + if (this.parent.error == null) { + Exceptions.StorageException storageException = Exceptions.toStorageException(throwable); + this.parent.error = + (storageException != null) ? storageException : new RuntimeException(throwable); + } + } + done(); + } + + private void done() { + // Reduce the count of in-flight requests. + this.parent.inflightRequestCount.arriveAndDeregister(); + } + } + } +} +// [END bigquerystorage_timestamp_streamwriter_default_arrow] diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java index d0bc455a9a..5db06efb04 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStreamWithArrow.java @@ -129,17 +129,17 @@ public static void writeToDefaultStreamWithArrow( // One time initialization for the worker. writer.initialize(parentTable, arrowSchema); long initialRowCount = getRowCount(parentTable); - BufferAllocator allocator = new RootAllocator(); - - // A writer should be used to ingest as much data as possible before teardown. - // Append 100 batches. - for (int i = 0; i < 100; i++) { - try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { - // Each batch has 10 rows. - ArrowRecordBatch batch = buildRecordBatch(root, 10); - - // Asynchronous append. - writer.append(new ArrowData(arrowSchema, batch)); + try (BufferAllocator allocator = new RootAllocator()) { + // A writer should be used to ingest as much data as possible before teardown. + // Append 100 batches. + for (int i = 0; i < 100; i++) { + try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) { + // Each batch has 10 rows. + ArrowRecordBatch batch = buildRecordBatch(root, 10); + + // Asynchronous append. + writer.append(new ArrowData(arrowSchema, batch)); + } } } // Final cleanup for the stream during worker teardown. @@ -180,8 +180,8 @@ private static void verifyExpectedRowCount(TableName parentTable, long expectedR BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(parentTable.getProject()).build().getService(); TableResult results = bigquery.query(queryConfig); - int countRowsActual = - Integer.parseInt(results.getValues().iterator().next().get("f0_").getStringValue()); + long countRowsActual = + Long.parseLong(results.getValues().iterator().next().get("f0_").getStringValue()); if (countRowsActual != expectedRowCount) { throw new RuntimeException( "Unexpected row count. Expected: " + expectedRowCount + ". Actual: " + countRowsActual); @@ -217,7 +217,7 @@ private static class DataWriter { private final AtomicInteger recreateCount = new AtomicInteger(0); private StreamWriter createStreamWriter(String streamName, Schema arrowSchema) - throws DescriptorValidationException, IOException, InterruptedException { + throws IOException { // Configure in-stream automatic retry settings. 
// Error codes that are immediately retried: // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED @@ -236,7 +236,7 @@ private StreamWriter createStreamWriter(String streamName, Schema arrowSchema) // For more information about StreamWriter, see: // https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.StreamWriter return StreamWriter.newBuilder(streamName, client) - .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100))) + .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(10))) .setChannelProvider( BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1)) diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampArrowIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampArrowIT.java new file mode 100644 index 0000000000..f8f428dd90 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampArrowIT.java @@ -0,0 +1,55 @@ +/* + * Copyright 2026 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ReadTimestampArrowIT { + private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT"); + + private ByteArrayOutputStream bout; + private PrintStream out; + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + System.setOut(out); + } + + @After + public void tearDown() { + System.setOut(null); + } + + @Test + public void testQuickstart() throws Exception { + ReadTimestampArrow.main(PROJECT_ID); + String got = bout.toString(); + // Ensure that `last_reported` column is in the output + assertThat(got).contains("last_reported"); + } +} diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampAvroIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampAvroIT.java new file mode 100644 index 0000000000..ae7420fab6 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/ReadTimestampAvroIT.java @@ -0,0 +1,54 @@ +/* + * Copyright 2026 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ReadTimestampAvroIT { + private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT"); + + private ByteArrayOutputStream bout; + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bout); + System.setOut(out); + } + + @After + public void tearDown() { + System.setOut(null); + } + + @Test + public void testReadTimestampAvro() throws Exception { + ReadTimestampAvro.main(PROJECT_ID); + String got = bout.toString(); + // Ensure that `last_reported` column is in the output + assertThat(got).contains("last_reported"); + } +} diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java index 6293b301a7..b9423ccbea 100644 --- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteNestedProtoIT.java @@ -106,4 +106,4 @@ public void testWriteNestedProto() throws Exception { WriteNestedProto.runWriteNestedProto(GOOGLE_CLOUD_PROJECT, datasetName, tableName); assertThat(bout.toString()).contains("Appended records successfully."); } -} +} \ No newline at end of file diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJsonIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJsonIT.java new file mode 100644 index 0000000000..0e5e8bcf80 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampJsonIT.java @@ -0,0 +1,96 @@ +/* + * Copyright 2026 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.bigquerystorage; + +import static com.google.common.truth.Truth.assertThat; +import static junit.framework.TestCase.assertNotNull; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WriteToDefaultStreamTimestampJsonIT { + private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); + + private ByteArrayOutputStream bout; + private BigQuery bigquery; + private String datasetName; + private String tableName; + + private static void requireEnvVar() { + assertNotNull( + "Environment variable " + "GOOGLE_CLOUD_PROJECT" + " is required to perform these tests.", + System.getenv("GOOGLE_CLOUD_PROJECT")); + } + + @BeforeClass + public static void checkRequirements() { + requireEnvVar(); + } + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bout); + System.setOut(out); + + bigquery = BigQueryOptions.getDefaultInstance().getService(); + + // Create a new dataset and table for each test. + datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8); + Schema schema = + Schema.of(Field.newBuilder("timestampField", StandardSQLTypeName.TIMESTAMP).build()); + bigquery.create(DatasetInfo.newBuilder(datasetName).build()); + TableInfo tableInfo = + TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema)) + .build(); + bigquery.create(tableInfo); + } + + @After + public void tearDown() { + bigquery.delete( + DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName), + BigQuery.DatasetDeleteOption.deleteContents()); + System.setOut(null); + } + + @Test + public void testWriteToDefaultStream() throws Exception { + WriteToDefaultStreamTimestampJson.writeToDefaultStream( + GOOGLE_CLOUD_PROJECT, datasetName, tableName); + assertThat(bout.toString()).contains("Appended records successfully."); + } +} diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrowIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrowIT.java new file mode 100644 index 0000000000..2561de9859 --- /dev/null +++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamTimestampWithArrowIT.java @@ -0,0 +1,96 @@ +/* + * Copyright 2026 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.bigquerystorage;
+
+import static com.google.common.truth.Truth.assertThat;
+import static junit.framework.TestCase.assertNotNull;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.UUID;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class WriteToDefaultStreamTimestampWithArrowIT {
+  private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT");
+
+  private ByteArrayOutputStream bout;
+  private BigQuery bigquery;
+  private String datasetName;
+  private String tableName;
+
+  private static void requireEnvVar() {
+    assertNotNull(
+        "Environment variable GOOGLE_CLOUD_PROJECT is required to perform these tests.",
+        System.getenv("GOOGLE_CLOUD_PROJECT"));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnvVar();
+  }
+
+  @Before
+  public void setUp() {
+    bout = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(bout);
+    System.setOut(out);
+
+    bigquery = BigQueryOptions.getDefaultInstance().getService();
+
+    // Create a new dataset and table for each test.
+    datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
+    tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
+    Schema schema =
+        Schema.of(Field.newBuilder("timestampField", StandardSQLTypeName.TIMESTAMP).build());
+    bigquery.create(DatasetInfo.newBuilder(datasetName).build());
+    TableInfo tableInfo =
+        TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))
+            .build();
+    bigquery.create(tableInfo);
+  }
+
+  @After
+  public void tearDown() {
+    bigquery.delete(
+        DatasetId.of(GOOGLE_CLOUD_PROJECT, datasetName),
+        BigQuery.DatasetDeleteOption.deleteContents());
+    System.setOut(null);
+  }
+
+  @Test
+  public void testWriteToDefaultStream() throws Exception {
+    WriteToDefaultStreamTimestampWithArrow.writeToDefaultStreamWithArrow(
+        GOOGLE_CLOUD_PROJECT, datasetName, tableName);
+    assertThat(bout.toString()).contains("Appended records successfully.");
+  }
+}