Skip to content

Commit 4394b8f

Browse files
ashb and kaxil authored
Fix issue where LocalExecutor could start tasks before the state was committed (apache#56010)
* Fix issue where LocalExecutor could start tasks before the state was committed. With some recent changes LocalExecutor was now able to start a task _too quickly_: due to its custom implementation of `queue_workload`, it was sending the workload directly to the multiprocessing (MP) queue from within `queue_workload`, which means that if there is already an idle worker process it will pick the task up "instantly" -- crucially, before the database transaction in which TI.state is changed from scheduled to queued is committed! The fix here is to correctly follow the BaseExecutor interface and not start sending the workloads for processing until heartbeat is called (which happens in the scheduler right after the transaction is committed). * Fix test --------- Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
1 parent f91698c commit 4394b8f

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

airflow-core/src/airflow/executors/local_executor.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636

3737
from airflow.executors import workloads
3838
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
39-
from airflow.utils.session import NEW_SESSION, provide_session
4039
from airflow.utils.state import TaskInstanceState
4140

4241
# add logger to parameter of setproctitle to support logging
@@ -48,8 +47,6 @@
4847
setproctitle = lambda title, logger: real_setproctitle(title)
4948

5049
if TYPE_CHECKING:
51-
from sqlalchemy.orm import Session
52-
5350
TaskInstanceStateType = tuple[workloads.TaskInstance, TaskInstanceState, Exception | None]
5451

5552

@@ -253,9 +250,10 @@ def end(self) -> None:
253250
def terminate(self):
254251
"""Terminate the executor is not doing anything."""
255252

256-
@provide_session
257-
def queue_workload(self, workload: workloads.All, session: Session = NEW_SESSION):
258-
self.activity_queue.put(workload)
253+
def _process_workloads(self, workloads):
254+
for workload in workloads:
255+
self.activity_queue.put(workload)
256+
del self.queued_tasks[workload.ti.key]
259257
with self._unread_messages:
260-
self._unread_messages.value += 1
258+
self._unread_messages.value += len(workloads)
261259
self._check_workers()

airflow-core/tests/unit/executors/test_local_executor.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from airflow._shared.timezones import timezone
2929
from airflow.executors import workloads
3030
from airflow.executors.local_executor import LocalExecutor, _execute_work
31+
from airflow.settings import Session
3132
from airflow.utils.state import State
3233

3334
from tests_common.test_utils.config import conf_vars
@@ -97,7 +98,8 @@ def fake_supervise(ti, **kwargs):
9798
dag_rel_path="some/path",
9899
log_path=None,
99100
bundle_info=dict(name="hi", version="hi"),
100-
)
101+
),
102+
session=mock.MagicMock(spec=Session),
101103
)
102104

103105
executor.queue_workload(
@@ -107,9 +109,13 @@ def fake_supervise(ti, **kwargs):
107109
dag_rel_path="some/path",
108110
log_path=None,
109111
bundle_info=dict(name="hi", version="hi"),
110-
)
112+
),
113+
session=mock.MagicMock(spec=Session),
111114
)
112115

116+
# Process queued workloads to trigger worker spawning
117+
executor._process_workloads(list(executor.queued_tasks.values()))
118+
113119
executor.end()
114120

115121
expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else parallelism

0 commit comments

Comments
 (0)