Skip to content

Commit d855548

Browse files
author
Bingqin Zhou
committed
Add unit tests.
1 parent 0a21bd5 commit d855548

File tree

2 files changed

+73
-0
lines changed

2 files changed

+73
-0
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ project(':kcbq-connector') {
217217
"junit:junit:$junitVersion",
218218
"org.mockito:mockito-core:$mockitoVersion"
219219
)
220+
221+
testImplementation 'org.mockito:mockito-inline:2.13.0'
220222
}
221223

222224
artifacts {

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727

2828
import com.google.cloud.bigquery.BigQuery;
2929
import com.google.cloud.bigquery.BigQueryError;
30+
import com.google.cloud.bigquery.BigQueryException;
3031
import com.google.cloud.bigquery.InsertAllRequest;
3132
import com.google.cloud.bigquery.InsertAllResponse;
3233
import com.google.cloud.storage.Storage;
3334

3435
import com.wepay.kafka.connect.bigquery.BigQuerySinkTask;
36+
import com.wepay.kafka.connect.bigquery.SchemaManager;
3537
import com.wepay.kafka.connect.bigquery.SinkTaskPropertiesFactory;
3638
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
3739
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
@@ -95,6 +97,75 @@ public void testBigQueryNoFailure() {
9597
verify(bigQuery, times(1)).insertAll(anyObject());
9698
}
9799

100+
@Test
101+
public void testAutoCreateTables() {
102+
final String topic = "test_topic";
103+
final String dataset = "scratch";
104+
final Map<String, String> properties = makeProperties("3", "2000", topic, dataset);
105+
properties.put(BigQuerySinkTaskConfig.TABLE_CREATE_CONFIG, "true");
106+
107+
BigQuery bigQuery = mock(BigQuery.class);
108+
Map<Long, List<BigQueryError>> emptyMap = mock(Map.class);
109+
when(emptyMap.isEmpty()).thenReturn(true);
110+
111+
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
112+
when(insertAllResponse.hasErrors()).thenReturn(false);
113+
when(insertAllResponse.getInsertErrors()).thenReturn(emptyMap);
114+
115+
BigQueryException missTableException = mock(BigQueryException.class);
116+
when(missTableException.getReason()).thenReturn("notFound");
117+
118+
when(bigQuery.insertAll(anyObject())).thenThrow(missTableException).thenReturn(insertAllResponse);
119+
120+
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
121+
122+
Storage storage = mock(Storage.class);
123+
SchemaManager schemaManager = mock(SchemaManager.class);
124+
BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, schemaManager);
125+
testTask.initialize(sinkTaskContext);
126+
testTask.start(properties);
127+
testTask.put(
128+
Collections.singletonList(spoofSinkRecord(topic, 0, 0, "some_field", "some_value")));
129+
testTask.flush(Collections.emptyMap());
130+
131+
verify(schemaManager, times(1)).createTable(anyObject(), anyObject());
132+
verify(bigQuery, times(2)).insertAll(anyObject());
133+
}
134+
135+
@Test
136+
public void testNonAutoCreateTables() {
137+
final String topic = "test_topic";
138+
final String dataset = "scratch";
139+
final Map<String, String> properties = makeProperties("3", "2000", topic, dataset);
140+
141+
BigQuery bigQuery = mock(BigQuery.class);
142+
143+
Map<Long, List<BigQueryError>> emptyMap = mock(Map.class);
144+
when(emptyMap.isEmpty()).thenReturn(true);
145+
InsertAllResponse insertAllResponse = mock(InsertAllResponse.class);
146+
when(insertAllResponse.hasErrors()).thenReturn(false);
147+
when(insertAllResponse.getInsertErrors()).thenReturn(emptyMap);
148+
149+
BigQueryException missTableException = mock(BigQueryException.class);
150+
when(missTableException.getReason()).thenReturn("notFound");
151+
152+
when(bigQuery.insertAll(anyObject())).thenThrow(missTableException).thenReturn(insertAllResponse);
153+
154+
SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class);
155+
156+
Storage storage = mock(Storage.class);
157+
SchemaManager schemaManager = mock(SchemaManager.class);
158+
BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, schemaManager);
159+
testTask.initialize(sinkTaskContext);
160+
testTask.start(properties);
161+
testTask.put(
162+
Collections.singletonList(spoofSinkRecord(topic, 0, 0, "some_field", "some_value")));
163+
testTask.flush(Collections.emptyMap());
164+
165+
verify(schemaManager, times(0)).createTable(anyObject(), anyObject());
166+
verify(bigQuery, times(2)).insertAll(anyObject());
167+
}
168+
98169
@Test
99170
public void testBigQueryPartialFailure() {
100171
final String topic = "test_topic";

0 commit comments

Comments
 (0)