From 6cb591961e187ad6bff031d0c81a0661f342ea6c Mon Sep 17 00:00:00 2001 From: gopidesupavan Date: Tue, 21 Oct 2025 17:21:24 +0100 Subject: [PATCH 1/4] Throw FeatureNotAvailable error when fail_when_dag_is_paused is used in TriggerDagRunOperator with Airflow 3 --- .../standard/operators/trigger_dagrun.py | 19 +++++++++++++++++-- .../standard/operators/test_trigger_dagrun.py | 19 +++++++++++++------ 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index b70841525303b..76076e8fc1157 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -65,6 +65,17 @@ def __str__(self) -> str: return f"Dag {self.dag_id} is paused" +class FeatureNotAvailable(AirflowException): + """Raise when a feature is not available.""" + + def __init__(self, error_message) -> None: + super().__init__(error_message) + self.error_message = error_message + + def __str__(self) -> str: + return self.error_message + + class TriggerDagRunLink(BaseOperatorLink): """ Operator link for TriggerDagRunOperator. @@ -189,6 +200,9 @@ def __init__( f"Expected str, datetime.datetime, or None for parameter 'logical_date'. Got {type(logical_date).__name__}" ) + if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS: + raise FeatureNotAvailable("Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.0") + def execute(self, context: Context): if self.logical_date is NOTSET: # If no logical_date is provided we will set utcnow() @@ -218,8 +232,9 @@ def execute(self, context: Context): if self.fail_when_dag_is_paused: dag_model = DagModel.get_current(self.trigger_dag_id) if dag_model.is_paused: - if AIRFLOW_V_3_0_PLUS: - raise DagIsPaused(dag_id=self.trigger_dag_id) + # TODO: enable this when dag state endpoint available from task sdk + # if AIRFLOW_V_3_0_PLUS: + # raise DagIsPaused(dag_id=self.trigger_dag_id) raise AirflowException(f"Dag {self.trigger_dag_id} is paused") if AIRFLOW_V_3_0_PLUS: diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py index dc3f32d535035..8746d29c8f295 100644 --- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py +++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py @@ -31,7 +31,7 @@ from airflow.models.dagrun import DagRun from airflow.models.log import Log from airflow.models.taskinstance import TaskInstance -from airflow.providers.standard.operators.trigger_dagrun import DagIsPaused, TriggerDagRunOperator +from airflow.providers.standard.operators.trigger_dagrun import FeatureNotAvailable, TriggerDagRunOperator from airflow.providers.standard.triggers.external_task import DagStateTrigger from airflow.utils.session import create_session from airflow.utils.state import DagRunState, TaskInstanceState @@ -255,6 +255,17 @@ def test_trigger_dag_run_execute_complete_should_fail(self): ), ) + def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self): + with pytest.raises( + FeatureNotAvailable, match="Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.0" + ): + TriggerDagRunOperator( + task_id="test_task", + trigger_dag_id=TRIGGERED_DAG_ID, + conf={"foo": "bar"}, + fail_when_dag_is_paused=True, + ) + # TODO: To be removed once the provider drops support for Airflow 2 @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2") @@ -771,9 +782,5 @@ def test_trigger_dagrun_with_fail_when_dag_is_paused(self, dag_maker, session): fail_when_dag_is_paused=True, ) dag_maker.create_dagrun() - if AIRFLOW_V_3_0_PLUS: - error = DagIsPaused - else: - error = AirflowException - with pytest.raises(error, match=f"^Dag {TRIGGERED_DAG_ID} is paused$"): + with pytest.raises(AirflowException, match=f"^Dag {TRIGGERED_DAG_ID} is paused$"): task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) From 27fa62abe20f755caa9acc5b87588c48306e6c3b Mon Sep 17 00:00:00 2001 From: gopidesupavan Date: Tue, 21 Oct 2025 20:59:09 +0100 Subject: [PATCH 2/4] Fixup tests --- .../tests/unit/standard/operators/test_trigger_dagrun.py | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py index 8746d29c8f295..df9e917e337eb 100644 --- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py +++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py @@ -255,6 +255,7 @@ def test_trigger_dag_run_execute_complete_should_fail(self): ), ) + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3") def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self): with pytest.raises( FeatureNotAvailable, match="Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.0" From 89e8e775d7969e3210b76df4cff8faf2baa20a7f Mon Sep 17 00:00:00 2001 From: gopidesupavan Date: Wed, 22 Oct 2025 11:26:33 +0100 Subject: [PATCH 3/4] use NotImplementedError --- .../providers/standard/operators/trigger_dagrun.py | 13 +------------ .../unit/standard/operators/test_trigger_dagrun.py | 4 ++-- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index 76076e8fc1157..c0d9692d21dd2 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -65,17 +65,6 @@ def __str__(self) -> str: return f"Dag {self.dag_id} is paused" -class FeatureNotAvailable(AirflowException): - """Raise when a feature is not available.""" - - def __init__(self, error_message) -> None: - super().__init__(error_message) - self.error_message = error_message - - def __str__(self) -> str: - return self.error_message - - class TriggerDagRunLink(BaseOperatorLink): """ Operator link for TriggerDagRunOperator. @@ -201,7 +190,7 @@ def __init__( ) if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS: - raise FeatureNotAvailable("Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.0") + raise NotImplementedError("Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.0") def execute(self, context: Context): if self.logical_date is NOTSET: diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py index df9e917e337eb..33113b1867d8b 100644 --- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py +++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py @@ -31,7 +31,7 @@ from airflow.models.dagrun import DagRun from airflow.models.log import Log from airflow.models.taskinstance import TaskInstance -from airflow.providers.standard.operators.trigger_dagrun import FeatureNotAvailable, TriggerDagRunOperator +from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.standard.triggers.external_task import DagStateTrigger from airflow.utils.session import create_session from airflow.utils.state import DagRunState, TaskInstanceState @@ -258,7 +258,7 @@ def test_trigger_dag_run_execute_complete_should_fail(self): @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3") def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self): with pytest.raises( - FeatureNotAvailable, match="Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.0" + NotImplementedError, match="Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.0" ): TriggerDagRunOperator( task_id="test_task", From 8e6e849cb79b7b87605c51bed22cd1f6f842349f Mon Sep 17 00:00:00 2001 From: gopidesupavan Date: Wed, 22 Oct 2025 11:30:03 +0100 Subject: [PATCH 4/4] update with 3.x --- .../src/airflow/providers/standard/operators/trigger_dagrun.py | 2 +- .../tests/unit/standard/operators/test_trigger_dagrun.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index c0d9692d21dd2..46580d6206a90 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -190,7 +190,7 @@ def __init__( ) if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS: - raise NotImplementedError("Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.0") + raise NotImplementedError("Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.x") def execute(self, context: Context): if self.logical_date is NOTSET: diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py index 33113b1867d8b..1d72fbb670e87 100644 --- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py +++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py @@ -258,7 +258,7 @@ def test_trigger_dag_run_execute_complete_should_fail(self): @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3") def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self): with pytest.raises( - NotImplementedError, match="Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.0" + NotImplementedError, match="Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.x" ): TriggerDagRunOperator( task_id="test_task",