Commit 06aa5ba

Author: Bingqin Zhou
Merge pull request #248 from wepay/DI_3169
Auto-create tables when rows insertion fails.
2 parents dda43a9 + ca32148 commit 06aa5ba

File tree: 9 files changed, +263 additions, -146 deletions

build.gradle

Lines changed: 5 additions & 3 deletions
@@ -22,7 +22,7 @@ project.ext {
     ioConfluentVersion = '5.3.1'
     junitVersion = '4.12'
     kafkaVersion = '2.3.0'
-    mockitoVersion = '1.10.19'
+    mockitoVersion = '3.2.4'
     slf4jVersion = '1.6.1'
 }

@@ -215,7 +215,8 @@ project(':kcbq-connector') {

   testCompile (
       "junit:junit:$junitVersion",
-      "org.mockito:mockito-core:$mockitoVersion"
+      "org.mockito:mockito-core:$mockitoVersion",
+      "org.mockito:mockito-inline:$mockitoVersion"
   )
 }

@@ -334,7 +335,8 @@ project('kcbq-confluent') {

   testCompile (
       "junit:junit:$junitVersion",
-      "org.mockito:mockito-core:$mockitoVersion"
+      "org.mockito:mockito-core:$mockitoVersion",
+      "org.mockito:mockito-inline:$mockitoVersion"
   )
 }
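
The jump from Mockito 1.10.19 to 3.2.4, together with the new mockito-inline test dependency, switches the tests to Mockito's inline mock maker, which can mock final classes and final methods that the default subclass-based mock maker cannot. A minimal sketch of what that enables, using a hypothetical final class rather than anything from this project:

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.Test;

public class InlineMockMakerSketchTest {

  // Hypothetical final collaborator; with mockito-core alone, mock() would reject it.
  static final class FinalTableClient {
    String describeTable(String name) {
      return "real:" + name;
    }
  }

  @Test
  public void canMockFinalClassWithInlineMockMaker() {
    FinalTableClient client = mock(FinalTableClient.class);
    when(client.describeTable("kafka_topic")).thenReturn("stubbed");

    assertEquals("stubbed", client.describeTable("kafka_topic"));
  }
}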

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 13 additions & 21 deletions
@@ -142,8 +142,6 @@ private PartitionedTableId getRecordTable(SinkRecord record) {

     TableId baseTableId = topicsToBaseTableIds.get(record.topic());

-    maybeCreateTable(record, baseTableId);
-
     PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
     if(usePartitionDecorator) {

@@ -161,20 +159,6 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
     return builder.build();
   }

-  /**
-   * Create the table which doesn't exist in BigQuery for a (record's) topic when autoCreateTables config is set to true.
-   * @param record Kafka Sink Record to be streamed into BigQuery.
-   * @param baseTableId BaseTableId in BigQuery.
-   */
-  private void maybeCreateTable(SinkRecord record, TableId baseTableId) {
-    BigQuery bigQuery = getBigQuery();
-    boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
-    if (autoCreateTables && bigQuery.getTable(baseTableId) == null) {
-      getSchemaManager(bigQuery).createTable(baseTableId, record.topic());
-      logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic());
-    }
-  }
-
   private RowToInsert getRecordRow(SinkRecord record) {
     Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
     Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();

@@ -217,7 +201,8 @@ public void put(Collection<SinkRecord> records) {
       if (!tableWriterBuilders.containsKey(table)) {
         TableWriterBuilder tableWriterBuilder;
         if (config.getList(config.ENABLE_BATCH_CONFIG).contains(record.topic())) {
-          String gcsBlobName = record.topic() + "_" + uuid + "_" + Instant.now().toEpochMilli();
+          String topic = record.topic();
+          String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli();
           String gcsFolderName = config.getString(config.GCS_FOLDER_NAME_CONFIG);
           if (gcsFolderName != null && !"".equals(gcsFolderName)) {
             gcsBlobName = gcsFolderName + "/" + gcsBlobName;

@@ -227,6 +212,7 @@ public void put(Collection<SinkRecord> records) {
                   table.getBaseTableId(),
                   config.getString(config.GCS_BUCKET_NAME_CONFIG),
                   gcsBlobName,
+                  topic,
                   recordConverter);
         } else {
           tableWriterBuilder =

@@ -283,15 +269,18 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
   }

   private BigQueryWriter getBigQueryWriter() {
-    boolean updateSchemas = config.getBoolean(config.SCHEMA_UPDATE_CONFIG);
+    boolean autoUpdateSchemas = config.getBoolean(config.SCHEMA_UPDATE_CONFIG);
+    boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
     int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG);
     long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG);
     BigQuery bigQuery = getBigQuery();
-    if (updateSchemas) {
+    if (autoUpdateSchemas || autoCreateTables) {
       return new AdaptiveBigQueryWriter(bigQuery,
                                         getSchemaManager(bigQuery),
                                         retry,
-                                        retryWait);
+                                        retryWait,
+                                        autoUpdateSchemas,
+                                        autoCreateTables);
     } else {
       return new SimpleBigQueryWriter(bigQuery, retry, retryWait);
     }

@@ -312,10 +301,13 @@ private GCSToBQWriter getGcsWriter() {
     BigQuery bigQuery = getBigQuery();
     int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG);
     long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG);
+    boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
     return new GCSToBQWriter(getGcs(),
                              bigQuery,
+                             getSchemaManager(bigQuery),
                              retry,
-                             retryWait);
+                             retryWait,
+                             autoCreateTables);
   }

   @Override
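
Both flags read in getBigQueryWriter() come from the sink connector configuration. As a rough sketch of how the feature is switched on, here are illustrative task properties; the key names are assumed to be the values behind TABLE_CREATE_CONFIG, SCHEMA_UPDATE_CONFIG, and the retry configs (check BigQuerySinkConfig for the authoritative names):

import java.util.HashMap;
import java.util.Map;

public class SinkWriterConfigSketch {

  // Illustrative only: key names are assumptions, not verified against BigQuerySinkConfig.
  static Map<String, String> exampleSinkProperties() {
    Map<String, String> props = new HashMap<>();
    props.put("autoCreateTables", "true");    // assumed key for TABLE_CREATE_CONFIG; enables the AdaptiveBigQueryWriter create path
    props.put("autoUpdateSchemas", "true");   // assumed key for SCHEMA_UPDATE_CONFIG; enables schema patching on invalid-schema errors
    props.put("bigQueryRetry", "3");          // assumed key for BIGQUERY_RETRY_CONFIG
    props.put("bigQueryRetryWait", "10000");  // assumed key for BIGQUERY_RETRY_WAIT_CONFIG, in milliseconds
    return props;
  }
}

With either flag set, getBigQueryWriter() now returns an AdaptiveBigQueryWriter; with both unset it falls back to SimpleBigQueryWriter, exactly as the diff above shows.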

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java

Lines changed: 11 additions & 4 deletions
@@ -32,7 +32,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;

 /**
  * Batch Table Writer that uploads records to GCS as a blob

@@ -45,6 +44,7 @@ public class GCSBatchTableWriter implements Runnable {

   private final String bucketName;
   private final String blobName;
+  private final String topic;

   private final List<RowToInsert> rows;
   private final GCSToBQWriter writer;

@@ -56,15 +56,18 @@ public class GCSBatchTableWriter implements Runnable {
   * @param bucketName the name of the GCS bucket where the blob should be uploaded
   * @param baseBlobName the base name of the blob in which the serialized rows should be uploaded.
   *                     The full name is [baseBlobName]_[writerId]_
+  * @param topic Kafka record topic
   */
  private GCSBatchTableWriter(List<RowToInsert> rows,
                              GCSToBQWriter writer,
                              TableId tableId,
                              String bucketName,
-                             String baseBlobName) {
+                             String baseBlobName,
+                             String topic) {
    this.tableId = tableId;
    this.bucketName = bucketName;
    this.blobName = baseBlobName;
+   this.topic = topic;

    this.rows = rows;
    this.writer = writer;

@@ -73,7 +76,7 @@ private GCSBatchTableWriter(List<RowToInsert> rows,
   @Override
   public void run() {
     try {
-      writer.writeRows(rows, tableId, bucketName, blobName);
+      writer.writeRows(rows, tableId, bucketName, blobName, topic);
     } catch (ConnectException ex) {
       throw new ConnectException("Failed to write rows to GCS", ex);
     } catch (InterruptedException ex) {

@@ -87,6 +90,7 @@ public void run() {
   public static class Builder implements TableWriterBuilder {
     private final String bucketName;
     private String blobName;
+    private String topic;

     private final TableId tableId;

@@ -101,16 +105,19 @@ public static class Builder implements TableWriterBuilder {
     * @param tableId The bigquery table to be written to.
     * @param gcsBucketName The GCS bucket to write to.
     * @param gcsBlobName The name of the GCS blob to write.
+    * @param topic Kafka record topic
     * @param recordConverter the {@link RecordConverter} to use.
     */
    public Builder(GCSToBQWriter writer,
                   TableId tableId,
                   String gcsBucketName,
                   String gcsBlobName,
+                  String topic,
                   RecordConverter<Map<String, Object>> recordConverter) {

      this.bucketName = gcsBucketName;
      this.blobName = gcsBlobName;
+     this.topic = topic;

      this.tableId = tableId;

@@ -133,7 +140,7 @@ public void addRow(RowToInsert rowToInsert) {
     }

     public GCSBatchTableWriter build() {
-      return new GCSBatchTableWriter(rows, writer, tableId, bucketName, blobName);
+      return new GCSBatchTableWriter(rows, writer, tableId, bucketName, blobName, topic);
     }
   }
 }
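
For orientation, a minimal wiring sketch of the updated Builder, showing where the new topic argument sits among the existing ones. All values are illustrative, and the import paths for the connector's own classes are assumed from the file paths in this commit:

import java.util.Map;

import com.google.cloud.bigquery.TableId;
import com.wepay.kafka.connect.bigquery.convert.RecordConverter;          // assumed package
import com.wepay.kafka.connect.bigquery.write.batch.GCSBatchTableWriter;
import com.wepay.kafka.connect.bigquery.write.row.GCSToBQWriter;          // assumed package

public class GcsBatchWriterWiringSketch {

  static GCSBatchTableWriter.Builder newBuilder(GCSToBQWriter writer,
                                                RecordConverter<Map<String, Object>> converter) {
    return new GCSBatchTableWriter.Builder(
        writer,
        TableId.of("my_dataset", "my_topic"),
        "my-gcs-bucket",
        "my_topic_0_1577836800000",  // [topic]_[uuid]_[timestamp], as assembled in BigQuerySinkTask.put()
        "my_topic",                  // new argument: the Kafka topic, forwarded so GCSToBQWriter can auto-create the table
        converter);
  }
}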

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java

Lines changed: 44 additions & 11 deletions
@@ -19,6 +19,7 @@


 import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.BigQueryError;
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.InsertAllRequest;

@@ -37,16 +38,20 @@
 import java.util.Map;

 /**
- * A {@link BigQueryWriter} capable of updating BigQuery table schemas.
+ * A {@link BigQueryWriter} capable of updating BigQuery table schemas and creating non-existed tables automatically.
  */
 public class AdaptiveBigQueryWriter extends BigQueryWriter {
   private static final Logger logger = LoggerFactory.getLogger(AdaptiveBigQueryWriter.class);

-  // The maximum number of retries we will attempt to write rows after updating a BQ table schema.
-  private static final int AFTER_UPDATE_RETY_LIMIT = 5;
+  // The maximum number of retries we will attempt to write rows after creating a table or updating a BQ table schema.
+  private static final int RETRY_LIMIT = 5;
+  // Wait for about 30s between each retry since both creating table and updating schema take up to 2~3 minutes to take effect.
+  private static final int RETRY_WAIT_TIME = 30000;

   private final BigQuery bigQuery;
   private final SchemaManager schemaManager;
+  private final boolean autoUpdateSchemas;
+  private final boolean autoCreateTables;

   /**
    * @param bigQuery Used to send write requests to BigQuery.

@@ -57,10 +62,14 @@ public class AdaptiveBigQueryWriter extends BigQueryWriter {
   public AdaptiveBigQueryWriter(BigQuery bigQuery,
                                 SchemaManager schemaManager,
                                 int retry,
-                                long retryWait) {
+                                long retryWait,
+                                boolean autoUpdateSchemas,
+                                boolean autoCreateTables) {
     super(retry, retryWait);
     this.bigQuery = bigQuery;
     this.schemaManager = schemaManager;
+    this.autoUpdateSchemas = autoUpdateSchemas;
+    this.autoCreateTables = autoCreateTables;
   }

   private boolean isTableMissingSchema(BigQueryException exception) {

@@ -69,6 +78,12 @@ private boolean isTableMissingSchema(BigQueryException exception) {
     return exception.getReason() != null && exception.getReason().equalsIgnoreCase("invalid");
   }

+  private boolean isTableNotExistedException(BigQueryException exception) {
+    // If a table does not exist, it will raise a BigQueryException that the input is notFound
+    // Referring to Google Cloud Error Codes Doc: https://cloud.google.com/bigquery/docs/error-messages?hl=en
+    return exception.getCode() == 404;
+  }
+
   /**
    * Sends the request to BigQuery, then checks the response to see if any errors have occurred. If
    * any have, and all errors can be blamed upon invalid columns in the rows sent, attempts to

@@ -86,21 +101,24 @@ public Map<Long, List<BigQueryError>> performWriteRequest(
     try {
       request = createInsertAllRequest(tableId, rows);
       writeResponse = bigQuery.insertAll(request);
-      // Should only perform one schema update attempt; may have to continue insert attempts due to
-      // BigQuery schema updates taking up to two minutes to take effect
+      // Should only perform one schema update attempt.
       if (writeResponse.hasErrors()
-          && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) {
+          && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdateSchemas) {
         attemptSchemaUpdate(tableId, topic);
       }
     } catch (BigQueryException exception) {
-      if (isTableMissingSchema(exception)) {
+      // Should only perform one table creation attempt.
+      if (isTableNotExistedException(exception) && autoCreateTables && bigQuery.getTable(tableId.getBaseTableId()) == null) {
+        attemptTableCreate(tableId.getBaseTableId(), topic);
+      } else if (isTableMissingSchema(exception) && autoUpdateSchemas) {
         attemptSchemaUpdate(tableId, topic);
       } else {
         throw exception;
       }
     }

-    // Schema update might be delayed, so multiple insertion attempts may be necessary
+    // Creating tables or updating table schemas in BigQuery takes up to 2~3 minutes to take affect,
+    // so multiple insertion attempts may be necessary.
     int attemptCount = 0;
     while (writeResponse == null || writeResponse.hasErrors()) {
       logger.trace("insertion failed");

@@ -117,10 +135,15 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) {
         return writeResponse.getInsertErrors();
       }
       attemptCount++;
-      if (attemptCount >= AFTER_UPDATE_RETY_LIMIT) {
+      if (attemptCount >= RETRY_LIMIT) {
         throw new BigQueryConnectException(
             "Failed to write rows after BQ schema update within "
-            + AFTER_UPDATE_RETY_LIMIT + " attempts for: " + tableId.getBaseTableId());
+            + RETRY_LIMIT + " attempts for: " + tableId.getBaseTableId());
+      }
+      try {
+        Thread.sleep(RETRY_WAIT_TIME);
+      } catch (InterruptedException e) {
+        // no-op, we want to keep retrying the insert
       }
     }
     logger.debug("table insertion completed successfully");

@@ -136,6 +159,16 @@ private void attemptSchemaUpdate(PartitionedTableId tableId, String topic) {
     }
   }

+  private void attemptTableCreate(TableId tableId, String topic) {
+    try {
+      schemaManager.createTable(tableId, topic);
+      logger.info("Table {} does not exist, auto-created table for topic {}", tableId, topic);
+    } catch (BigQueryException exception) {
+      throw new BigQueryConnectException(
+          "Failed to create table " + tableId, exception);
+    }
+  }
+
   /*
    * Currently, the only way to determine the cause of an insert all failure is by examining the map
    * object returned by the insertErrors() method of an insert all response. The only way to
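
To make the new behaviour concrete, here is a hedged unit-test sketch (not the PR's own test) of the create-then-retry path: the first insertAll throws a 404, the unstubbed BigQuery mock reports the table as absent, so the writer asks the SchemaManager to create it and then retries the insert. It assumes performWriteRequest(tableId, rows, topic) as the method signature and assumed package paths for the connector-internal classes; note that the writer now sleeps RETRY_WAIT_TIME (about 30 seconds) between attempts, so this sketch runs slowly.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.wepay.kafka.connect.bigquery.SchemaManager;                 // assumed package
import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId;      // assumed package
import com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter;

import org.junit.Test;

public class AutoCreateTableSketchTest {

  @Test
  public void createsMissingTableThenRetriesInsert() {
    BigQuery bigQuery = mock(BigQuery.class);
    SchemaManager schemaManager = mock(SchemaManager.class);

    InsertAllResponse successResponse = mock(InsertAllResponse.class);
    when(successResponse.hasErrors()).thenReturn(false);

    // First attempt: the table does not exist, so insertAll raises a 404 (notFound).
    // Second attempt, after the table has been created: the insert succeeds.
    when(bigQuery.insertAll(any(InsertAllRequest.class)))
        .thenThrow(new BigQueryException(404, "Not found: Table my_dataset.my_topic"))
        .thenReturn(successResponse);
    // bigQuery.getTable(...) is left unstubbed and therefore returns null, i.e. "table absent".

    AdaptiveBigQueryWriter writer = new AdaptiveBigQueryWriter(
        bigQuery, schemaManager, 1, 1000L, /* autoUpdateSchemas */ false, /* autoCreateTables */ true);

    TableId baseTableId = TableId.of("my_dataset", "my_topic");
    PartitionedTableId tableId = new PartitionedTableId.Builder(baseTableId).build();
    Map<String, Object> content = new HashMap<>();
    content.put("field", "value");
    List<RowToInsert> rows = Collections.singletonList(RowToInsert.of(content));

    writer.performWriteRequest(tableId, rows, "my_topic");

    // The 404 should have been translated into exactly one table-creation attempt.
    verify(schemaManager).createTable(baseTableId, "my_topic");
  }
}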
