Skip to content

Commit cec8ba6

Browse files
nathadfieldclaude
andauthored
Fix deferrable sensors not respecting soft_fail on timeout (#61132)
When a deferrable sensor with soft_fail=True times out, the task fails with AirflowSensorTimeout instead of being marked as SKIPPED. This is a regression from Airflow 2.x behavior. The issue was in resume_execution() where TaskDeferralTimeout was converted to AirflowSensorTimeout before checking soft_fail. This fix uses nested exception handling to check soft_fail and never_fail before the conversion, ensuring timeouts are properly skipped. closes: #61130 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 8d9ba56 commit cec8ba6

File tree

2 files changed

+66
-4
lines changed

2 files changed

+66
-4
lines changed

task-sdk/src/airflow/sdk/bases/sensor.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,13 +249,18 @@ def run_duration() -> float:
249249
return xcom_value
250250

251251
def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context):
252+
# Use nested try/except to convert TaskDeferralTimeout to AirflowSensorTimeout
253+
# while still allowing soft_fail/never_fail to handle both exception types.
252254
try:
253-
return super().resume_execution(next_method, next_kwargs, context)
254-
except TaskDeferralTimeout as e:
255-
raise AirflowSensorTimeout(*e.args) from e
255+
try:
256+
return super().resume_execution(next_method, next_kwargs, context)
257+
except TaskDeferralTimeout as e:
258+
raise AirflowSensorTimeout(*e.args) from e
256259
except (AirflowException, TaskDeferralError) as e:
257260
if self.soft_fail:
258-
raise AirflowSkipException(str(e)) from e
261+
raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
262+
if self.never_fail:
263+
raise AirflowSkipException("Skipping due to never_fail is set to True.") from e
259264
raise
260265

261266
def _get_next_poke_interval(

task-sdk/tests/task_sdk/bases/test_sensor.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
AirflowSensorTimeout,
3737
AirflowSkipException,
3838
AirflowTaskTimeout,
39+
TaskDeferralError,
3940
)
4041
from airflow.sdk.execution_time.comms import RescheduleTask, TaskRescheduleStartDate
4142
from airflow.sdk.timezone import datetime
@@ -684,3 +685,59 @@ def test_fail_after_resuming_deferred_sensor(self, soft_fail, expected_exception
684685
async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", soft_fail=soft_fail)
685686
with pytest.raises(expected_exception):
686687
async_sensor.resume_execution("execute_complete", None, {})
688+
689+
@pytest.mark.parametrize(
690+
("soft_fail", "expected_exception"),
691+
[
692+
(True, AirflowSkipException),
693+
(False, AirflowSensorTimeout),
694+
],
695+
)
696+
def test_timeout_after_resuming_deferred_sensor_with_soft_fail(self, soft_fail, expected_exception):
697+
"""Test that deferrable sensors with soft_fail skip on timeout instead of failing."""
698+
async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", soft_fail=soft_fail)
699+
with pytest.raises(expected_exception):
700+
async_sensor.resume_execution(
701+
next_method="__fail__",
702+
next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT},
703+
context={},
704+
)
705+
706+
def test_timeout_after_resuming_deferred_sensor_with_never_fail(self):
707+
"""Test that deferrable sensors with never_fail skip on timeout."""
708+
async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", never_fail=True)
709+
with pytest.raises(AirflowSkipException):
710+
async_sensor.resume_execution(
711+
next_method="__fail__",
712+
next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT},
713+
context={},
714+
)
715+
716+
@pytest.mark.parametrize(
717+
("soft_fail", "expected_exception"),
718+
[
719+
(True, AirflowSkipException),
720+
(False, TaskDeferralError),
721+
],
722+
)
723+
def test_trigger_failure_after_resuming_deferred_sensor_with_soft_fail(
724+
self, soft_fail, expected_exception
725+
):
726+
"""Test that deferrable sensors with soft_fail skip on trigger failure instead of failing."""
727+
async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", soft_fail=soft_fail)
728+
with pytest.raises(expected_exception):
729+
async_sensor.resume_execution(
730+
next_method="__fail__",
731+
next_kwargs={"error": TriggerFailureReason.TRIGGER_FAILURE},
732+
context={},
733+
)
734+
735+
def test_trigger_failure_after_resuming_deferred_sensor_with_never_fail(self):
736+
"""Test that deferrable sensors with never_fail skip on trigger failure."""
737+
async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", never_fail=True)
738+
with pytest.raises(AirflowSkipException):
739+
async_sensor.resume_execution(
740+
next_method="__fail__",
741+
next_kwargs={"error": TriggerFailureReason.TRIGGER_FAILURE},
742+
context={},
743+
)

0 commit comments

Comments
 (0)