Skip to content

Commit 5e2dd7c

Browse files
authored
Merge pull request #261 from gkstechie/master
topics.regex support in BigQuery Sink Connector
2 parents 4fb38a8 + 49344e2 commit 5e2dd7c

File tree

3 files changed

+76
-12
lines changed

3 files changed

+76
-12
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,21 @@ public class BigQuerySinkConfig extends AbstractConfig {
6666
private static final ConfigDef.Width TOPICS_WIDTH = ConfigDef.Width.LONG;
6767
private static final String TOPICS_DOC =
6868
"List of topics to consume, separated by commas";
69+
public static final String TOPICS_DEFAULT = "";
6970
private static final String TOPICS_DISPLAY = "Topics";
7071

72+
public static final String TOPICS_REGEX_CONFIG = "topics.regex";
73+
private static final ConfigDef.Type TOPICS_REGEX_TYPE = ConfigDef.Type.STRING;
74+
private static final ConfigDef.Importance TOPICS_REGEX_IMPORTANCE = ConfigDef.Importance.HIGH;
75+
private static final String TOPICS_REGEX_GROUP = "Common";
76+
private static final int TOPICS_REGEX_ORDER_IN_GROUP = 4;
77+
private static final ConfigDef.Width TOPICS_REGEX_WIDTH = ConfigDef.Width.LONG;
78+
private static final String TOPICS_REGEX_DOC = "Regular expression giving topics to consume. " +
79+
"Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " +
80+
"Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " should be specified.";
81+
public static final String TOPICS_REGEX_DEFAULT = "";
82+
private static final String TOPICS_REGEX_DISPLAY = "Topics regex";
83+
7184
public static final String ENABLE_BATCH_CONFIG = "enableBatchLoad";
7285
private static final ConfigDef.Type ENABLE_BATCH_TYPE = ConfigDef.Type.LIST;
7386
private static final List<String> ENABLE_BATCH_DEFAULT = Collections.emptyList();
@@ -225,12 +238,23 @@ public class BigQuerySinkConfig extends AbstractConfig {
225238
.define(
226239
TOPICS_CONFIG,
227240
TOPICS_TYPE,
241+
TOPICS_DEFAULT,
228242
TOPICS_IMPORTANCE,
229243
TOPICS_DOC,
230244
TOPICS_GROUP,
231245
TOPICS_ORDER_IN_GROUP,
232246
TOPICS_WIDTH,
233247
TOPICS_DISPLAY)
248+
.define(
249+
TOPICS_REGEX_CONFIG,
250+
TOPICS_REGEX_TYPE,
251+
TOPICS_REGEX_DEFAULT,
252+
TOPICS_REGEX_IMPORTANCE,
253+
TOPICS_REGEX_DOC,
254+
TOPICS_REGEX_GROUP,
255+
TOPICS_REGEX_ORDER_IN_GROUP,
256+
TOPICS_REGEX_WIDTH,
257+
TOPICS_REGEX_DISPLAY)
234258
.define(
235259
ENABLE_BATCH_CONFIG,
236260
ENABLE_BATCH_TYPE,
@@ -343,6 +367,34 @@ public class BigQuerySinkConfig extends AbstractConfig {
343367
TABLE_CREATE_DOC
344368
);
345369
}
370+
/**
371+
* Throw an exception if the passed-in properties do not constitute a valid sink.
372+
* @param props sink configuration properties
373+
*/
374+
public static void validate(Map<String, String> props) {
375+
final boolean hasTopicsConfig = hasTopicsConfig(props);
376+
final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
377+
378+
if (hasTopicsConfig && hasTopicsRegexConfig) {
379+
throw new ConfigException(TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG +
380+
" are mutually exclusive options, but both are set.");
381+
}
382+
383+
if (!hasTopicsConfig && !hasTopicsRegexConfig) {
384+
throw new ConfigException("Must configure one of " +
385+
TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG);
386+
}
387+
}
388+
389+
public static boolean hasTopicsConfig(Map<String, String> props) {
390+
String topicsStr = props.get(TOPICS_CONFIG);
391+
return topicsStr != null && !topicsStr.trim().isEmpty();
392+
}
393+
394+
public static boolean hasTopicsRegexConfig(Map<String, String> props) {
395+
String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
396+
return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
397+
}
346398

347399
@SuppressWarnings("unchecked")
348400
public static class Validator implements ConfigDef.Validator {

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
4242
import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException;
43+
import org.apache.kafka.common.config.ConfigException;
4344

4445
import org.apache.kafka.connect.data.Schema;
4546

@@ -131,9 +132,14 @@ public void testConfig() {
131132
// Make sure that a config exception is properly translated into a SinkConfigConnectException
132133
@Test(expected = SinkConfigConnectException.class)
133134
public void testConfigException() {
134-
Map<String, String> badProperties = propertiesFactory.getProperties();
135-
badProperties.remove(BigQuerySinkConfig.TOPICS_CONFIG);
136-
new BigQuerySinkConnector().start(badProperties);
135+
try {
136+
Map<String, String> badProperties = propertiesFactory.getProperties();
137+
badProperties.remove(BigQuerySinkConfig.TOPICS_CONFIG);
138+
BigQuerySinkConfig.validate(badProperties);
139+
new BigQuerySinkConnector().start(badProperties);
140+
} catch (ConfigException e) {
141+
throw new SinkConfigConnectException(e);
142+
}
137143
}
138144

139145
@Test

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
4242
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
4343
import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException;
44+
import org.apache.kafka.common.config.ConfigException;
4445

4546
import org.apache.kafka.common.record.TimestampType;
4647
import org.apache.kafka.connect.data.Schema;
@@ -490,15 +491,20 @@ public void testInterruptedException() {
490491
// Make sure that a ConfigException is properly translated into a SinkConfigConnectException
491492
@Test(expected = SinkConfigConnectException.class)
492493
public void testConfigException() {
493-
Map<String, String> badProperties = propertiesFactory.getProperties();
494-
badProperties.remove(BigQuerySinkConfig.TOPICS_CONFIG);
495-
496-
SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
497-
SchemaManager schemaManager = mock(SchemaManager.class);
498-
499-
BigQuerySinkTask testTask =
500-
new BigQuerySinkTask(mock(BigQuery.class), schemaRetriever, mock(Storage.class), schemaManager);
501-
testTask.start(badProperties);
494+
try {
495+
Map<String, String> badProperties = propertiesFactory.getProperties();
496+
badProperties.remove(BigQuerySinkConfig.TOPICS_CONFIG);
497+
BigQuerySinkConfig.validate(badProperties);
498+
499+
SchemaRetriever schemaRetriever = mock(SchemaRetriever.class);
500+
SchemaManager schemaManager = mock(SchemaManager.class);
501+
502+
BigQuerySinkTask testTask =
503+
new BigQuerySinkTask(mock(BigQuery.class), schemaRetriever, mock(Storage.class), schemaManager);
504+
testTask.start(badProperties);
505+
} catch (ConfigException e) {
506+
throw new SinkConfigConnectException(e);
507+
}
502508
}
503509

504510
@Test

0 commit comments

Comments
 (0)