27
27
import static org .mockito .Mockito .times ;
28
28
import static org .mockito .Mockito .verify ;
29
29
import static org .mockito .Mockito .when ;
30
+ import static org .mockito .Mockito .never ;
30
31
31
32
import com .google .cloud .bigquery .BigQuery ;
32
33
import com .google .cloud .bigquery .BigQueryError ;
33
34
import com .google .cloud .bigquery .BigQueryException ;
34
35
import com .google .cloud .bigquery .InsertAllRequest ;
35
36
import com .google .cloud .bigquery .InsertAllResponse ;
36
37
import com .google .cloud .bigquery .TableId ;
38
+ import com .google .cloud .bigquery .Table ;
37
39
import com .google .cloud .storage .Storage ;
38
40
39
41
import com .wepay .kafka .connect .bigquery .api .SchemaRetriever ;
@@ -85,7 +87,7 @@ public void testSimplePut() {
85
87
when (bigQuery .insertAll (anyObject ())).thenReturn (insertAllResponse );
86
88
when (insertAllResponse .hasErrors ()).thenReturn (false );
87
89
88
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
90
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
89
91
testTask .initialize (sinkTaskContext );
90
92
testTask .start (properties );
91
93
@@ -112,7 +114,7 @@ public void testSimplePutWhenSchemaRetrieverIsNotNull() {
112
114
113
115
SchemaRetriever schemaRetriever = mock (SchemaRetriever .class );
114
116
115
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , schemaRetriever , storage );
117
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , schemaRetriever , storage , null );
116
118
testTask .initialize (sinkTaskContext );
117
119
testTask .start (properties );
118
120
@@ -123,13 +125,91 @@ public void testSimplePutWhenSchemaRetrieverIsNotNull() {
123
125
any (String .class ), any (Schema .class ));
124
126
}
125
127
128
+ @ Test
129
+ public void testAutoCreateTables () {
130
+ final String dataset = "scratch" ;
131
+ final String existingTableTopic = "topic-with-existing-table" ;
132
+ final String nonExistingTableTopic = "topic-without-existing-table" ;
133
+ final TableId existingTable = TableId .of (dataset , "topic_with_existing_table" );
134
+ final TableId nonExistingTable = TableId .of (dataset , "topic_without_existing_table" );
135
+
136
+ Map <String , String > properties = propertiesFactory .getProperties ();
137
+ properties .put (BigQuerySinkConfig .TABLE_CREATE_CONFIG , "true" );
138
+ properties .put (BigQuerySinkConfig .SCHEMA_RETRIEVER_CONFIG , BigQuerySinkConnectorTest .MockSchemaRetriever .class .getName ());
139
+ properties .put (BigQuerySinkConfig .SANITIZE_TOPICS_CONFIG , "true" );
140
+ properties .put (BigQuerySinkConfig .DATASETS_CONFIG , String .format (".*=%s" , dataset ));
141
+ properties .put (BigQuerySinkConfig .TOPICS_CONFIG , existingTableTopic );
142
+
143
+ BigQuery bigQuery = mock (BigQuery .class );
144
+ Table fakeTable = mock (Table .class );
145
+ when (bigQuery .getTable (existingTable )).thenReturn (fakeTable );
146
+ when (bigQuery .getTable (nonExistingTable )).thenReturn (null );
147
+ InsertAllResponse insertAllResponse = mock (InsertAllResponse .class );
148
+ when (bigQuery .insertAll (anyObject ())).thenReturn (insertAllResponse );
149
+ when (insertAllResponse .hasErrors ()).thenReturn (false );
150
+
151
+ Storage storage = mock (Storage .class );
152
+ SchemaRetriever schemaRetriever = mock (SchemaRetriever .class );
153
+ SinkTaskContext sinkTaskContext = mock (SinkTaskContext .class );
154
+ SchemaManager schemaManager = mock (SchemaManager .class );
155
+
156
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , schemaRetriever , storage , schemaManager );
157
+ testTask .initialize (sinkTaskContext );
158
+ testTask .start (properties );
159
+
160
+ testTask .put (Collections .singletonList (spoofSinkRecord (nonExistingTableTopic )));
161
+ testTask .flush (Collections .emptyMap ());
162
+
163
+ verify (schemaManager , never ()).createTable (existingTable , existingTableTopic );
164
+ verify (schemaManager ).createTable (nonExistingTable , nonExistingTableTopic );
165
+ }
166
+
167
+ @ Test
168
+ public void testNonAutoCreateTables () {
169
+ final String dataset = "scratch" ;
170
+ final String existingTableTopic = "topic-with-existing-table" ;
171
+ final String nonExistingTableTopic = "topic-without-existing-table" ;
172
+ final TableId existingTable = TableId .of (dataset , "topic_with_existing_table" );
173
+ final TableId nonExistingTable = TableId .of (dataset , "topic_without_existing_table" );
174
+
175
+ Map <String , String > properties = propertiesFactory .getProperties ();
176
+ properties .put (BigQuerySinkConfig .SCHEMA_RETRIEVER_CONFIG , BigQuerySinkConnectorTest .MockSchemaRetriever .class .getName ());
177
+ properties .put (BigQuerySinkConfig .SANITIZE_TOPICS_CONFIG , "true" );
178
+ properties .put (BigQuerySinkConfig .DATASETS_CONFIG , String .format (".*=%s" , dataset ));
179
+ properties .put (BigQuerySinkConfig .TOPICS_CONFIG , existingTableTopic );
180
+
181
+ BigQuery bigQuery = mock (BigQuery .class );
182
+ Table fakeTable = mock (Table .class );
183
+ when (bigQuery .getTable (existingTable )).thenReturn (fakeTable );
184
+ when (bigQuery .getTable (nonExistingTable )).thenReturn (null );
185
+ InsertAllResponse insertAllResponse = mock (InsertAllResponse .class );
186
+ when (bigQuery .insertAll (anyObject ())).thenReturn (insertAllResponse );
187
+ when (insertAllResponse .hasErrors ()).thenReturn (false );
188
+
189
+ Storage storage = mock (Storage .class );
190
+ SchemaRetriever schemaRetriever = mock (SchemaRetriever .class );
191
+ SinkTaskContext sinkTaskContext = mock (SinkTaskContext .class );
192
+ SchemaManager schemaManager = mock (SchemaManager .class );
193
+
194
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , schemaRetriever , storage , schemaManager );
195
+ testTask .initialize (sinkTaskContext );
196
+ testTask .start (properties );
197
+
198
+ testTask .put (Collections .singletonList (spoofSinkRecord (nonExistingTableTopic )));
199
+ testTask .flush (Collections .emptyMap ());
200
+
201
+ verify (schemaManager , never ()).createTable (existingTable , existingTableTopic );
202
+ verify (schemaManager , never ()).createTable (nonExistingTable , existingTableTopic );
203
+ }
204
+
205
+
126
206
@ Test
127
207
public void testEmptyPut () {
128
208
Map <String , String > properties = propertiesFactory .getProperties ();
129
209
BigQuery bigQuery = mock (BigQuery .class );
130
210
Storage storage = mock (Storage .class );
131
211
132
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
212
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
133
213
testTask .start (properties );
134
214
135
215
testTask .put (Collections .emptyList ());
@@ -148,7 +228,7 @@ public void testEmptyRecordPut() {
148
228
BigQuery bigQuery = mock (BigQuery .class );
149
229
Storage storage = mock (Storage .class );
150
230
151
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
231
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
152
232
testTask .start (properties );
153
233
154
234
SinkRecord emptyRecord = spoofSinkRecord (topic , simpleSchema , null );
@@ -175,7 +255,7 @@ public void testPutWhenPartitioningOnMessageTime() {
175
255
when (bigQuery .insertAll (anyObject ())).thenReturn (insertAllResponse );
176
256
when (insertAllResponse .hasErrors ()).thenReturn (false );
177
257
178
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
258
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
179
259
testTask .initialize (sinkTaskContext );
180
260
testTask .start (properties );
181
261
@@ -206,7 +286,7 @@ public void testPutWhenPartitioningOnMessageTimeWhenNoTimestampType() {
206
286
when (bigQuery .insertAll (anyObject ())).thenReturn (insertAllResponse );
207
287
when (insertAllResponse .hasErrors ()).thenReturn (false );
208
288
209
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
289
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
210
290
testTask .initialize (sinkTaskContext );
211
291
testTask .start (properties );
212
292
@@ -232,7 +312,7 @@ public void testBufferClearOnFlushError() {
232
312
.thenThrow (new RuntimeException ("This is a test" ));
233
313
234
314
SinkTaskContext sinkTaskContext = mock (SinkTaskContext .class );
235
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
315
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
236
316
testTask .initialize (sinkTaskContext );
237
317
testTask .start (properties );
238
318
@@ -253,7 +333,7 @@ public void testEmptyFlush() {
253
333
Storage storage = mock (Storage .class );
254
334
255
335
SinkTaskContext sinkTaskContext = mock (SinkTaskContext .class );
256
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
336
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
257
337
testTask .initialize (sinkTaskContext );
258
338
testTask .start (properties );
259
339
@@ -284,7 +364,7 @@ public void testBigQuery5XXRetry() {
284
364
285
365
SinkTaskContext sinkTaskContext = mock (SinkTaskContext .class );
286
366
287
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
367
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
288
368
testTask .initialize (sinkTaskContext );
289
369
testTask .start (properties );
290
370
testTask .put (Collections .singletonList (spoofSinkRecord (topic )));
@@ -318,7 +398,7 @@ public void testBigQuery403Retry() {
318
398
319
399
SinkTaskContext sinkTaskContext = mock (SinkTaskContext .class );
320
400
321
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
401
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
322
402
testTask .initialize (sinkTaskContext );
323
403
testTask .start (properties );
324
404
testTask .put (Collections .singletonList (spoofSinkRecord (topic )));
@@ -349,7 +429,7 @@ public void testBigQueryRetryExceeded() {
349
429
350
430
SinkTaskContext sinkTaskContext = mock (SinkTaskContext .class );
351
431
352
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
432
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
353
433
testTask .initialize (sinkTaskContext );
354
434
testTask .start (properties );
355
435
testTask .put (Collections .singletonList (spoofSinkRecord (topic )));
@@ -374,7 +454,7 @@ public void testInterruptedException() {
374
454
when (bigQuery .insertAll (any (InsertAllRequest .class ))).thenReturn (fakeResponse );
375
455
376
456
SinkTaskContext sinkTaskContext = mock (SinkTaskContext .class );
377
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
457
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
378
458
testTask .initialize (sinkTaskContext );
379
459
testTask .start (properties );
380
460
@@ -393,7 +473,7 @@ public void testConfigException() {
393
473
badProperties .remove (BigQuerySinkConfig .TOPICS_CONFIG );
394
474
395
475
BigQuerySinkTask testTask =
396
- new BigQuerySinkTask (mock (BigQuery .class ), null , mock (Storage .class ));
476
+ new BigQuerySinkTask (mock (BigQuery .class ), null , mock (Storage .class ), null );
397
477
testTask .start (badProperties );
398
478
}
399
479
@@ -421,7 +501,7 @@ public void testStop() {
421
501
when (insertAllResponse .hasErrors ()).thenReturn (false );
422
502
423
503
Storage storage = mock (Storage .class );
424
- BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage );
504
+ BigQuerySinkTask testTask = new BigQuerySinkTask (bigQuery , null , storage , null );
425
505
testTask .initialize (sinkTaskContext );
426
506
testTask .start (properties );
427
507
testTask .put (Collections .singletonList (spoofSinkRecord (topic )));
0 commit comments