|
1 | 1 | package org.testcontainers.containers; |
2 | 2 |
|
| 3 | +import com.google.api.core.ApiFuture; |
| 4 | +import com.google.api.gax.core.NoCredentialsProvider; |
| 5 | +import com.google.api.gax.grpc.GrpcTransportChannel; |
| 6 | +import com.google.api.gax.rpc.FixedTransportChannelProvider; |
3 | 7 | import com.google.cloud.NoCredentials; |
4 | 8 | import com.google.cloud.bigquery.BigQuery; |
5 | 9 | import com.google.cloud.bigquery.BigQueryOptions; |
| 10 | +import com.google.cloud.bigquery.DatasetId; |
| 11 | +import com.google.cloud.bigquery.DatasetInfo; |
| 12 | +import com.google.cloud.bigquery.Field; |
6 | 13 | import com.google.cloud.bigquery.QueryJobConfiguration; |
| 14 | +import com.google.cloud.bigquery.Schema; |
| 15 | +import com.google.cloud.bigquery.StandardSQLTypeName; |
| 16 | +import com.google.cloud.bigquery.StandardTableDefinition; |
| 17 | +import com.google.cloud.bigquery.TableDefinition; |
| 18 | +import com.google.cloud.bigquery.TableId; |
| 19 | +import com.google.cloud.bigquery.TableInfo; |
7 | 20 | import com.google.cloud.bigquery.TableResult; |
| 21 | +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; |
| 22 | +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest; |
| 23 | +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; |
| 24 | +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; |
| 25 | +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; |
| 26 | +import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; |
| 27 | +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest; |
| 28 | +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; |
| 29 | +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; |
| 30 | +import com.google.cloud.bigquery.storage.v1.TableName; |
| 31 | +import com.google.cloud.bigquery.storage.v1.WriteStream; |
| 32 | +import io.grpc.ManagedChannelBuilder; |
| 33 | +import org.json.JSONArray; |
| 34 | +import org.json.JSONObject; |
8 | 35 | import org.junit.jupiter.api.Test; |
| 36 | +import org.threeten.bp.Duration; |
9 | 37 |
|
10 | 38 | import java.math.BigDecimal; |
11 | 39 | import java.util.List; |
|
16 | 44 | class BigQueryEmulatorContainerTest { |
17 | 45 |
|
18 | 46 | @Test |
19 | | - void test() throws Exception { |
| 47 | + void testHttpEndpoint() throws Exception { |
20 | 48 | try ( |
21 | 49 | // emulatorContainer { |
22 | 50 | BigQueryEmulatorContainer container = new BigQueryEmulatorContainer("ghcr.io/goccy/bigquery-emulator:0.4.3") |
@@ -51,4 +79,126 @@ void test() throws Exception { |
51 | 79 | assertThat(values).containsOnly(BigDecimal.valueOf(30)); |
52 | 80 | } |
53 | 81 | } |
| 82 | + |
| 83 | + @Test |
| 84 | + void testGrcpEndpoint() throws Exception { |
| 85 | + try ( |
| 86 | + BigQueryEmulatorContainer container = new BigQueryEmulatorContainer("ghcr.io/goccy/bigquery-emulator:0.6.5") |
| 87 | + ) { |
| 88 | + container.start(); |
| 89 | + |
| 90 | + BigQuery bigQuery = getBigQuery(container); |
| 91 | + String tableName = "test-table"; |
| 92 | + String datasetName = "test-dataset"; |
| 93 | + |
| 94 | + bigQuery.create(DatasetInfo.of(DatasetId.of(container.getProjectId(), datasetName))); |
| 95 | + |
| 96 | + Schema schema = Schema.of(Field.of("name", StandardSQLTypeName.STRING)); |
| 97 | + |
| 98 | + TableId tableId = TableId.of(datasetName, tableName); |
| 99 | + TableDefinition tableDefinition = StandardTableDefinition.of(schema); |
| 100 | + TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build(); |
| 101 | + |
| 102 | + bigQuery.create(tableInfo); |
| 103 | + |
| 104 | + BigQueryWriteSettings.Builder bigQueryWriteSettingsBuilder = BigQueryWriteSettings.newBuilder(); |
| 105 | + |
| 106 | + bigQueryWriteSettingsBuilder |
| 107 | + .createWriteStreamSettings() |
| 108 | + .setRetrySettings( |
| 109 | + bigQueryWriteSettingsBuilder |
| 110 | + .createWriteStreamSettings() |
| 111 | + .getRetrySettings() |
| 112 | + .toBuilder() |
| 113 | + .setTotalTimeout(Duration.ofSeconds(60)) |
| 114 | + .build() |
| 115 | + ); |
| 116 | + |
| 117 | + BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create( |
| 118 | + bigQueryWriteSettingsBuilder |
| 119 | + .setTransportChannelProvider( |
| 120 | + FixedTransportChannelProvider.create( |
| 121 | + GrpcTransportChannel.create( |
| 122 | + ManagedChannelBuilder |
| 123 | + .forAddress(container.getHost(), container.getEmulatorGrpcPort()) |
| 124 | + .usePlaintext() |
| 125 | + .build() |
| 126 | + ) |
| 127 | + ) |
| 128 | + ) |
| 129 | + .setCredentialsProvider(NoCredentialsProvider.create()) |
| 130 | + .build() |
| 131 | + ); |
| 132 | + |
| 133 | + TableName parentTable = TableName.of(container.getProjectId(), datasetName, tableName); |
| 134 | + CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest |
| 135 | + .newBuilder() |
| 136 | + .setParent(parentTable.toString()) |
| 137 | + .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING)) |
| 138 | + .build(); |
| 139 | + |
| 140 | + WriteStream writeStream = bigQueryWriteClient.createWriteStream(createWriteStreamRequest); |
| 141 | + |
| 142 | + JsonStreamWriter writer = JsonStreamWriter |
| 143 | + .newBuilder(writeStream.getName(), writeStream.getTableSchema(), bigQueryWriteClient) |
| 144 | + .build(); |
| 145 | + |
| 146 | + JSONArray jsonArray = new JSONArray(); |
| 147 | + JSONObject record1 = new JSONObject(); |
| 148 | + record1.put("name", "Alice"); |
| 149 | + jsonArray.put(record1); |
| 150 | + |
| 151 | + JSONObject record2 = new JSONObject(); |
| 152 | + record2.put("name", "Bob"); |
| 153 | + jsonArray.put(record2); |
| 154 | + |
| 155 | + ApiFuture<AppendRowsResponse> future = writer.append(jsonArray); |
| 156 | + AppendRowsResponse response = future.get(); |
| 157 | + |
| 158 | + FinalizeWriteStreamRequest finalizeRequest = FinalizeWriteStreamRequest |
| 159 | + .newBuilder() |
| 160 | + .setName(writeStream.getName()) |
| 161 | + .build(); |
| 162 | + FinalizeWriteStreamResponse finalizeResponse = bigQueryWriteClient.finalizeWriteStream(finalizeRequest); |
| 163 | + |
| 164 | + BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest |
| 165 | + .newBuilder() |
| 166 | + .setParent(parentTable.toString()) |
| 167 | + .addWriteStreams(writeStream.getName()) |
| 168 | + .build(); |
| 169 | + BatchCommitWriteStreamsResponse commitResponse = bigQueryWriteClient.batchCommitWriteStreams(commitRequest); |
| 170 | + |
| 171 | + writer.close(); |
| 172 | + |
| 173 | + String sql = String.format( |
| 174 | + "SELECT name FROM `%s.%s.%s` ORDER BY name", |
| 175 | + container.getProjectId(), |
| 176 | + datasetName, |
| 177 | + tableName |
| 178 | + ); |
| 179 | + TableResult result = bigQuery.query(QueryJobConfiguration.newBuilder(sql).build()); |
| 180 | + |
| 181 | + List<String> names = result |
| 182 | + .streamValues() |
| 183 | + .map(row -> row.get("name").getStringValue()) |
| 184 | + .collect(Collectors.toList()); |
| 185 | + |
| 186 | + assertThat(names).containsExactly("Alice", "Bob"); |
| 187 | + |
| 188 | + bigQueryWriteClient.shutdown(); |
| 189 | + bigQueryWriteClient.close(); |
| 190 | + } |
| 191 | + } |
| 192 | + |
| 193 | + private BigQuery getBigQuery(BigQueryEmulatorContainer container) { |
| 194 | + String url = container.getEmulatorHttpEndpoint(); |
| 195 | + return BigQueryOptions |
| 196 | + .newBuilder() |
| 197 | + .setProjectId(container.getProjectId()) |
| 198 | + .setHost(url) |
| 199 | + .setLocation(url) |
| 200 | + .setCredentials(NoCredentials.getInstance()) |
| 201 | + .build() |
| 202 | + .getService(); |
| 203 | + } |
54 | 204 | } |
0 commit comments