Skip to content

Commit 49344e2

Browse files
committed
To handle test cases properly for topics.regex
1 parent 0397334 commit 49344e2

File tree

3 files changed

+56
-16
lines changed

3 files changed

+56
-16
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ public class BigQuerySinkConfig extends AbstractConfig {
7676
private static final int TOPICS_REGEX_ORDER_IN_GROUP = 4;
7777
private static final ConfigDef.Width TOPICS_REGEX_WIDTH = ConfigDef.Width.LONG;
7878
private static final String TOPICS_REGEX_DOC = "Regular expression giving topics to consume. " +
79-
"Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " +
80-
"Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " should be specified.";
79+
"Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " +
80+
"Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " should be specified.";
8181
public static final String TOPICS_REGEX_DEFAULT = "";
8282
private static final String TOPICS_REGEX_DISPLAY = "Topics regex";
8383

@@ -238,7 +238,7 @@ public class BigQuerySinkConfig extends AbstractConfig {
238238
.define(
239239
TOPICS_CONFIG,
240240
TOPICS_TYPE,
241-
TOPICS_DEFAULT,
241+
TOPICS_DEFAULT,
242242
TOPICS_IMPORTANCE,
243243
TOPICS_DOC,
244244
TOPICS_GROUP,
@@ -248,7 +248,7 @@ public class BigQuerySinkConfig extends AbstractConfig {
248248
.define(
249249
TOPICS_REGEX_CONFIG,
250250
TOPICS_REGEX_TYPE,
251-
TOPICS_REGEX_DEFAULT,
251+
TOPICS_REGEX_DEFAULT,
252252
TOPICS_REGEX_IMPORTANCE,
253253
TOPICS_REGEX_DOC,
254254
TOPICS_REGEX_GROUP,
@@ -367,6 +367,34 @@ public class BigQuerySinkConfig extends AbstractConfig {
367367
TABLE_CREATE_DOC
368368
);
369369
}
370+
/**
371+
* Throw an exception if the passed-in properties do not constitute a valid sink.
372+
* @param props sink configuration properties
373+
*/
374+
public static void validate(Map<String, String> props) {
375+
final boolean hasTopicsConfig = hasTopicsConfig(props);
376+
final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
377+
378+
if (hasTopicsConfig && hasTopicsRegexConfig) {
379+
throw new ConfigException(TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG +
380+
" are mutually exclusive options, but both are set.");
381+
}
382+
383+
if (!hasTopicsConfig && !hasTopicsRegexConfig) {
384+
throw new ConfigException("Must configure one of " +
385+
TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG);
386+
}
387+
}
388+
389+
public static boolean hasTopicsConfig(Map<String, String> props) {
390+
String topicsStr = props.get(TOPICS_CONFIG);
391+
return topicsStr != null && !topicsStr.trim().isEmpty();
392+
}
393+
394+
public static boolean hasTopicsRegexConfig(Map<String, String> props) {
395+
String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
396+
return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
397+
}
370398

371399
@SuppressWarnings("unchecked")
372400
public static class Validator implements ConfigDef.Validator {

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
4242
import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException;
43+
import org.apache.kafka.common.config.ConfigException;
4344

4445
import org.apache.kafka.connect.data.Schema;
4546

@@ -131,9 +132,14 @@ public void testConfig() {
131132
// Make sure that a config exception is properly translated into a SinkConfigConnectException
132133
@Test(expected = SinkConfigConnectException.class)
133134
public void testConfigException() {
134-
Map<String, String> badProperties = propertiesFactory.getProperties();
135-
badProperties.remove(BigQuerySinkConfig.TOPICS_CONFIG);
136-
new BigQuerySinkConnector().start(badProperties);
135+
try {
136+
Map<String, String> badProperties = propertiesFactory.getProperties();
137+
badProperties.remove(BigQuerySinkConfig.TOPICS_CONFIG);
138+
BigQuerySinkConfig.validate(badProperties);
139+
new BigQuerySinkConnector().start(badProperties);
140+
} catch (ConfigException e) {
141+
throw new SinkConfigConnectException(e);
142+
}
137143
}
138144

139145
@Test

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
4242
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
4343
import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException;
44+
import org.apache.kafka.common.config.ConfigException;
4445

4546
import org.apache.kafka.common.record.TimestampType;
4647
import org.apache.kafka.connect.data.Schema;
@@ -490,15 +491,20 @@ public void testInterruptedException() {
490491
// Make sure that a ConfigException is properly translated into a SinkConfigConnectException
491492
@Test(expected = SinkConfigConnectException.class)
492493
public void testConfigException() {
493-
Map<String, String> badProperties = propertiesFactory.getProperties();
494-
badProperties.remove(BigQuerySinkConfig.TOPICS_CONFIG);
495-
496-
SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
497-
SchemaManager schemaManager = mock(SchemaManager.class);
498-
499-
BigQuerySinkTask testTask =
500-
new BigQuerySinkTask(mock(BigQuery.class), schemaRetriever, mock(Storage.class), schemaManager);
501-
testTask.start(badProperties);
494+
try {
495+
Map<String, String> badProperties = propertiesFactory.getProperties();
496+
badProperties.remove(BigQuerySinkConfig.TOPICS_CONFIG);
497+
BigQuerySinkConfig.validate(badProperties);
498+
499+
SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
500+
SchemaManager schemaManager = mock(SchemaManager.class);
501+
502+
BigQuerySinkTask testTask =
503+
new BigQuerySinkTask(mock(BigQuery.class), schemaRetriever, mock(Storage.class), schemaManager);
504+
testTask.start(badProperties);
505+
} catch (ConfigException e) {
506+
throw new SinkConfigConnectException(e);
507+
}
502508
}
503509

504510
@Test

0 commit comments

Comments
 (0)