diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index b70841525303b..46580d6206a90 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -189,6 +189,9 @@ def __init__( f"Expected str, datetime.datetime, or None for parameter 'logical_date'. Got {type(logical_date).__name__}" ) + if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS: + raise NotImplementedError("Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.x") + def execute(self, context: Context): if self.logical_date is NOTSET: # If no logical_date is provided we will set utcnow() @@ -218,8 +221,9 @@ def execute(self, context: Context): if self.fail_when_dag_is_paused: dag_model = DagModel.get_current(self.trigger_dag_id) if dag_model.is_paused: - if AIRFLOW_V_3_0_PLUS: - raise DagIsPaused(dag_id=self.trigger_dag_id) + # TODO: enable this when dag state endpoint available from task sdk + # if AIRFLOW_V_3_0_PLUS: + # raise DagIsPaused(dag_id=self.trigger_dag_id) raise AirflowException(f"Dag {self.trigger_dag_id} is paused") if AIRFLOW_V_3_0_PLUS: diff --git a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py index dc3f32d535035..1d72fbb670e87 100644 --- a/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py +++ b/providers/standard/tests/unit/standard/operators/test_trigger_dagrun.py @@ -31,7 +31,7 @@ from airflow.models.dagrun import DagRun from airflow.models.log import Log from airflow.models.taskinstance import TaskInstance -from airflow.providers.standard.operators.trigger_dagrun import DagIsPaused, TriggerDagRunOperator +from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.standard.triggers.external_task import DagStateTrigger from airflow.utils.session import create_session from airflow.utils.state import DagRunState, TaskInstanceState @@ -255,6 +255,18 @@ def test_trigger_dag_run_execute_complete_should_fail(self): ), ) + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3") + def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self): + with pytest.raises( + NotImplementedError, match="Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.x" + ): + TriggerDagRunOperator( + task_id="test_task", + trigger_dag_id=TRIGGERED_DAG_ID, + conf={"foo": "bar"}, + fail_when_dag_is_paused=True, + ) + # TODO: To be removed once the provider drops support for Airflow 2 @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2") @@ -771,9 +783,5 @@ def test_trigger_dagrun_with_fail_when_dag_is_paused(self, dag_maker, session): fail_when_dag_is_paused=True, ) dag_maker.create_dagrun() - if AIRFLOW_V_3_0_PLUS: - error = DagIsPaused - else: - error = AirflowException - with pytest.raises(error, match=f"^Dag {TRIGGERED_DAG_ID} is paused$"): + with pytest.raises(AirflowException, match=f"^Dag {TRIGGERED_DAG_ID} is paused$"): task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)