Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ def __init__(
f"Expected str, datetime.datetime, or None for parameter 'logical_date'. Got {type(logical_date).__name__}"
)

if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS:
raise NotImplementedError("Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.x")

def execute(self, context: Context):
if self.logical_date is NOTSET:
# If no logical_date is provided we will set utcnow()
Expand Down Expand Up @@ -218,8 +221,9 @@ def execute(self, context: Context):
if self.fail_when_dag_is_paused:
dag_model = DagModel.get_current(self.trigger_dag_id)
if dag_model.is_paused:
if AIRFLOW_V_3_0_PLUS:
raise DagIsPaused(dag_id=self.trigger_dag_id)
# TODO: enable this when dag state endpoint available from task sdk
# if AIRFLOW_V_3_0_PLUS:
# raise DagIsPaused(dag_id=self.trigger_dag_id)
raise AirflowException(f"Dag {self.trigger_dag_id} is paused")

if AIRFLOW_V_3_0_PLUS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from airflow.models.dagrun import DagRun
from airflow.models.log import Log
from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.trigger_dagrun import DagIsPaused, TriggerDagRunOperator
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.triggers.external_task import DagStateTrigger
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, TaskInstanceState
Expand Down Expand Up @@ -255,6 +255,18 @@ def test_trigger_dag_run_execute_complete_should_fail(self):
),
)

@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3")
def test_trigger_dag_run_with_fail_when_dag_is_paused_should_fail(self):
with pytest.raises(
NotImplementedError, match="Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.x"
):
TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
conf={"foo": "bar"},
fail_when_dag_is_paused=True,
)


# TODO: To be removed once the provider drops support for Airflow 2
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test only for Airflow 2")
Expand Down Expand Up @@ -771,9 +783,5 @@ def test_trigger_dagrun_with_fail_when_dag_is_paused(self, dag_maker, session):
fail_when_dag_is_paused=True,
)
dag_maker.create_dagrun()
if AIRFLOW_V_3_0_PLUS:
error = DagIsPaused
else:
error = AirflowException
with pytest.raises(error, match=f"^Dag {TRIGGERED_DAG_ID} is paused$"):
with pytest.raises(AirflowException, match=f"^Dag {TRIGGERED_DAG_ID} is paused$"):
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
Loading