Skip to content

Commit e810f79

Browse files
author
Bingqin Zhou
committed
Avoid passing an extra autoCreateTables to getRecordTable function.
1 parent c68b1e4 commit e810f79

File tree

1 file changed

+5
-7
lines changed

1 file changed

+5
-7
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
131131
topicPartitionManager.resumeAll();
132132
}
133133

134-
private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateTables) {
134+
private PartitionedTableId getRecordTable(SinkRecord record) {
135135
// Dynamically update topicToBaseTableIds mapping. topicToBaseTableIds was used to be
136136
// constructed when connector starts hence new topic configuration needed connector to restart.
137137
// Dynamic update shall not require connector restart and shall compute table id in runtime.
@@ -141,7 +141,7 @@ private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateT
141141

142142
TableId baseTableId = topicsToBaseTableIds.get(record.topic());
143143

144-
maybeCreateTable(record, baseTableId, autoCreateTables);
144+
maybeCreateTable(record, baseTableId);
145145

146146
PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
147147
if (useMessageTimeDatePartitioning) {
@@ -162,10 +162,10 @@ private PartitionedTableId getRecordTable(SinkRecord record, boolean autoCreateT
162162
* Create the table which doesn't exist in BigQuery for a (record's) topic when autoCreateTables config is set to true.
163163
* @param record Kafka Sink Record to be streamed into BigQuery.
164164
* @param baseTableId BaseTableId in BigQuery.
165-
* @param autoCreateTables If this config is set to true, auto-creating the table that doesn't not exist.
166165
*/
167-
private void maybeCreateTable(SinkRecord record, TableId baseTableId, boolean autoCreateTables) {
166+
private void maybeCreateTable(SinkRecord record, TableId baseTableId) {
168167
BigQuery bigQuery = getBigQuery();
168+
boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
169169
if (autoCreateTables && bigQuery.getTable(baseTableId) == null) {
170170
getSchemaManager(bigQuery).createTable(baseTableId, record.topic());
171171
logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic());
@@ -202,11 +202,9 @@ public void put(Collection<SinkRecord> records) {
202202
// create tableWriters
203203
Map<PartitionedTableId, TableWriterBuilder> tableWriterBuilders = new HashMap<>();
204204

205-
boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
206-
207205
for (SinkRecord record : records) {
208206
if (record.value() != null) {
209-
PartitionedTableId table = getRecordTable(record, autoCreateTables);
207+
PartitionedTableId table = getRecordTable(record);
210208
if (schemaRetriever != null) {
211209
schemaRetriever.setLastSeenSchema(table.getBaseTableId(),
212210
record.topic(),

0 commit comments

Comments
 (0)