Skip to content

Commit 8cb0857

Browse files
author
Bingqin Zhou
committed
Make changes according to comments.
1 parent 414f6a8 commit 8cb0857

File tree

3 files changed

+20
-15
lines changed

3 files changed

+20
-15
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ private void ensureExistingTables() {
113113
for (TableId tableId : topicsToTableIds.values()) {
114114
if (bigQuery.getTable(tableId) == null) {
115115
logger.warn(
116-
"You may want to enable auto table creation by setting {}=true in the properties file",
117-
config.TABLE_CREATE_CONFIG);
116+
"You may want to enable auto table creation by setting {}=true in the properties file",
117+
config.TABLE_CREATE_CONFIG);
118118
throw new BigQueryConnectException("Table '" + tableId + "' does not exist");
119119
}
120120
}

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,7 @@ private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateT
141141

142142
TableId baseTableId = topicsToBaseTableIds.get(record.topic());
143143

144-
BigQuery bigQuery = getBigQuery();
145-
if (autoCreateTables && bigQuery.getTable(baseTableId) == null) {
146-
getSchemaManager(bigQuery).createTable(baseTableId, record.topic());
147-
logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic());
148-
}
144+
maybeCreateTable(record, baseTableId, autoCreateTables);
149145

150146
PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
151147
if (useMessageTimeDatePartitioning) {
@@ -162,6 +158,20 @@ private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateT
162158
return builder.build();
163159
}
164160

161+
/**
162+
* Create the table which doesn't exist in BigQuery for a (record's) topic when autoCreateTables config is set to true.
163+
* @param record Kafka Sink Record to be streamed into BigQuery.
164+
* @param baseTableId BaseTableId in BigQuery.
165+
* @param autoCreateTables If this config is set to true, auto-creating the table that doesn't not exist.
166+
*/
167+
private void maybeCreateTable(SinkRecord record, TableId baseTableId, boolean autoCreateTables) {
168+
BigQuery bigQuery = getBigQuery();
169+
if (autoCreateTables && bigQuery.getTable(baseTableId) == null) {
170+
getSchemaManager(bigQuery).createTable(baseTableId, record.topic());
171+
logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic());
172+
}
173+
}
174+
165175
private RowToInsert getRecordRow(SinkRecord record) {
166176
Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
167177
Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -625,18 +625,13 @@ private void verifyBucketSpecified() throws ConfigException {
625625
}
626626

627627
private void checkAutoCreateTables() {
628-
Class<?> schemaRetriever = getClass(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG);
629628

629+
Class<?> schemaRetriever = getClass(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG);
630630
boolean autoCreateTables = getBoolean(TABLE_CREATE_CONFIG);
631+
631632
if (autoCreateTables && schemaRetriever == null) {
632633
throw new ConfigException(
633-
"Cannot specify automatic table creation without a schema retriever"
634-
);
635-
}
636-
637-
if (schemaRetriever == null) {
638-
logger.warn(
639-
"No schema retriever class provided; auto table creation is impossible"
634+
"Cannot specify automatic table creation without a schema retriever"
640635
);
641636
}
642637
}

0 commit comments

Comments
 (0)