11package org .testcontainers .containers ;
22
3+ import com .google .api .core .ApiFuture ;
34import com .google .api .gax .core .NoCredentialsProvider ;
45import com .google .api .gax .grpc .GrpcTransportChannel ;
56import com .google .api .gax .rpc .FixedTransportChannelProvider ;
1718import com .google .cloud .bigquery .TableId ;
1819import com .google .cloud .bigquery .TableInfo ;
1920import com .google .cloud .bigquery .TableResult ;
21+ import com .google .cloud .bigquery .storage .v1 .AppendRowsResponse ;
22+ import com .google .cloud .bigquery .storage .v1 .BatchCommitWriteStreamsRequest ;
23+ import com .google .cloud .bigquery .storage .v1 .BatchCommitWriteStreamsResponse ;
2024import com .google .cloud .bigquery .storage .v1 .BigQueryWriteClient ;
2125import com .google .cloud .bigquery .storage .v1 .BigQueryWriteSettings ;
2226import com .google .cloud .bigquery .storage .v1 .CreateWriteStreamRequest ;
27+ import com .google .cloud .bigquery .storage .v1 .FinalizeWriteStreamRequest ;
28+ import com .google .cloud .bigquery .storage .v1 .FinalizeWriteStreamResponse ;
29+ import com .google .cloud .bigquery .storage .v1 .JsonStreamWriter ;
2330import com .google .cloud .bigquery .storage .v1 .TableName ;
2431import com .google .cloud .bigquery .storage .v1 .WriteStream ;
2532import io .grpc .ManagedChannelBuilder ;
26- import org .threeten .bp .Duration ;
33+ import org .json .JSONArray ;
34+ import org .json .JSONObject ;
2735import org .junit .jupiter .api .Test ;
36+ import org .threeten .bp .Duration ;
2837
29- import java .io .IOException ;
3038import java .math .BigDecimal ;
3139import java .util .List ;
3240import java .util .stream .Collectors ;
3543
3644class BigQueryEmulatorContainerTest {
3745
38- private BigQuery getBigQuery (BigQueryEmulatorContainer container ) {
39- String url = container .getEmulatorHttpEndpoint ();
40- return BigQueryOptions
41- .newBuilder ()
42- .setProjectId (container .getProjectId ())
43- .setHost (url )
44- .setLocation (url )
45- .setCredentials (NoCredentials .getInstance ())
46- .build ().getService ();
47- }
48-
4946 @ Test
5047 void testHttpEndpoint () throws Exception {
5148 try (
@@ -56,7 +53,15 @@ void testHttpEndpoint() throws Exception {
5653 container .start ();
5754
5855 // bigQueryClient {
59- BigQuery bigQuery = getBigQuery (container );
56+ String url = container .getEmulatorHttpEndpoint ();
57+ BigQueryOptions options = BigQueryOptions
58+ .newBuilder ()
59+ .setProjectId (container .getProjectId ())
60+ .setHost (url )
61+ .setLocation (url )
62+ .setCredentials (NoCredentials .getInstance ())
63+ .build ();
64+ BigQuery bigQuery = options .getService ();
6065 // }
6166
6267 String fn =
@@ -76,59 +81,124 @@ void testHttpEndpoint() throws Exception {
7681 }
7782
7883 @ Test
79- void testGrcpEndpoint () throws IOException {
80- try (BigQueryEmulatorContainer container = new BigQueryEmulatorContainer ("ghcr.io/goccy/bigquery-emulator:0.6.5" )) {
84+ void testGrcpEndpoint () throws Exception {
85+ try (
86+ BigQueryEmulatorContainer container = new BigQueryEmulatorContainer ("ghcr.io/goccy/bigquery-emulator:0.6.5" )
87+ ) {
8188 container .start ();
8289
83- // Test setup.
84- // Create a table the "regular" way. We need this to verify we can connect a writestream
85- BigQuery bigQuery = getBigQuery (container );
90+ BigQuery bigQuery = getBigQuery (container );
8691 String tableName = "test-table" ;
8792 String datasetName = "test-dataset" ;
8893
8994 bigQuery .create (DatasetInfo .of (DatasetId .of (container .getProjectId (), datasetName )));
9095
91- Schema schema = Schema .of (
92- Field .of ("name" , StandardSQLTypeName .STRING )
93- );
96+ Schema schema = Schema .of (Field .of ("name" , StandardSQLTypeName .STRING ));
9497
9598 TableId tableId = TableId .of (datasetName , tableName );
9699 TableDefinition tableDefinition = StandardTableDefinition .of (schema );
97100 TableInfo tableInfo = TableInfo .newBuilder (tableId , tableDefinition ).build ();
98101
99102 bigQuery .create (tableInfo );
100103
101- // Actual test.
102- // BigQueryWriteSettings requires a HTTP/2 connection, not provided by the originally exposed endpoint.
103104 BigQueryWriteSettings .Builder bigQueryWriteSettingsBuilder = BigQueryWriteSettings .newBuilder ();
104105
105- bigQueryWriteSettingsBuilder .createWriteStreamSettings ()
106- .setRetrySettings (bigQueryWriteSettingsBuilder .createWriteStreamSettings ()
107- .getRetrySettings ()
108- .toBuilder ()
109- .setTotalTimeout (Duration .ofSeconds (60 ))
110- .build ());
106+ bigQueryWriteSettingsBuilder
107+ .createWriteStreamSettings ()
108+ .setRetrySettings (
109+ bigQueryWriteSettingsBuilder
110+ .createWriteStreamSettings ()
111+ .getRetrySettings ()
112+ .toBuilder ()
113+ .setTotalTimeout (Duration .ofSeconds (60 ))
114+ .build ()
115+ );
111116
112- // Use the now exposed grpcPort to get a working connection.
113117 BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient .create (
114- bigQueryWriteSettingsBuilder .setTransportChannelProvider (FixedTransportChannelProvider .create (GrpcTransportChannel .create (
115- ManagedChannelBuilder .forAddress (container .getHost (), container .getEmulatorGrpcPort ()).usePlaintext ().build ())))
118+ bigQueryWriteSettingsBuilder
119+ .setTransportChannelProvider (
120+ FixedTransportChannelProvider .create (
121+ GrpcTransportChannel .create (
122+ ManagedChannelBuilder
123+ .forAddress (container .getHost (), container .getEmulatorGrpcPort ())
124+ .usePlaintext ()
125+ .build ()
126+ )
127+ )
128+ )
116129 .setCredentialsProvider (NoCredentialsProvider .create ())
117130 .build ()
118131 );
119132
120133 TableName parentTable = TableName .of (container .getProjectId (), datasetName , tableName );
121- CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest .newBuilder ()
134+ CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest
135+ .newBuilder ()
122136 .setParent (parentTable .toString ())
123137 .setWriteStream (WriteStream .newBuilder ().setType (WriteStream .Type .PENDING ))
124138 .build ();
125139
126- // Validate that we can successfully create a write stream. This would not work with http endpoint
127- bigQueryWriteClient .createWriteStream (createWriteStreamRequest );
140+ WriteStream writeStream = bigQueryWriteClient .createWriteStream (createWriteStreamRequest );
141+
142+ JsonStreamWriter writer = JsonStreamWriter
143+ .newBuilder (writeStream .getName (), writeStream .getTableSchema (), bigQueryWriteClient )
144+ .build ();
145+
146+ JSONArray jsonArray = new JSONArray ();
147+ JSONObject record1 = new JSONObject ();
148+ record1 .put ("name" , "Alice" );
149+ jsonArray .put (record1 );
150+
151+ JSONObject record2 = new JSONObject ();
152+ record2 .put ("name" , "Bob" );
153+ jsonArray .put (record2 );
154+
155+ ApiFuture <AppendRowsResponse > future = writer .append (jsonArray );
156+ AppendRowsResponse response = future .get ();
157+
158+ FinalizeWriteStreamRequest finalizeRequest = FinalizeWriteStreamRequest
159+ .newBuilder ()
160+ .setName (writeStream .getName ())
161+ .build ();
162+ FinalizeWriteStreamResponse finalizeResponse = bigQueryWriteClient .finalizeWriteStream (finalizeRequest );
163+
164+ BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest
165+ .newBuilder ()
166+ .setParent (parentTable .toString ())
167+ .addWriteStreams (writeStream .getName ())
168+ .build ();
169+ BatchCommitWriteStreamsResponse commitResponse = bigQueryWriteClient .batchCommitWriteStreams (commitRequest );
170+
171+ writer .close ();
172+
173+ String sql = String .format (
174+ "SELECT name FROM `%s.%s.%s` ORDER BY name" ,
175+ container .getProjectId (),
176+ datasetName ,
177+ tableName
178+ );
179+ TableResult result = bigQuery .query (QueryJobConfiguration .newBuilder (sql ).build ());
180+
181+ List <String > names = result
182+ .streamValues ()
183+ .map (row -> row .get ("name" ).getStringValue ())
184+ .collect (Collectors .toList ());
185+
186+ assertThat (names ).containsExactly ("Alice" , "Bob" );
128187
129188 bigQueryWriteClient .shutdown ();
130189 bigQueryWriteClient .close ();
131190 }
132191 }
133192
193+ private BigQuery getBigQuery (BigQueryEmulatorContainer container ) {
194+ String url = container .getEmulatorHttpEndpoint ();
195+ return BigQueryOptions
196+ .newBuilder ()
197+ .setProjectId (container .getProjectId ())
198+ .setHost (url )
199+ .setLocation (url )
200+ .setCredentials (NoCredentials .getInstance ())
201+ .build ()
202+ .getService ();
203+ }
134204}
0 commit comments