Skip to content

Commit 82ab7f5

Browse files
committed
GCSHook: Add flag to ignore NotFound error on blob deletion
This adds an `ignore_error` boolean parameter (default `False`) to `GCSHook.delete`. When it is set to `True`, the 404 (NotFound) error raised when the blob to be deleted doesn't exist is suppressed.
1 parent d850b7c commit 82ab7f5

File tree

2 files changed

+48
-15
lines changed
  • providers/google
    • src/airflow/providers/google/cloud/hooks
    • tests/unit/google/cloud/hooks

2 files changed

+48
-15
lines changed

providers/google/src/airflow/providers/google/cloud/hooks/gcs.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -705,22 +705,30 @@ def is_older_than(self, bucket_name: str, object_name: str, seconds: int) -> boo
705705
return True
706706
return False
707707

708-
def delete(self, bucket_name: str, object_name: str) -> None:
708+
def delete(self, bucket_name: str, object_name: str, ignore_error: bool = False) -> None:
709709
"""
710710
Delete an object from the bucket.
711711
712712
:param bucket_name: name of the bucket, where the object resides
713713
:param object_name: name of the object to delete
714+
:param ignore_error: (Optional) whether to ignore NotFound exceptions. Default: False
714715
"""
716+
on_error = None
717+
if ignore_error:
718+
on_error = lambda blob: None
715719
client = self.get_conn()
716720
bucket = client.bucket(bucket_name)
717721
blob = bucket.blob(blob_name=object_name)
718-
blob.delete()
719-
get_hook_lineage_collector().add_input_asset(
720-
context=self, scheme="gs", asset_kwargs={"bucket": bucket.name, "key": blob.name}
721-
)
722-
723-
self.log.info("Blob %s deleted.", object_name)
722+
try:
723+
bucket.delete_blobs([blob], on_error=on_error)
724+
get_hook_lineage_collector().add_input_asset(
725+
context=self, scheme="gs", asset_kwargs={"bucket": bucket.name, "key": blob.name}
726+
)
727+
if not ignore_error:
728+
self.log.info("Blob %s deleted.", object_name)
729+
except NotFound:
730+
self.log.warning("Blob %s in bucket %s does not exist.", blob.name, bucket.name)
731+
raise
724732

725733
def get_bucket(self, bucket_name: str) -> storage.Bucket:
726734
"""

providers/google/tests/unit/google/cloud/hooks/test_gcs.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -528,32 +528,57 @@ def test_rewrite_exposes_lineage(self, mock_service, hook_lineage_collector):
528528

529529
@mock.patch("google.cloud.storage.Bucket")
530530
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
531-
def test_delete(self, mock_service, mock_bucket):
531+
def test_delete(self, mock_service, mock_bucket, caplog):
532532
test_bucket = "test_bucket"
533533
test_object = "test_object"
534534
blob_to_be_deleted = storage.Blob(name=test_object, bucket=mock_bucket)
535535

536-
get_bucket_method = mock_service.return_value.get_bucket
537-
get_blob_method = get_bucket_method.return_value.get_blob
538-
delete_method = get_blob_method.return_value.delete
536+
bucket_method = mock_service.return_value.bucket
537+
blob = bucket_method.return_value.blob
538+
delete_method = bucket_method.return_value.delete_blobs
539539
delete_method.return_value = blob_to_be_deleted
540540

541-
response = self.gcs_hook.delete(bucket_name=test_bucket, object_name=test_object)
541+
with caplog.at_level(logging.INFO):
542+
response = self.gcs_hook.delete(bucket_name=test_bucket, object_name=test_object)
542543
assert response is None
544+
bucket_method.assert_called_once_with(test_bucket)
545+
blob.assert_called_once_with(blob_name=test_object)
546+
delete_method.assert_called_once_with([blob.return_value], on_error=None)
547+
assert "Blob test_object deleted" in caplog.text
543548

544549
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
545-
def test_delete_nonexisting_object(self, mock_service):
550+
def test_delete_nonexisting_object(self, mock_service, caplog):
546551
test_bucket = "test_bucket"
547552
test_object = "test_object"
548553

549554
bucket_method = mock_service.return_value.bucket
550555
blob = bucket_method.return_value.blob
551-
delete_method = blob.return_value.delete
556+
delete_method = bucket_method.return_value.delete_blobs
552557
delete_method.side_effect = NotFound(message="Not Found")
553558

554-
with pytest.raises(NotFound):
559+
with pytest.raises(NotFound), caplog.at_level(logging.INFO):
555560
self.gcs_hook.delete(bucket_name=test_bucket, object_name=test_object)
556561

562+
bucket_method.assert_called_once_with(test_bucket)
563+
blob.assert_called_once_with(blob_name=test_object)
564+
delete_method.assert_called_once_with([blob.return_value], on_error=None)
565+
assert "does not exist" in caplog.text
566+
567+
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
568+
def test_delete_nonexisting_object_ignore_error(self, mock_service, caplog):
569+
test_bucket = "test_bucket"
570+
test_object = "test_object"
571+
572+
bucket_method = mock_service.return_value.bucket
573+
blob = bucket_method.return_value.blob
574+
delete_method = bucket_method.return_value.delete_blobs
575+
576+
self.gcs_hook.delete(bucket_name=test_bucket, object_name=test_object, ignore_error=True)
577+
578+
bucket_method.assert_called_once_with(test_bucket)
579+
blob.assert_called_once_with(blob_name=test_object)
580+
delete_method.assert_called_once_with([blob.return_value], on_error=mock.ANY)
581+
557582
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
558583
def test_delete_exposes_lineage(self, mock_service, hook_lineage_collector):
559584
test_bucket = "test_bucket"

0 commit comments

Comments
 (0)