From fb23973090400820ed97af87e46a48a38957dfb4 Mon Sep 17 00:00:00 2001 From: Anders Eriksson Date: Thu, 20 Aug 2020 16:57:37 +0200 Subject: [PATCH] Make GCSToBQLoadRunnable aware of which target BQ tables it should make jobs to write to (cherry picked from commit 3e3d7ff98e36df1e6b746980ce58112bb784e304) --- .../kafka/connect/bigquery/BigQuerySinkTask.java | 2 +- .../kafka/connect/bigquery/GCSToBQLoadRunnable.java | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index bf4b43d6d..cf3d9defd 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -359,7 +359,7 @@ private void startGCSToBQLoadTask() { BucketInfo bucketInfo = BucketInfo.of(bucketName); bucket = gcs.create(bucketInfo); } - GCSToBQLoadRunnable loadRunnable = new GCSToBQLoadRunnable(getBigQuery(), bucket); + GCSToBQLoadRunnable loadRunnable = new GCSToBQLoadRunnable(getBigQuery(), bucket, topicsToBaseTableIds); int intervalSec = config.getInt(BigQuerySinkConfig.BATCH_LOAD_INTERVAL_SEC_CONFIG); gcsLoadExecutor.scheduleAtFixedRate(loadRunnable, intervalSec, intervalSec, TimeUnit.SECONDS); diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GCSToBQLoadRunnable.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GCSToBQLoadRunnable.java index 348fccb29..82d7d0cb3 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GCSToBQLoadRunnable.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GCSToBQLoadRunnable.java @@ -73,15 +73,18 @@ public class GCSToBQLoadRunnable implements Runnable { private static String SOURCE_URI_FORMAT = "gs://%s/%s"; public static final Pattern METADATA_TABLE_PATTERN = 
Pattern.compile("((?<project>[^:]+):)?(?<dataset>[^.]+)\\.(?<table>.+)"); + private final Set<TableId> targetTableIds; /** - * Create a {@link GCSToBQLoadRunnable} with the given bigquery, bucket, and ms wait interval. + * Create a {@link GCSToBQLoadRunnable} with the given bigquery, bucket, target tables and ms wait interval. * @param bigQuery the {@link BigQuery} instance. * @param bucket the the GCS bucket to read from. + * @param topicsToBaseTableIds target tables to write to */ - public GCSToBQLoadRunnable(BigQuery bigQuery, Bucket bucket) { + public GCSToBQLoadRunnable(BigQuery bigQuery, Bucket bucket, Map<String, TableId> topicsToBaseTableIds) { this.bigQuery = bigQuery; this.bucket = bucket; + this.targetTableIds = topicsToBaseTableIds.values().stream().collect(Collectors.toSet()); this.activeJobs = new HashMap<>(); this.claimedBlobIds = new HashSet<>(); this.deletableBlobIds = new HashSet<>(); @@ -109,11 +112,15 @@ private Map<TableId, List<Blob>> getBlobsUpToLimit() { TableId table = getTableFromBlob(blob); logger.debug("Checking blob bucket={}, name={}, table={} ", blob.getBucket(), blob.getName(), table ); - if (table == null || claimedBlobIds.contains(blobId) || deletableBlobIds.contains(blobId)) { + if (table == null + || claimedBlobIds.contains(blobId) + || deletableBlobIds.contains(blobId) + || !targetTableIds.contains(table)) { // don't do anything if: // 1. we don't know what table this should be uploaded to or // 2. this blob is already claimed by a currently-running job or // 3. this blob is up for deletion. + // 4. this blob is not targeted for our target tables continue; }