Skip to content

Commit 414f6a8

Browse files
author
Bingqin Zhou
committed
Make changes according to comments.
1 parent d4201c0 commit 414f6a8

File tree

2 files changed

+6
-4
lines changed

2 files changed

+6
-4
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ private void ensureExistingTables() {
111111
BigQuery bigQuery = getBigQuery();
112112
Map<String, TableId> topicsToTableIds = TopicToTableResolver.getTopicsToTables(config);
113113
for (TableId tableId : topicsToTableIds.values()) {
114-
if (!config.getBoolean(config.TABLE_CREATE_CONFIG) && bigQuery.getTable(tableId) == null) {
114+
if (bigQuery.getTable(tableId) == null) {
115115
logger.warn(
116116
"You may want to enable auto table creation by setting {}=true in the properties file",
117117
config.TABLE_CREATE_CONFIG);
@@ -133,7 +133,9 @@ public void start(Map<String, String> properties) {
133133
);
134134
}
135135

136-
ensureExistingTables();
136+
if (!config.getBoolean(config.TABLE_CREATE_CONFIG)) {
137+
ensureExistingTables();
138+
}
137139
}
138140

139141
@Override

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
131131
topicPartitionManager.resumeAll();
132132
}
133133

134-
private PartitionedTableId getRecordTable(boolean autoCreateTables, SinkRecord record) {
134+
private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateTables) {
135135
// Dynamically update topicToBaseTableIds mapping. topicToBaseTableIds was used to be
136136
// constructed when connector starts hence new topic configuration needed connector to restart.
137137
// Dynamic update shall not require connector restart and shall compute table id in runtime.
@@ -196,7 +196,7 @@ public void put(Collection<SinkRecord> records) {
196196

197197
for (SinkRecord record : records) {
198198
if (record.value() != null) {
199-
PartitionedTableId table = getRecordTable(autoCreateTables, record);
199+
PartitionedTableId table = getRecordTable(record, autoCreateTables);
200200
if (schemaRetriever != null) {
201201
schemaRetriever.setLastSeenSchema(table.getBaseTableId(),
202202
record.topic(),

0 commit comments

Comments
 (0)