Skip to content

Commit 9798333

Browse files
author
Bingqin Zhou
committed
Clean up comments and add missing documentation.
1 parent 88f8c63 commit 9798333

File tree

2 files changed

+10
-9
lines changed

2 files changed

+10
-9
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.ArrayList;
3333
import java.util.List;
3434
import java.util.Map;
35-
import java.util.concurrent.atomic.AtomicInteger;
3635

3736
/**
3837
* Batch Table Writer that uploads records to GCS as a blob
@@ -57,6 +56,7 @@ public class GCSBatchTableWriter implements Runnable {
5756
* @param bucketName the name of the GCS bucket where the blob should be uploaded
5857
* @param baseBlobName the base name of the blob in which the serialized rows should be uploaded.
5958
* The full name is [baseBlobName]_[writerId]_
59+
* @param topic Kafka record topic
6060
*/
6161
private GCSBatchTableWriter(List<RowToInsert> rows,
6262
GCSToBQWriter writer,
@@ -105,6 +105,7 @@ public static class Builder implements TableWriterBuilder {
105105
* @param tableId The bigquery table to be written to.
106106
* @param gcsBucketName The GCS bucket to write to.
107107
* @param gcsBlobName The name of the GCS blob to write.
108+
* @param topic Kafka record topic
108109
* @param recordConverter the {@link RecordConverter} to use.
109110
*/
110111
public Builder(GCSToBQWriter writer,

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@
4343
public class AdaptiveBigQueryWriter extends BigQueryWriter {
4444
private static final Logger logger = LoggerFactory.getLogger(AdaptiveBigQueryWriter.class);
4545

46-
// The maximum number of retries we will attempt to write rows after updating a BQ table schema.
47-
private static final int AFTER_UPDATE_RETRY_LIMIT = 5;
48-
// Wait for about 30s between each retry since both creating table and updating schema take up to 2 minutes to take effect.
46+
// The maximum number of retries we will attempt to write rows after creating a table or updating a BQ table schema.
47+
private static final int RETRY_LIMIT = 5;
48+
// Wait for about 30s between each retry since both creating table and updating schema take up to 2~3 minutes to take effect.
4949
private static final int RETRY_WAIT_TIME = 30000;
5050

5151
private final BigQuery bigQuery;
@@ -101,8 +101,7 @@ public Map<Long, List<BigQueryError>> performWriteRequest(
101101
try {
102102
request = createInsertAllRequest(tableId, rows);
103103
writeResponse = bigQuery.insertAll(request);
104-
// Should only perform one schema update attempt; may have to continue insert attempts due to
105-
// BigQuery schema updates taking up to two minutes to take effect
104+
// Should only perform one schema update attempt.
106105
if (writeResponse.hasErrors()
107106
&& onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdateSchemas) {
108107
attemptSchemaUpdate(tableId, topic);
@@ -118,7 +117,8 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdat
118117
}
119118
}
120119

121-
// Schema update or table creation might be delayed, so multiple insertion attempts may be necessary
120+
// Creating tables or updating table schemas in BigQuery takes up to 2~3 minutes to take effect,
121+
// so multiple insertion attempts may be necessary.
122122
int attemptCount = 0;
123123
while (writeResponse == null || writeResponse.hasErrors()) {
124124
logger.trace("insertion failed");
@@ -135,10 +135,10 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdat
135135
return writeResponse.getInsertErrors();
136136
}
137137
attemptCount++;
138-
if (attemptCount >= AFTER_UPDATE_RETRY_LIMIT) {
138+
if (attemptCount >= RETRY_LIMIT) {
139139
throw new BigQueryConnectException(
140140
"Failed to write rows after BQ schema update within "
141-
+ AFTER_UPDATE_RETRY_LIMIT + " attempts for: " + tableId.getBaseTableId());
141+
+ RETRY_LIMIT + " attempts for: " + tableId.getBaseTableId());
142142
}
143143
try {
144144
Thread.sleep(RETRY_WAIT_TIME);

0 commit comments

Comments
 (0)