Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ private void startGCSToBQLoadTask() {
BucketInfo bucketInfo = BucketInfo.of(bucketName);
bucket = gcs.create(bucketInfo);
}
GCSToBQLoadRunnable loadRunnable = new GCSToBQLoadRunnable(getBigQuery(), bucket);
GCSToBQLoadRunnable loadRunnable = new GCSToBQLoadRunnable(getBigQuery(), bucket, topicsToBaseTableIds);

int intervalSec = config.getInt(BigQuerySinkConfig.BATCH_LOAD_INTERVAL_SEC_CONFIG);
gcsLoadExecutor.scheduleAtFixedRate(loadRunnable, intervalSec, intervalSec, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,18 @@ public class GCSToBQLoadRunnable implements Runnable {
private static String SOURCE_URI_FORMAT = "gs://%s/%s";
public static final Pattern METADATA_TABLE_PATTERN =
Pattern.compile("((?<project>[^:]+):)?(?<dataset>[^.]+)\\.(?<table>.+)");
private final Set<TableId> targetTableIds;

/**
* Create a {@link GCSToBQLoadRunnable} with the given bigquery, bucket, and ms wait interval.
* Create a {@link GCSToBQLoadRunnable} with the given bigquery, bucket, target tables and ms wait interval.
* @param bigQuery the {@link BigQuery} instance.
* @param bucket the the GCS bucket to read from.
* @param topicsToBaseTableIds target tables to write to
*/
public GCSToBQLoadRunnable(BigQuery bigQuery, Bucket bucket) {
public GCSToBQLoadRunnable(BigQuery bigQuery, Bucket bucket, Map<String, TableId> topicsToBaseTableIds) {
this.bigQuery = bigQuery;
this.bucket = bucket;
this.targetTableIds = topicsToBaseTableIds.values().stream().collect(Collectors.toSet());
this.activeJobs = new HashMap<>();
this.claimedBlobIds = new HashSet<>();
this.deletableBlobIds = new HashSet<>();
Expand Down Expand Up @@ -109,11 +112,15 @@ private Map<TableId, List<Blob>> getBlobsUpToLimit() {
TableId table = getTableFromBlob(blob);
logger.debug("Checking blob bucket={}, name={}, table={} ", blob.getBucket(), blob.getName(), table );

if (table == null || claimedBlobIds.contains(blobId) || deletableBlobIds.contains(blobId)) {
if (table == null
|| claimedBlobIds.contains(blobId)
|| deletableBlobIds.contains(blobId)
|| !targetTableIds.contains(table)) {
// don't do anything if:
// 1. we don't know what table this should be uploaded to or
// 2. this blob is already claimed by a currently-running job or
// 3. this blob is up for deletion.
// 4. this blob is not targeted for our target tables
continue;
}

Expand Down