Skip to content

Commit 8d1f452

Browse files
committed
Throw FeatureNotAvailable error when fail_when_dag_is_paused is used in TriggerDagRunOperator with Airflow 3
1 parent 9366ab2 commit 8d1f452

File tree

2 files changed

+30
-8
lines changed

2 files changed

+30
-8
lines changed

providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,17 @@ def __str__(self) -> str:
6565
return f"Dag {self.dag_id} is paused"
6666

6767

68+
class FeatureNotAvailable(AirflowException):
69+
"""Raise when a feature is not available."""
70+
71+
def __init__(self, error_message) -> None:
72+
super().__init__(error_message)
73+
self.error_message = error_message
74+
75+
def __str__(self) -> str:
76+
return self.error_message
77+
78+
6879
class TriggerDagRunLink(BaseOperatorLink):
6980
"""
7081
Operator link for TriggerDagRunOperator.
@@ -189,6 +200,9 @@ def __init__(
189200
f"Expected str, datetime.datetime, or None for parameter 'logical_date'. Got {type(logical_date).__name__}"
190201
)
191202

203+
if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS:
204+
raise FeatureNotAvailable("Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.0")
205+
192206
def execute(self, context: Context):
193207
if self.logical_date is NOTSET:
194208
# If no logical_date is provided we will set utcnow()
@@ -218,8 +232,9 @@ def execute(self, context: Context):
218232
if self.fail_when_dag_is_paused:
219233
dag_model = DagModel.get_current(self.trigger_dag_id)
220234
if dag_model.is_paused:
221-
if AIRFLOW_V_3_0_PLUS:
222-
raise DagIsPaused(dag_id=self.trigger_dag_id)
235+
# TODO: enable this when dag state endpoint available from task sdk
236+
# if AIRFLOW_V_3_0_PLUS:
237+
# raise DagIsPaused(dag_id=self.trigger_dag_id)
223238
raise AirflowException(f"Dag {self.trigger_dag_id} is paused")
224239

225240
if AIRFLOW_V_3_0_PLUS:

providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
from airflow.models.dagrun import DagRun
3232
from airflow.models.log import Log
3333
from airflow.models.taskinstance import TaskInstance
34-
from airflow.providers.standard.operators.trigger_dagrun import DagIsPaused, TriggerDagRunOperator
34+
from airflow.providers.standard.operators.trigger_dagrun import FeatureNotAvailable, TriggerDagRunOperator
3535
from airflow.providers.standard.triggers.external_task import DagStateTrigger
3636
from airflow.utils.session import create_session
3737
from airflow.utils.state import DagRunState, TaskInstanceState
@@ -255,6 +255,17 @@ def test_trigger_dag_run_execute_complete_should_fail(self):
255255
),
256256
)
257257

258+
def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self):
259+
with pytest.raises(
260+
FeatureNotAvailable, match="Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.0"
261+
):
262+
TriggerDagRunOperator(
263+
task_id="test_task",
264+
trigger_dag_id=TRIGGERED_DAG_ID,
265+
conf={"foo": "bar"},
266+
fail_when_dag_is_paused=True,
267+
)
268+
258269

259270
# TODO: To be removed once the provider drops support for Airflow 2
260271
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
@@ -771,9 +782,5 @@ def test_trigger_dagrun_with_fail_when_dag_is_paused(self, dag_maker, session):
771782
fail_when_dag_is_paused=True,
772783
)
773784
dag_maker.create_dagrun()
774-
if AIRFLOW_V_3_0_PLUS:
775-
error = DagIsPaused
776-
else:
777-
error = AirflowException
778-
with pytest.raises(error, match=f"^Dag {TRIGGERED_DAG_ID} is paused$"):
785+
with pytest.raises(AirflowException, match=f"^Dag {TRIGGERED_DAG_ID} is paused$"):
779786
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

0 commit comments

Comments
 (0)