Skip to content

Commit 8709d04

Browse files
author
Bingqin Zhou
committed
Add sleep block in performWriteRequest to wait for table creation to take effect.
1 parent a2538ed commit 8709d04

File tree

2 files changed

+10
-2
lines changed

2 files changed

+10
-2
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class AdaptiveBigQueryWriter extends BigQueryWriter {
4444

4545
// The maximum number of retries we will attempt to write rows after updating a BQ table schema.
4646
private static final int AFTER_UPDATE_RETRY_LIMIT = 5;
47+
// Wait for about 30s between each retry since both creating table and updating schema take up to 2 minutes to take effect.
48+
private static final int RETRY_WAIT_TIME = 30000;
4749

4850
private final BigQuery bigQuery;
4951
private final SchemaManager schemaManager;
@@ -105,7 +107,8 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSch
105107
attemptSchemaUpdate(tableId, topic);
106108
}
107109
} catch (BigQueryException exception) {
108-
if (isTableNotExisted(exception) && autoCreateTables) {
110+
// Should only perform one table creation attempt.
111+
if (isTableNotExisted(exception) && autoCreateTables && bigQuery.getTable(tableId.getBaseTableId()) == null) {
109112
attemptTableCreate(tableId, topic);
110113
} else if (isTableMissingSchema(exception) && updateSchemas) {
111114
attemptSchemaUpdate(tableId, topic);
@@ -136,6 +139,11 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSch
136139
"Failed to write rows after BQ schema update within "
137140
+ AFTER_UPDATE_RETRY_LIMIT + " attempts for: " + tableId.getBaseTableId());
138141
}
142+
try {
143+
Thread.sleep(RETRY_WAIT_TIME);
144+
} catch (InterruptedException e) {
145+
// no-op, we want to keep retrying the insert
146+
}
139147
}
140148
logger.debug("table insertion completed successfully");
141149
return new HashMap<>();

kcbq-connector/test/docker/connect/connect-docker.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ connect-standalone \
2020
/etc/kafka-connect-bigquery/standalone.properties \
2121
/etc/kafka-connect-bigquery/connector.properties &
2222

23-
sleep 60
23+
sleep 180
2424
kill $!

0 commit comments

Comments
 (0)