Skip to content

Commit 5a8ce72

Browse files
author
Bingqin Zhou
committed
Change updateSchemas to be autoUpdateSchemas.
1 parent c612f69 commit 5a8ce72

File tree

2 files changed

+8
-8
lines changed

2 files changed

+8
-8
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,17 +269,17 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
269269
}
270270

271271
private BigQueryWriter getBigQueryWriter() {
272-
boolean updateSchemas = config.getBoolean(config.SCHEMA_UPDATE_CONFIG);
272+
boolean autoUpdateSchemas = config.getBoolean(config.SCHEMA_UPDATE_CONFIG);
273273
boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
274274
int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG);
275275
long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG);
276276
BigQuery bigQuery = getBigQuery();
277-
if (updateSchemas || autoCreateTables) {
277+
if (autoUpdateSchemas || autoCreateTables) {
278278
return new AdaptiveBigQueryWriter(bigQuery,
279279
getSchemaManager(bigQuery),
280280
retry,
281281
retryWait,
282-
updateSchemas,
282+
autoUpdateSchemas,
283283
autoCreateTables);
284284
} else {
285285
return new SimpleBigQueryWriter(bigQuery, retry, retryWait);

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class AdaptiveBigQueryWriter extends BigQueryWriter {
5050

5151
private final BigQuery bigQuery;
5252
private final SchemaManager schemaManager;
53-
private final boolean updateSchemas;
53+
private final boolean autoUpdateSchemas;
5454
private final boolean autoCreateTables;
5555

5656
/**
@@ -63,12 +63,12 @@ public AdaptiveBigQueryWriter(BigQuery bigQuery,
6363
SchemaManager schemaManager,
6464
int retry,
6565
long retryWait,
66-
boolean updateSchemas,
66+
boolean autoUpdateSchemas,
6767
boolean autoCreateTables) {
6868
super(retry, retryWait);
6969
this.bigQuery = bigQuery;
7070
this.schemaManager = schemaManager;
71-
this.updateSchemas = updateSchemas;
71+
this.autoUpdateSchemas = autoUpdateSchemas;
7272
this.autoCreateTables = autoCreateTables;
7373
}
7474

@@ -104,14 +104,14 @@ public Map<Long, List<BigQueryError>> performWriteRequest(
104104
// Should only perform one schema update attempt; may have to continue insert attempts due to
105105
// BigQuery schema updates taking up to two minutes to take effect
106106
if (writeResponse.hasErrors()
107-
&& onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSchemas) {
107+
&& onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdateSchemas) {
108108
attemptSchemaUpdate(tableId, topic);
109109
}
110110
} catch (BigQueryException exception) {
111111
// Should only perform one table creation attempt.
112112
if (isTableNotExistedException(exception) && autoCreateTables && bigQuery.getTable(tableId.getBaseTableId()) == null) {
113113
attemptTableCreate(tableId.getBaseTableId(), topic);
114-
} else if (isTableMissingSchema(exception) && updateSchemas) {
114+
} else if (isTableMissingSchema(exception) && autoUpdateSchemas) {
115115
attemptSchemaUpdate(tableId, topic);
116116
} else {
117117
throw exception;

0 commit comments

Comments
 (0)