Skip to content

Commit 2e1422b

Browse files
Add fail_on_file_not_exist to SFTPToGCSOperator (apache#56528)
* Add fail_on_file_not_exist to SFTPToGCSOperator
* working with local DAG
* working with local DAG
* Trigger Build
* fix test
* fix unit test case

---------

Co-authored-by: John Nguyen <john.nguyen@creditkarma.com>
1 parent 2e78e01 commit 2e1422b

File tree

2 files changed

+35
-2
lines changed

2 files changed

+35
-2
lines changed

providers/google/src/airflow/providers/google/cloud/transfers/sftp_to_gcs.py

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -78,6 +78,8 @@ class SFTPToGCSOperator(BaseOperator):
7878
then uploads (may require significant disk space).
7979
When ``True``, the file streams directly without using local disk.
8080
Defaults to ``False``.
81+
:param fail_on_file_not_exist: If ``True``, the operator fails when the source file does not exist;
82+
if ``False``, the operator does not fail and skips the transfer. Defaults to ``True``.
8183
"""
8284

8385
template_fields: Sequence[str] = (
@@ -101,6 +103,7 @@ def __init__(
101103
impersonation_chain: str | Sequence[str] | None = None,
102104
sftp_prefetch: bool = True,
103105
use_stream: bool = False,
106+
fail_on_file_not_exist: bool = True,
104107
**kwargs,
105108
) -> None:
106109
super().__init__(**kwargs)
@@ -116,6 +119,7 @@ def __init__(
116119
self.impersonation_chain = impersonation_chain
117120
self.sftp_prefetch = sftp_prefetch
118121
self.use_stream = use_stream
122+
self.fail_on_file_not_exist = fail_on_file_not_exist
119123

120124
@cached_property
121125
def sftp_hook(self):
@@ -156,7 +160,13 @@ def execute(self, context: Context):
156160
destination_object = (
157161
self.destination_path if self.destination_path else self.source_path.rsplit("/", 1)[1]
158162
)
159-
self._copy_single_object(gcs_hook, self.sftp_hook, self.source_path, destination_object)
163+
try:
164+
self._copy_single_object(gcs_hook, self.sftp_hook, self.source_path, destination_object)
165+
except FileNotFoundError as e:
166+
if self.fail_on_file_not_exist:
167+
raise e
168+
self.log.info("File %s not found on SFTP server. Skipping transfer.", self.source_path)
169+
return
160170

161171
def _copy_single_object(
162172
self,
@@ -172,7 +182,6 @@ def _copy_single_object(
172182
self.destination_bucket,
173183
destination_object,
174184
)
175-
176185
if self.use_stream:
177186
dest_bucket = gcs_hook.get_bucket(self.destination_bucket)
178187
dest_blob = dest_bucket.blob(destination_object)

providers/google/tests/unit/google/cloud/transfers/test_sftp_to_gcs.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import os
2121
from unittest import mock
22+
from unittest.mock import patch
2223

2324
import pytest
2425

@@ -377,3 +378,26 @@ def test_get_openlineage_facets(
377378
assert result.inputs[0].name == expected_source
378379
assert result.outputs[0].namespace == f"gs://{TEST_BUCKET}"
379380
assert result.outputs[0].name == expected_destination
381+
382+
@pytest.mark.parametrize("fail_on_file_not_exist", [False, True])
383+
@mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.GCSHook")
384+
@mock.patch("airflow.providers.google.cloud.transfers.sftp_to_gcs.SFTPHook")
385+
def test_sftp_to_gcs_fail_on_file_not_exist(self, sftp_hook, gcs_hook, fail_on_file_not_exist):
386+
invalid_file_name = "main_dir/invalid-object.json"
387+
task = SFTPToGCSOperator(
388+
task_id=TASK_ID,
389+
source_path=invalid_file_name,
390+
destination_bucket=TEST_BUCKET,
391+
destination_path=DESTINATION_PATH_FILE,
392+
move_object=False,
393+
gcp_conn_id=GCP_CONN_ID,
394+
sftp_conn_id=SFTP_CONN_ID,
395+
impersonation_chain=IMPERSONATION_CHAIN,
396+
fail_on_file_not_exist=fail_on_file_not_exist,
397+
)
398+
with patch.object(sftp_hook.return_value, "retrieve_file", side_effect=FileNotFoundError):
399+
if fail_on_file_not_exist:
400+
with pytest.raises(FileNotFoundError):
401+
task.execute(None)
402+
else:
403+
task.execute(None)

0 commit comments

Comments
 (0)