Skip to content

Commit f7c1518

Browse files
farmdawgnationmtagle
authored andcommitted
Use batch delete operation when cleaning up blob ids (#215)
* First pass at using the batch delete api * Be a bit smarter about what we remove from deletableBlobIds * Correct a few syntax errors * Handle the case there are no blobs to delete * Tweak target index for removing failed deletes * Correct issues with long->int conversion
1 parent da33308 commit f7c1518

File tree

1 file changed

+37
-11
lines changed

1 file changed

+37
-11
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GCSToBQLoadRunnable.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -263,19 +263,45 @@ private void checkJobs() {
263263
*/
264264
private void deleteBlobs() {
265265
List<BlobId> blobIdsToDelete = new ArrayList<>();
266-
logger.info("Attempting to delete {} blobs", deletableBlobIds.size());
267-
for (BlobId blobId : deletableBlobIds) {
268-
try {
269-
bucket.getStorage().delete(blobId);
270-
blobIdsToDelete.add(blobId);
271-
} catch (StorageException ex) {
272-
logger.warn("Failed to delete blob: {}/{}", blobId.getBucket(), blobId.getName());
266+
blobIdsToDelete.addAll(deletableBlobIds);
267+
int numberOfBlobs = blobIdsToDelete.size();
268+
int failedDeletes = 0;
269+
int successfulDeletes = 0;
270+
271+
if (numberOfBlobs == 0) {
272+
logger.info("No blobs to delete");
273+
return;
274+
}
275+
276+
logger.info("Attempting to delete {} blobs", numberOfBlobs);
277+
278+
try {
279+
// Issue a batch delete api call
280+
List<Boolean> resultList = bucket.getStorage().delete(blobIdsToDelete);
281+
282+
// Filter the blobs we couldn't delete from the list of deletable blobs
283+
for (int i = 0; i < numberOfBlobs; i++) {
284+
if (!resultList.get(i)) {
285+
// This blob was not successful, remove it from the list.
286+
// Adjust the target index by the number of failed deletes we've
287+
// already seen since we're mutating the list as we go.
288+
int targetIndex = i - failedDeletes;
289+
blobIdsToDelete.remove(targetIndex);
290+
failedDeletes++;
291+
}
273292
}
293+
294+
// Calculate number of successful deletes, remove the successful deletes from
295+
// the deletableBlobIds.
296+
successfulDeletes = numberOfBlobs - failedDeletes;
297+
deletableBlobIds.removeAll(blobIdsToDelete);
298+
299+
logger.info("Successfully deleted {} blobs; failed to delete {} blobs",
300+
successfulDeletes,
301+
failedDeletes);
302+
} catch (StorageException ex) {
303+
logger.warn("Storage exception while attempting to delete blobs", ex);
274304
}
275-
deletableBlobIds.removeAll(blobIdsToDelete);
276-
logger.info("Successfully deleted {} blobs; failed to delete {} blobs",
277-
blobIdsToDelete.size(),
278-
deletableBlobIds.size());
279305
}
280306

281307
@Override

0 commit comments

Comments
 (0)