Commit 0a21bd5

Bingqin Zhou committed

Move autoCreateTable function to BigQueryWriter.

1 parent: dda43a9

2 files changed: +36 additions, -25 deletions

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 5 additions & 18 deletions
@@ -142,8 +142,6 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
 
     TableId baseTableId = topicsToBaseTableIds.get(record.topic());
 
-    maybeCreateTable(record, baseTableId);
-
     PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
     if(usePartitionDecorator) {
 

@@ -161,20 +159,6 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
     return builder.build();
   }
 
-  /**
-   * Create the table which doesn't exist in BigQuery for a (record's) topic when autoCreateTables config is set to true.
-   * @param record Kafka Sink Record to be streamed into BigQuery.
-   * @param baseTableId BaseTableId in BigQuery.
-   */
-  private void maybeCreateTable(SinkRecord record, TableId baseTableId) {
-    BigQuery bigQuery = getBigQuery();
-    boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
-    if (autoCreateTables && bigQuery.getTable(baseTableId) == null) {
-      getSchemaManager(bigQuery).createTable(baseTableId, record.topic());
-      logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic());
-    }
-  }
-
   private RowToInsert getRecordRow(SinkRecord record) {
     Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
     Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();

@@ -284,14 +268,17 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
 
   private BigQueryWriter getBigQueryWriter() {
     boolean updateSchemas = config.getBoolean(config.SCHEMA_UPDATE_CONFIG);
+    boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
     int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG);
     long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG);
     BigQuery bigQuery = getBigQuery();
-    if (updateSchemas) {
+    if (updateSchemas || autoCreateTables) {
       return new AdaptiveBigQueryWriter(bigQuery,
                                         getSchemaManager(bigQuery),
                                         retry,
-                                        retryWait);
+                                        retryWait,
+                                        updateSchemas,
+                                        autoCreateTables);
     } else {
       return new SimpleBigQueryWriter(bigQuery, retry, retryWait);
     }
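Net effect of the hunks above: the old maybeCreateTable() ran inside getRecordTable(), so bigQuery.getTable(baseTableId), a remote existence check, was issued for every SinkRecord. After this change, table creation is handled reactively by the writer, and getBigQueryWriter() simply routes to the adaptive writer when either feature flag is on. A condensed sketch of that selection logic (local names simplified; this mirrors the diff rather than adding behavior):

  // Either flag selects AdaptiveBigQueryWriter, which reacts to schema and
  // missing-table errors at write time instead of checking upfront per record.
  boolean updateSchemas = config.getBoolean(config.SCHEMA_UPDATE_CONFIG);
  boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
  BigQueryWriter writer = (updateSchemas || autoCreateTables)
      ? new AdaptiveBigQueryWriter(bigQuery, getSchemaManager(bigQuery),
                                   retry, retryWait, updateSchemas, autoCreateTables)
      : new SimpleBigQueryWriter(bigQuery, retry, retryWait);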

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java

Lines changed: 31 additions & 7 deletions
@@ -37,16 +37,18 @@
 import java.util.Map;
 
 /**
- * A {@link BigQueryWriter} capable of updating BigQuery table schemas.
+ * A {@link BigQueryWriter} capable of updating BigQuery table schemas and automatically creating tables that do not exist.
  */
 public class AdaptiveBigQueryWriter extends BigQueryWriter {
   private static final Logger logger = LoggerFactory.getLogger(AdaptiveBigQueryWriter.class);
 
   // The maximum number of retries we will attempt to write rows after updating a BQ table schema.
-  private static final int AFTER_UPDATE_RETY_LIMIT = 5;
+  private static final int AFTER_UPDATE_RETRY_LIMIT = 5;
 
   private final BigQuery bigQuery;
   private final SchemaManager schemaManager;
+  private final boolean updateSchemas;
+  private final boolean autoCreateTables;
 
   /**
    * @param bigQuery Used to send write requests to BigQuery.

@@ -57,10 +59,14 @@ public class AdaptiveBigQueryWriter extends BigQueryWriter {
   public AdaptiveBigQueryWriter(BigQuery bigQuery,
                                 SchemaManager schemaManager,
                                 int retry,
-                                long retryWait) {
+                                long retryWait,
+                                boolean updateSchemas,
+                                boolean autoCreateTables) {
     super(retry, retryWait);
     this.bigQuery = bigQuery;
     this.schemaManager = schemaManager;
+    this.updateSchemas = updateSchemas;
+    this.autoCreateTables = autoCreateTables;
   }
 
   private boolean isTableMissingSchema(BigQueryException exception) {

@@ -69,6 +75,12 @@ private boolean isTableMissingSchema(BigQueryException exception) {
     return exception.getReason() != null && exception.getReason().equalsIgnoreCase("invalid");
   }
 
+  private boolean isTableNotExisted(BigQueryException exception) {
+    // If a table does not exist, the insert raises a BigQueryException with reason "notFound".
+    // See the Google Cloud error codes doc: https://cloud.google.com/bigquery/docs/error-messages?hl=en
+    return exception.getReason() != null && exception.getReason().equalsIgnoreCase("notFound");
+  }
+
   /**
    * Sends the request to BigQuery, then checks the response to see if any errors have occurred. If
    * any have, and all errors can be blamed upon invalid columns in the rows sent, attempts to

@@ -89,11 +101,13 @@ public Map<Long, List<BigQueryError>> performWriteRequest(
       // Should only perform one schema update attempt; may have to continue insert attempts due to
       // BigQuery schema updates taking up to two minutes to take effect
       if (writeResponse.hasErrors()
-          && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) {
+          && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSchemas) {
         attemptSchemaUpdate(tableId, topic);
       }
     } catch (BigQueryException exception) {
-      if (isTableMissingSchema(exception)) {
+      if (isTableNotExisted(exception) && autoCreateTables) {
+        attemptTableCreate(tableId, topic);
+      } else if (isTableMissingSchema(exception) && updateSchemas) {
         attemptSchemaUpdate(tableId, topic);
       } else {
         throw exception;

@@ -117,10 +131,10 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) {
         return writeResponse.getInsertErrors();
       }
       attemptCount++;
-      if (attemptCount >= AFTER_UPDATE_RETY_LIMIT) {
+      if (attemptCount >= AFTER_UPDATE_RETRY_LIMIT) {
         throw new BigQueryConnectException(
             "Failed to write rows after BQ schema update within "
-            + AFTER_UPDATE_RETY_LIMIT + " attempts for: " + tableId.getBaseTableId());
+            + AFTER_UPDATE_RETRY_LIMIT + " attempts for: " + tableId.getBaseTableId());
       }
     }
     logger.debug("table insertion completed successfully");

@@ -136,6 +150,16 @@ private void attemptSchemaUpdate(PartitionedTableId tableId, String topic) {
     }
   }
 
+  private void attemptTableCreate(PartitionedTableId tableId, String topic) {
+    try {
+      schemaManager.createTable(tableId.getBaseTableId(), topic);
+      logger.info("Table {} does not exist, auto-created table for topic {}", tableId.getBaseTableName(), topic);
+    } catch (BigQueryException exception) {
+      throw new BigQueryConnectException(
+          "Failed to create table " + tableId.getBaseTableName(), exception);
+    }
+  }
+
   /*
    * Currently, the only way to determine the cause of an insert all failure is by examining the map
    * object returned by the insertErrors() method of an insert all response. The only way to
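Taken together, performWriteRequest() now branches three ways on failure, with each remediation path gated on its config flag. A condensed sketch of the flow, assuming the insert is issued via bigQuery.insertAll() (the diff does not show that call, so treat it as an assumption):

  // Hedged sketch of the new write-time behavior, not the file verbatim.
  try {
    InsertAllResponse writeResponse = bigQuery.insertAll(request);
    if (writeResponse.hasErrors()
        && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSchemas) {
      attemptSchemaUpdate(tableId, topic);   // bad columns reported per row: schema evolved
    }
  } catch (BigQueryException exception) {
    if (isTableNotExisted(exception) && autoCreateTables) {
      attemptTableCreate(tableId, topic);    // reason "notFound": create the table
    } else if (isTableMissingSchema(exception) && updateSchemas) {
      attemptSchemaUpdate(tableId, topic);   // reason "invalid": update the schema
    } else {
      throw exception;                       // anything else propagates unchanged
    }
  }
  // The enclosing loop then retries the insert until it succeeds or
  // AFTER_UPDATE_RETRY_LIMIT attempts are exhausted.

One consequence of the new flag gating: a connector with only autoCreateTables enabled no longer attempts schema updates it was not configured to perform, which the old unconditional isTableMissingSchema() branch would have done.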
