diff --git a/modules/gcloud/src/main/java/org/testcontainers/containers/BigQueryEmulatorContainer.java b/modules/gcloud/src/main/java/org/testcontainers/containers/BigQueryEmulatorContainer.java index 6c305a50537..6831785b0a5 100644 --- a/modules/gcloud/src/main/java/org/testcontainers/containers/BigQueryEmulatorContainer.java +++ b/modules/gcloud/src/main/java/org/testcontainers/containers/BigQueryEmulatorContainer.java @@ -33,6 +33,10 @@ public String getEmulatorHttpEndpoint() { return String.format("http://%s:%d", getHost(), getMappedPort(HTTP_PORT)); } + public Integer getEmulatorGrpcPort() { + return getMappedPort(GRPC_PORT); + } + public String getProjectId() { return PROJECT_ID; } diff --git a/modules/gcloud/src/test/java/org/testcontainers/containers/BigQueryEmulatorContainerTest.java b/modules/gcloud/src/test/java/org/testcontainers/containers/BigQueryEmulatorContainerTest.java index 7f06a67c26c..0337cd563b9 100644 --- a/modules/gcloud/src/test/java/org/testcontainers/containers/BigQueryEmulatorContainerTest.java +++ b/modules/gcloud/src/test/java/org/testcontainers/containers/BigQueryEmulatorContainerTest.java @@ -1,11 +1,39 @@ package org.testcontainers.containers; +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.cloud.NoCredentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; import com.google.cloud.bigquery.TableResult; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import io.grpc.ManagedChannelBuilder; +import org.json.JSONArray; +import org.json.JSONObject; import org.junit.jupiter.api.Test; +import org.threeten.bp.Duration; import java.math.BigDecimal; import java.util.List; @@ -16,7 +44,7 @@ class BigQueryEmulatorContainerTest { @Test - void test() throws Exception { + void testHttpEndpoint() throws Exception { try ( // emulatorContainer { BigQueryEmulatorContainer container = new BigQueryEmulatorContainer("ghcr.io/goccy/bigquery-emulator:0.4.3") @@ -51,4 +79,126 @@ void test() throws Exception { assertThat(values).containsOnly(BigDecimal.valueOf(30)); } } + + @Test + void testGrcpEndpoint() throws Exception { + try ( + BigQueryEmulatorContainer container = new BigQueryEmulatorContainer("ghcr.io/goccy/bigquery-emulator:0.6.5") + ) { + container.start(); + + BigQuery bigQuery = getBigQuery(container); + String tableName = "test-table"; + String datasetName = "test-dataset"; + + bigQuery.create(DatasetInfo.of(DatasetId.of(container.getProjectId(), datasetName))); + + Schema schema = Schema.of(Field.of("name", StandardSQLTypeName.STRING)); + + TableId tableId = TableId.of(datasetName, tableName); + TableDefinition tableDefinition = StandardTableDefinition.of(schema); + TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build(); + + bigQuery.create(tableInfo); + + BigQueryWriteSettings.Builder bigQueryWriteSettingsBuilder = BigQueryWriteSettings.newBuilder(); + + bigQueryWriteSettingsBuilder + .createWriteStreamSettings() + .setRetrySettings( + bigQueryWriteSettingsBuilder + .createWriteStreamSettings() + .getRetrySettings() + .toBuilder() + .setTotalTimeout(Duration.ofSeconds(60)) + .build() + ); + + BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create( + bigQueryWriteSettingsBuilder + .setTransportChannelProvider( + FixedTransportChannelProvider.create( + GrpcTransportChannel.create( + ManagedChannelBuilder + .forAddress(container.getHost(), container.getEmulatorGrpcPort()) + .usePlaintext() + .build() + ) + ) + ) + .setCredentialsProvider(NoCredentialsProvider.create()) + .build() + ); + + TableName parentTable = TableName.of(container.getProjectId(), datasetName, tableName); + CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest + .newBuilder() + .setParent(parentTable.toString()) + .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING)) + .build(); + + WriteStream writeStream = bigQueryWriteClient.createWriteStream(createWriteStreamRequest); + + JsonStreamWriter writer = JsonStreamWriter + .newBuilder(writeStream.getName(), writeStream.getTableSchema(), bigQueryWriteClient) + .build(); + + JSONArray jsonArray = new JSONArray(); + JSONObject record1 = new JSONObject(); + record1.put("name", "Alice"); + jsonArray.put(record1); + + JSONObject record2 = new JSONObject(); + record2.put("name", "Bob"); + jsonArray.put(record2); + + ApiFuture future = writer.append(jsonArray); + AppendRowsResponse response = future.get(); + + FinalizeWriteStreamRequest finalizeRequest = FinalizeWriteStreamRequest + .newBuilder() + .setName(writeStream.getName()) + .build(); + FinalizeWriteStreamResponse finalizeResponse = bigQueryWriteClient.finalizeWriteStream(finalizeRequest); + + BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest + .newBuilder() + .setParent(parentTable.toString()) + .addWriteStreams(writeStream.getName()) + .build(); + BatchCommitWriteStreamsResponse commitResponse = bigQueryWriteClient.batchCommitWriteStreams(commitRequest); + + writer.close(); + + String sql = String.format( + "SELECT name FROM `%s.%s.%s` ORDER BY name", + container.getProjectId(), + datasetName, + tableName + ); + TableResult result = bigQuery.query(QueryJobConfiguration.newBuilder(sql).build()); + + List names = result + .streamValues() + .map(row -> row.get("name").getStringValue()) + .collect(Collectors.toList()); + + assertThat(names).containsExactly("Alice", "Bob"); + + bigQueryWriteClient.shutdown(); + bigQueryWriteClient.close(); + } + } + + private BigQuery getBigQuery(BigQueryEmulatorContainer container) { + String url = container.getEmulatorHttpEndpoint(); + return BigQueryOptions + .newBuilder() + .setProjectId(container.getProjectId()) + .setHost(url) + .setLocation(url) + .setCredentials(NoCredentials.getInstance()) + .build() + .getService(); + } }