
Commit 6517a06

Author: Bingqin Zhou
Support non-existent topics in KCBQ.
1 parent d797ae6 commit 6517a06


8 files changed (+64, -184 lines)


kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java

Lines changed: 8 additions & 34 deletions
@@ -24,7 +24,6 @@
 import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;

 import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
-import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConnectorConfig;

 import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
@@ -76,7 +75,7 @@ public BigQuerySinkConnector() {
     this.testSchemaManager = schemaManager;
   }

-  private BigQuerySinkConnectorConfig config;
+  private BigQuerySinkConfig config;
   private Map<String, String> configProperties;

   private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConnector.class);
@@ -108,50 +107,25 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
     return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName, kafkaDataFieldName);
   }

-  private void ensureExistingTables(
-      BigQuery bigQuery,
-      SchemaManager schemaManager,
-      Map<String, TableId> topicsToTableIds) {
-    for (Map.Entry<String, TableId> topicToTableId : topicsToTableIds.entrySet()) {
-      String topic = topicToTableId.getKey();
-      TableId tableId = topicToTableId.getValue();
-      if (bigQuery.getTable(tableId) == null) {
-        logger.info("Table {} does not exist; attempting to create", tableId);
-        schemaManager.createTable(tableId, topic);
-      }
-    }
-  }
-
-  private void ensureExistingTables(
-      BigQuery bigQuery,
-      Map<String, TableId> topicsToTableIds) {
+  private void ensureExistingTables() {
+    BigQuery bigQuery = getBigQuery();
+    Map<String, TableId> topicsToTableIds = TopicToTableResolver.getTopicsToTables(config);
     for (TableId tableId : topicsToTableIds.values()) {
-      if (bigQuery.getTable(tableId) == null) {
+      if (!config.getBoolean(config.TABLE_CREATE_CONFIG) && bigQuery.getTable(tableId) == null) {
         logger.warn(
-            "You may want to enable auto table creation by setting {}=true in the properties file",
-            config.TABLE_CREATE_CONFIG);
+          "You may want to enable auto table creation by setting {}=true in the properties file",
+          config.TABLE_CREATE_CONFIG);
         throw new BigQueryConnectException("Table '" + tableId + "' does not exist");
       }
     }
   }

-  private void ensureExistingTables() {
-    BigQuery bigQuery = getBigQuery();
-    Map<String, TableId> topicsToTableIds = TopicToTableResolver.getTopicsToTables(config);
-    if (config.getBoolean(config.TABLE_CREATE_CONFIG)) {
-      SchemaManager schemaManager = getSchemaManager(bigQuery);
-      ensureExistingTables(bigQuery, schemaManager, topicsToTableIds);
-    } else {
-      ensureExistingTables(bigQuery, topicsToTableIds);
-    }
-  }
-
   @Override
   public void start(Map<String, String> properties) {
     logger.trace("connector.start()");
     try {
       configProperties = properties;
-      config = new BigQuerySinkConnectorConfig(properties);
+      config = new BigQuerySinkConfig(properties);
     } catch (ConfigException err) {
       throw new SinkConfigConnectException(
           "Couldn't start BigQuerySinkConnector due to configuration error",
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 10 additions & 2 deletions
@@ -127,7 +127,7 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
     topicPartitionManager.resumeAll();
   }

-  private PartitionedTableId getRecordTable(SinkRecord record) {
+  private PartitionedTableId getRecordTable(boolean autoCreateTables, SinkRecord record) {
     // Dynamically update topicToBaseTableIds mapping. topicToBaseTableIds was used to be
     // constructed when connector starts hence new topic configuration needed connector to restart.
     // Dynamic update shall not require connector restart and shall compute table id in runtime.
@@ -137,6 +137,12 @@ private PartitionedTableId getRecordTable(SinkRecord record) {

     TableId baseTableId = topicsToBaseTableIds.get(record.topic());

+    BigQuery bigQuery = getBigQuery();
+    if (autoCreateTables && bigQuery.getTable(baseTableId) == null) {
+      getSchemaManager(bigQuery).createTable(baseTableId, record.topic());
+      logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic());
+    }
+
     PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
     if (useMessageTimeDatePartitioning) {
       if (record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) {
@@ -182,9 +188,11 @@ public void put(Collection<SinkRecord> records) {
     // create tableWriters
     Map<PartitionedTableId, TableWriterBuilder> tableWriterBuilders = new HashMap<>();

+    boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
+
     for (SinkRecord record : records) {
       if (record.value() != null) {
-        PartitionedTableId table = getRecordTable(record);
+        PartitionedTableId table = getRecordTable(autoCreateTables, record);
         if (schemaRetriever != null) {
           schemaRetriever.setLastSeenSchema(table.getBaseTableId(),
               record.topic(),
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java

Lines changed: 35 additions & 1 deletion
@@ -32,6 +32,8 @@
 import org.apache.kafka.common.config.ConfigException;

 import org.apache.kafka.connect.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -52,6 +54,7 @@
 public class BigQuerySinkConfig extends AbstractConfig {
   private static final ConfigDef config;
   private static final Validator validator = new Validator();
+  private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkConfig.class);

   // Values taken from https://github.com/apache/kafka/blob/1.1.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java#L33
   public static final String TOPICS_CONFIG = SinkConnector.TOPICS_CONFIG;
@@ -209,6 +212,13 @@ public class BigQuerySinkConfig extends AbstractConfig {
       "If true, no fields in any produced BigQuery schema will be REQUIRED. All "
       + "non-nullable avro fields will be translated as NULLABLE (or REPEATED, if arrays).";

+  public static final String TABLE_CREATE_CONFIG = "autoCreateTables";
+  private static final ConfigDef.Type TABLE_CREATE_TYPE = ConfigDef.Type.BOOLEAN;
+  public static final boolean TABLE_CREATE_DEFAULT = false;
+  private static final ConfigDef.Importance TABLE_CREATE_IMPORTANCE = ConfigDef.Importance.HIGH;
+  private static final String TABLE_CREATE_DOC =
+      "Automatically create BigQuery tables if they don't already exist";
+
   static {
     config = new ConfigDef()
         .define(
@@ -324,7 +334,13 @@ public class BigQuerySinkConfig extends AbstractConfig {
             CONVERT_DOUBLE_SPECIAL_VALUES_DEFAULT,
             CONVERT_DOUBLE_SPECIAL_VALUES_IMPORTANCE,
             CONVERT_DOUBLE_SPECIAL_VALUES_DOC
-        );
+        ).define(
+            TABLE_CREATE_CONFIG,
+            TABLE_CREATE_TYPE,
+            TABLE_CREATE_DEFAULT,
+            TABLE_CREATE_IMPORTANCE,
+            TABLE_CREATE_DOC
+        );
   }

   @SuppressWarnings("unchecked")
@@ -608,6 +624,23 @@ private void verifyBucketSpecified() throws ConfigException {
     }
   }

+  private void checkAutoCreateTables() {
+    Class<?> schemaRetriever = getClass(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG);
+
+    boolean autoCreateTables = getBoolean(TABLE_CREATE_CONFIG);
+    if (autoCreateTables && schemaRetriever == null) {
+      throw new ConfigException(
+          "Cannot specify automatic table creation without a schema retriever"
+      );
+    }
+
+    if (schemaRetriever == null) {
+      logger.warn(
+          "No schema retriever class provided; auto table creation is impossible"
+      );
+    }
+  }
+
   /**
    * Return the ConfigDef object used to define this config's fields.
    *
@@ -625,6 +658,7 @@ protected BigQuerySinkConfig(ConfigDef config, Map<String, String> properties) {
   public BigQuerySinkConfig(Map<String, String> properties) {
     super(config, properties);
     verifyBucketSpecified();
+    checkAutoCreateTables();
   }

 }
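Note: the new checkAutoCreateTables() validation couples the autoCreateTables flag to the existing schemaRetriever setting, since table creation needs a schema source. A simplified restatement of the rule (the standalone class and method signature below are an assumption for illustration):

import org.apache.kafka.common.config.ConfigException;

// Illustrative restatement of the constraint enforced at config-construction time.
class AutoCreateValidation {
  static void validate(boolean autoCreateTables, Class<?> schemaRetrieverClass) {
    if (autoCreateTables && schemaRetrieverClass == null) {
      // Mirrors the ConfigException thrown by BigQuerySinkConfig.checkAutoCreateTables().
      throw new ConfigException("Cannot specify automatic table creation without a schema retriever");
    }
  }
}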

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConnectorConfig.java

Lines changed: 0 additions & 82 deletions
This file was deleted.

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnectorTest.java

Lines changed: 4 additions & 5 deletions
@@ -37,7 +37,6 @@
 import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;

 import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
-import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConnectorConfig;

 import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
 import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException;
@@ -86,7 +85,7 @@ public void testAutoCreateTables() {
     final TableId nonExistingTable = TableId.of(dataset, "topic_without_existing_table");

     Map<String, String> properties = propertiesFactory.getProperties();
-    properties.put(BigQuerySinkConnectorConfig.TABLE_CREATE_CONFIG, "true");
+    properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "true");
     properties.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, MockSchemaRetriever.class.getName());
     properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true");
     properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset));
@@ -118,7 +117,7 @@ public void testNonAutoCreateTables() {
     final String[] tables = new String[] { "topic_one", "topicTwo", "TOPIC_THREE", "topic_four" };

     Map<String, String> properties = propertiesFactory.getProperties();
-    properties.put(BigQuerySinkConnectorConfig.TABLE_CREATE_CONFIG, "false");
+    properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false");
     properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true");
     properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset));
     properties.put(BigQuerySinkConfig.TOPICS_CONFIG, String.join(",", topics));
@@ -150,7 +149,7 @@ public void testNonAutoCreateTablesFailure() {
     final TableId nonExistingTable = TableId.of(dataset, "topic_without_existing_table");

     Map<String, String> properties = propertiesFactory.getProperties();
-    properties.put(BigQuerySinkConnectorConfig.TABLE_CREATE_CONFIG, "false");
+    properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false");
     properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true");
     properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset));
     properties.put(
@@ -216,7 +215,7 @@ public void testTaskConfigs() {

   @Test
   public void testConfig() {
-    assertEquals(BigQuerySinkConnectorConfig.getConfig(), new BigQuerySinkConnector().config());
+    assertEquals(BigQuerySinkConfig.getConfig(), new BigQuerySinkConnector().config());
   }

   // Make sure that a config exception is properly translated into a SinkConfigConnectException
kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkConnectorPropertiesFactory.java

Lines changed: 3 additions & 3 deletions
@@ -18,7 +18,7 @@
  */


-import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConnectorConfig;
+import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;

 import java.util.Map;

@@ -27,7 +27,7 @@ public class SinkConnectorPropertiesFactory extends SinkPropertiesFactory {
   public Map<String, String> getProperties() {
     Map<String, String> properties = super.getProperties();

-    properties.put(BigQuerySinkConnectorConfig.TABLE_CREATE_CONFIG, "false");
+    properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "false");
     return properties;
   }

@@ -37,7 +37,7 @@ public Map<String, String> getProperties() {
    *
    * @param config The config object to test
    */
-  public void testProperties(BigQuerySinkConnectorConfig config) {
+  public void testProperties(BigQuerySinkConfig config) {
     super.testProperties(config);

     config.getBoolean(config.TABLE_CREATE_CONFIG);

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConnectorConfigTest.java

Lines changed: 0 additions & 53 deletions
This file was deleted.
