Skip to content

Commit 0b2ffec

Browse files
author
Bingqin Zhou
committed
Auto-create tables for GCS Load mode.
1 parent 8709d04 commit 0b2ffec

File tree

7 files changed

+143
-41
lines changed

7 files changed

+143
-41
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,8 @@ public void put(Collection<SinkRecord> records) {
201201
if (!tableWriterBuilders.containsKey(table)) {
202202
TableWriterBuilder tableWriterBuilder;
203203
if (config.getList(config.ENABLE_BATCH_CONFIG).contains(record.topic())) {
204-
String gcsBlobName = record.topic() + "_" + uuid + "_" + Instant.now().toEpochMilli();
204+
String topic = record.topic();
205+
String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli();
205206
String gcsFolderName = config.getString(config.GCS_FOLDER_NAME_CONFIG);
206207
if (gcsFolderName != null && !"".equals(gcsFolderName)) {
207208
gcsBlobName = gcsFolderName + "/" + gcsBlobName;
@@ -211,6 +212,7 @@ public void put(Collection<SinkRecord> records) {
211212
table.getBaseTableId(),
212213
config.getString(config.GCS_BUCKET_NAME_CONFIG),
213214
gcsBlobName,
215+
topic,
214216
recordConverter);
215217
} else {
216218
tableWriterBuilder =
@@ -299,10 +301,13 @@ private GCSToBQWriter getGcsWriter() {
299301
BigQuery bigQuery = getBigQuery();
300302
int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG);
301303
long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG);
304+
boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
302305
return new GCSToBQWriter(getGcs(),
303306
bigQuery,
307+
getSchemaManager(bigQuery),
304308
retry,
305-
retryWait);
309+
retryWait,
310+
autoCreateTables);
306311
}
307312

308313
@Override

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class GCSBatchTableWriter implements Runnable {
4545

4646
private final String bucketName;
4747
private final String blobName;
48+
private final String topic;
4849

4950
private final List<RowToInsert> rows;
5051
private final GCSToBQWriter writer;
@@ -61,10 +62,12 @@ private GCSBatchTableWriter(List<RowToInsert> rows,
6162
GCSToBQWriter writer,
6263
TableId tableId,
6364
String bucketName,
64-
String baseBlobName) {
65+
String baseBlobName,
66+
String topic) {
6567
this.tableId = tableId;
6668
this.bucketName = bucketName;
6769
this.blobName = baseBlobName;
70+
this.topic = topic;
6871

6972
this.rows = rows;
7073
this.writer = writer;
@@ -73,7 +76,7 @@ private GCSBatchTableWriter(List<RowToInsert> rows,
7376
@Override
7477
public void run() {
7578
try {
76-
writer.writeRows(rows, tableId, bucketName, blobName);
79+
writer.writeRows(rows, tableId, bucketName, blobName, topic);
7780
} catch (ConnectException ex) {
7881
throw new ConnectException("Failed to write rows to GCS", ex);
7982
} catch (InterruptedException ex) {
@@ -87,6 +90,7 @@ public void run() {
8790
public static class Builder implements TableWriterBuilder {
8891
private final String bucketName;
8992
private String blobName;
93+
private String topic;
9094

9195
private final TableId tableId;
9296

@@ -107,10 +111,12 @@ public Builder(GCSToBQWriter writer,
107111
TableId tableId,
108112
String gcsBucketName,
109113
String gcsBlobName,
114+
String topic,
110115
RecordConverter<Map<String, Object>> recordConverter) {
111116

112117
this.bucketName = gcsBucketName;
113118
this.blobName = gcsBlobName;
119+
this.topic = topic;
114120

115121
this.tableId = tableId;
116122

@@ -133,7 +139,7 @@ public void addRow(RowToInsert rowToInsert) {
133139
}
134140

135141
public GCSBatchTableWriter build() {
136-
return new GCSBatchTableWriter(rows, writer, tableId, bucketName, blobName);
142+
return new GCSBatchTableWriter(rows, writer, tableId, bucketName, blobName, topic);
137143
}
138144
}
139145
}

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020

2121
import com.google.cloud.bigquery.BigQuery;
22+
import com.google.cloud.bigquery.TableId;
2223
import com.google.cloud.bigquery.BigQueryError;
2324
import com.google.cloud.bigquery.BigQueryException;
2425
import com.google.cloud.bigquery.InsertAllRequest;
@@ -109,7 +110,7 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSch
109110
} catch (BigQueryException exception) {
110111
// Should only perform one table creation attempt.
111112
if (isTableNotExisted(exception) && autoCreateTables && bigQuery.getTable(tableId.getBaseTableId()) == null) {
112-
attemptTableCreate(tableId, topic);
113+
attemptTableCreate(tableId.getBaseTableId(), topic);
113114
} else if (isTableMissingSchema(exception) && updateSchemas) {
114115
attemptSchemaUpdate(tableId, topic);
115116
} else {
@@ -158,13 +159,13 @@ private void attemptSchemaUpdate(PartitionedTableId tableId, String topic) {
158159
}
159160
}
160161

161-
private void attemptTableCreate(PartitionedTableId tableId, String topic) {
162+
private void attemptTableCreate(TableId tableId, String topic) {
162163
try {
163-
schemaManager.createTable(tableId.getBaseTableId(), topic);
164-
logger.info("Table {} does not exist, auto-created table for topic {}", tableId.getBaseTableName(), topic);
164+
schemaManager.createTable(tableId, topic);
165+
logger.info("Table {} does not exist, auto-created table for topic {}", tableId, topic);
165166
} catch (BigQueryException exception) {
166167
throw new BigQueryConnectException(
167-
"Failed to create table " + tableId.getBaseTableName(), exception);
168+
"Failed to create table " + tableId, exception);
168169
}
169170
}
170171

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriter.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020

2121
import com.google.cloud.bigquery.BigQuery;
22+
import com.google.cloud.bigquery.BigQueryException;
2223
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
2324
import com.google.cloud.bigquery.TableId;
2425
import com.google.cloud.storage.Blob;
@@ -28,6 +29,8 @@
2829
import com.google.cloud.storage.StorageException;
2930
import com.google.gson.Gson;
3031

32+
import com.wepay.kafka.connect.bigquery.SchemaManager;
33+
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
3134
import com.wepay.kafka.connect.bigquery.exception.GCSConnectException;
3235

3336
import org.apache.kafka.connect.errors.ConnectException;
@@ -53,12 +56,16 @@ public class GCSToBQWriter {
5356

5457
private final BigQuery bigQuery;
5558

59+
private final SchemaManager schemaManager;
60+
5661
private static final int WAIT_MAX_JITTER = 1000;
5762

5863
private static final Random random = new Random();
5964

6065
private int retries;
6166
private long retryWaitMs;
67+
private boolean autoCreateTables;
68+
6269

6370
public static final String GCS_METADATA_TABLE_KEY = "sinkTable";
6471

@@ -71,13 +78,17 @@ public class GCSToBQWriter {
7178
*/
7279
public GCSToBQWriter(Storage storage,
7380
BigQuery bigQuery,
81+
SchemaManager schemaManager,
7482
int retries,
75-
long retryWaitMs) {
83+
long retryWaitMs,
84+
boolean autoCreateTables) {
7685
this.storage = storage;
7786
this.bigQuery = bigQuery;
87+
this.schemaManager = schemaManager;
7888

7989
this.retries = retries;
8090
this.retryWaitMs = retryWaitMs;
91+
this.autoCreateTables = autoCreateTables;
8192
}
8293

8394
/**
@@ -92,7 +103,8 @@ public GCSToBQWriter(Storage storage,
92103
public void writeRows(List<RowToInsert> rows,
93104
TableId tableId,
94105
String bucketName,
95-
String blobName) throws InterruptedException {
106+
String blobName,
107+
String topic) throws InterruptedException {
96108

97109
// Get Source URI
98110
BlobId blobId = BlobId.of(bucketName, blobName);
@@ -103,9 +115,8 @@ public void writeRows(List<RowToInsert> rows,
103115

104116
// Check if the table specified exists
105117
// No error is thrown here: when auto table creation is enabled, a missing table is created on demand.
106-
if (bigQuery.getTable(tableId) == null) {
107-
throw new ConnectException(
108-
String.format("Table with TableId %s does not exist.", tableId.getTable()));
118+
if (autoCreateTables && bigQuery.getTable(tableId) == null) {
119+
attemptTableCreate(tableId, topic);
109120
}
110121

111122
int attemptCount = 0;
@@ -184,4 +195,14 @@ private String toJson(List<RowToInsert> rows) {
184195
private void waitRandomTime() throws InterruptedException {
185196
Thread.sleep(retryWaitMs + random.nextInt(WAIT_MAX_JITTER));
186197
}
198+
199+
private void attemptTableCreate(TableId tableId, String topic) {
200+
try {
201+
schemaManager.createTable(tableId, topic);
202+
logger.info("Table {} does not exist, auto-created table for topic {}", tableId, topic);
203+
} catch (BigQueryException exception) {
204+
throw new BigQueryConnectException(
205+
"Failed to create table " + tableId, exception);
206+
}
207+
}
187208
}

0 commit comments

Comments
 (0)