
Commit 5841e6e

karenbraganz authored and RNHTTR committed
Heartbeat timeout docs (apache#46257)
* Emphasize task heartbeat timeout terminology in docs to match logs
* Grammatical correction
* Grammatical correction
* edit docs
* redirect URL
* Update docs/apache-airflow/core-concepts/tasks.rst (Co-authored-by: Ryan Hatter <[email protected]>)
* Update docs/apache-airflow/core-concepts/tasks.rst (Co-authored-by: Ryan Hatter <[email protected]>)
* Update docs/apache-airflow/core-concepts/tasks.rst (Co-authored-by: Ryan Hatter <[email protected]>)
* Edit docs
* Update config.yml with new config names
* Update code to use heartbeat timeout terminology
* Update code to include heartbeat timeout terminology
* Fix incorrect config name in test_sync_orphaned_tasks
* Change task_instance_heartbeat_timeout_threshold to task_instance_heartbeat_timeout
* Update supervisor.py
* Check for Edge executor test config version compatibility

---------

Co-authored-by: Ryan Hatter <[email protected]>
1 parent 03ff159 commit 5841e6e

File tree: 15 files changed, +143, -93 lines changed

airflow/callbacks/callback_requests.py

Lines changed: 2 additions & 2 deletions
@@ -39,7 +39,7 @@ class BaseCallbackRequest(BaseModel):
     bundle_name: str
     bundle_version: str | None
     msg: str | None = None
-    """Additional Message that can be used for logging to determine failure/zombie"""
+    """Additional Message that can be used for logging to determine failure/task heartbeat timeout"""

     @classmethod
     def from_json(cls, data: str | bytes | bytearray) -> Self:
@@ -54,7 +54,7 @@ class TaskCallbackRequest(BaseCallbackRequest):
     Task callback status information.

     A Class with information about the success/failure TI callback to be executed. Currently, only failure
-    callbacks (when tasks are externally killed) and Zombies are run via DagFileProcessorProcess.
+    callbacks when tasks are externally killed or experience heartbeat timeouts are run via DagFileProcessorProcess.
     """

     ti: ti_datamodel.TaskInstance

airflow/cli/commands/remote_commands/config_command.py

Lines changed: 12 additions & 0 deletions
@@ -419,6 +419,18 @@ def message(self) -> str:
         config=ConfigParameter("scheduler", "dag_dir_list_interval"),
         renamed_to=ConfigParameter("dag_processor", "refresh_interval"),
     ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "local_task_job_heartbeat_sec"),
+        renamed_to=ConfigParameter("scheduler", "task_instance_heartbeat_sec"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "scheduler_zombie_task_threshold"),
+        renamed_to=ConfigParameter("scheduler", "task_instance_heartbeat_timeout"),
+    ),
+    ConfigChange(
+        config=ConfigParameter("scheduler", "zombie_detection_interval"),
+        renamed_to=ConfigParameter("scheduler", "task_instance_heartbeat_timeout_detection_interval"),
+    ),
     # celery
     ConfigChange(
         config=ConfigParameter("celery", "stalled_task_timeout"),
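For existing deployments, these renames also change the corresponding AIRFLOW__SCHEDULER__* environment variable names. The sketch below is a hypothetical upgrade helper, not part of Airflow or this commit; only the old-to-new mapping is taken from the ConfigChange entries above.

# Hypothetical upgrade helper -- the mapping mirrors the ConfigChange entries
# above; the helper itself is illustrative and not part of Airflow.
SCHEDULER_OPTION_RENAMES = {
    "AIRFLOW__SCHEDULER__LOCAL_TASK_JOB_HEARTBEAT_SEC": "AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_SEC",
    "AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD": "AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT",
    "AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL": (
        "AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT_DETECTION_INTERVAL"
    ),
}


def migrate_scheduler_env(environ: dict[str, str]) -> dict[str, str]:
    """Return a copy of *environ* with deprecated scheduler option names replaced by the new ones."""
    migrated = dict(environ)
    for old_name, new_name in SCHEDULER_OPTION_RENAMES.items():
        if old_name in migrated and new_name not in migrated:
            migrated[new_name] = migrated.pop(old_name)
    return migrated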

airflow/config_templates/config.yml

Lines changed: 5 additions & 5 deletions
@@ -2193,11 +2193,11 @@ scheduler:
       type: integer
       example: ~
       default: "5"
-    local_task_job_heartbeat_sec:
+    task_instance_heartbeat_sec:
       description: |
         The frequency (in seconds) at which the LocalTaskJob should send heartbeat signals to the
         scheduler to notify it's still alive. If this value is set to 0, the heartbeat interval will default
-        to the value of ``[scheduler] scheduler_zombie_task_threshold``.
+        to the value of ``[scheduler] task_instance_heartbeat_timeout``.
       version_added: 2.7.0
       type: integer
       example: ~
@@ -2283,7 +2283,7 @@ scheduler:
       type: string
       example: ~
       default: "{AIRFLOW_HOME}/logs/scheduler"
-    scheduler_zombie_task_threshold:
+    task_instance_heartbeat_timeout:
       description: |
         Local task jobs periodically heartbeat to the DB. If the job has
         not heartbeat in this many seconds, the scheduler will mark the
@@ -2292,9 +2292,9 @@ scheduler:
       type: integer
       example: ~
       default: "300"
-    zombie_detection_interval:
+    task_instance_heartbeat_timeout_detection_interval:
       description: |
-        How often (in seconds) should the scheduler check for zombie tasks.
+        How often (in seconds) should the scheduler check for task instances whose heartbeats have timed out.
       version_added: 2.3.0
       type: float
       example: ~
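A minimal sketch of reading the renamed options through Airflow's config API; the first two calls mirror the scheduler code later in this commit, while the task_instance_heartbeat_sec read is an assumed, analogous example.

from airflow.configuration import conf

# Seconds without a heartbeat before the scheduler fails the task instance
# (default "300" per config.yml above).
heartbeat_timeout = conf.getint("scheduler", "task_instance_heartbeat_timeout")

# How often the scheduler scans for timed-out task instances.
detection_interval = conf.getfloat(
    "scheduler", "task_instance_heartbeat_timeout_detection_interval", fallback=10.0
)

# How often the task itself heartbeats; per the description above, 0 falls back
# to the timeout value. (Assumed call, analogous to the two above.)
heartbeat_sec = conf.getint("scheduler", "task_instance_heartbeat_sec")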

airflow/jobs/scheduler_job_runner.py

Lines changed: 57 additions & 37 deletions
@@ -175,8 +175,10 @@ def __init__(
         # configure -- they'll want num_runs
         self.num_times_parse_dags = num_times_parse_dags
         self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
-        # How many seconds do we wait for tasks to heartbeat before mark them as zombies.
-        self._zombie_threshold_secs = conf.getint("scheduler", "scheduler_zombie_task_threshold")
+        # How many seconds do we wait for tasks to heartbeat before timeout.
+        self._task_instance_heartbeat_timeout_secs = conf.getint(
+            "scheduler", "task_instance_heartbeat_timeout"
+        )
         self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration")
         self._task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")
         self._enable_tracemalloc = conf.getboolean("scheduler", "enable_tracemalloc")
@@ -827,7 +829,7 @@ def process_executor_events(
             # or the TI is queued by another job. Either ways we should not fail it.

             # All of this could also happen if the state is "running",
-            # but that is handled by the zombie detection.
+            # but that is handled by the scheduler detecting task instances without heartbeats.

             ti_queued = ti.try_number == buffer_key.try_number and ti.state in (
                 TaskInstanceState.SCHEDULED,
@@ -1016,8 +1018,8 @@ def _run_scheduler_loop(self) -> None:
         )

         timers.call_regular_interval(
-            conf.getfloat("scheduler", "zombie_detection_interval", fallback=10.0),
-            self._find_and_purge_zombies,
+            conf.getfloat("scheduler", "task_instance_heartbeat_timeout_detection_interval", fallback=10.0),
+            self._find_and_purge_task_instances_without_heartbeats,
         )

         timers.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)
@@ -1950,26 +1952,30 @@ def check_trigger_timeouts(
         if num_timed_out_tasks:
             self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)

-    # [START find_and_purge_zombies]
-    def _find_and_purge_zombies(self) -> None:
+    # [START find_and_purge_task_instances_without_heartbeats]
+    def _find_and_purge_task_instances_without_heartbeats(self) -> None:
         """
-        Find and purge zombie task instances.
+        Find and purge task instances without heartbeats.

-        Zombie instances are tasks that failed to heartbeat for too long, or
-        have a no-longer-running LocalTaskJob.
+        Task instances that failed to heartbeat for too long, or
+        have a no-longer-running LocalTaskJob will be failed by the scheduler.

-        A TaskCallbackRequest is also created for the killed zombie to be
+        A TaskCallbackRequest is also created for the killed task instance to be
         handled by the DAG processor, and the executor is informed to no longer
-        count the zombie as running when it calculates parallelism.
+        count the task instance as running when it calculates parallelism.
         """
         with create_session() as session:
-            if zombies := self._find_zombies(session=session):
-                self._purge_zombies(zombies, session=session)
+            if task_instances_without_heartbeats := self._find_task_instances_without_heartbeats(
+                session=session
+            ):
+                self._purge_task_instances_without_heartbeats(
+                    task_instances_without_heartbeats, session=session
+                )

-    def _find_zombies(self, *, session: Session) -> list[TI]:
+    def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[TI]:
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
-        limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
-        zombies = session.scalars(
+        limit_dttm = timezone.utcnow() - timedelta(seconds=self._task_instance_heartbeat_timeout_secs)
+        task_instances_without_heartbeats = session.scalars(
             select(TI)
             .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
             .join(DM, TI.dag_id == DM.dag_id)
@@ -1979,63 +1985,77 @@ def _find_zombies(self, *, session: Session) -> list[TI]:
             )
             .where(TI.queued_by_job_id == self.job.id)
         ).all()
-        if zombies:
-            self.log.warning("Failing %s TIs without heartbeat after %s", len(zombies), limit_dttm)
-        return zombies
+        if task_instances_without_heartbeats:
+            self.log.warning(
+                "Failing %s TIs without heartbeat after %s",
+                len(task_instances_without_heartbeats),
+                limit_dttm,
+            )
+        return task_instances_without_heartbeats

-    def _purge_zombies(self, zombies: list[TI], *, session: Session) -> None:
-        for ti in zombies:
-            zombie_message_details = self._generate_zombie_message_details(ti)
+    def _purge_task_instances_without_heartbeats(
+        self, task_instances_without_heartbeats: list[TI], *, session: Session
+    ) -> None:
+        for ti in task_instances_without_heartbeats:
+            task_instance_heartbeat_timeout_message_details = (
+                self._generate_task_instance_heartbeat_timeout_message_details(ti)
+            )
             request = TaskCallbackRequest(
                 filepath=ti.dag_model.relative_fileloc,
                 bundle_name=ti.dag_version.bundle_name,
                 bundle_version=ti.dag_run.bundle_version,
                 ti=ti,
-                msg=str(zombie_message_details),
+                msg=str(task_instance_heartbeat_timeout_message_details),
             )
             session.add(
                 Log(
                     event="heartbeat timeout",
                     task_instance=ti.key,
                     extra=(
-                        f"Task did not emit heartbeat within time limit ({self._zombie_threshold_secs} "
+                        f"Task did not emit heartbeat within time limit ({self._task_instance_heartbeat_timeout_secs} "
                        "seconds) and will be terminated. "
                        "See https://airflow.apache.org/docs/apache-airflow/"
-                        "stable/core-concepts/tasks.html#zombie-tasks"
+                        "stable/core-concepts/tasks.html#task-instance-heartbeat-timeout"
                    ),
                )
            )
            self.log.error(
-                "Detected zombie job: %s "
+                "Detected a task instance without a heartbeat: %s "
                "(See https://airflow.apache.org/docs/apache-airflow/"
-                "stable/core-concepts/tasks.html#zombie-tasks)",
+                "stable/core-concepts/tasks.html#task-instance-heartbeat-timeout)",
                request,
            )
            self.job.executor.send_callback(request)
            if (executor := self._try_to_load_executor(ti.executor)) is None:
-                self.log.warning("Cannot clean up zombie %r with non-existent executor %s", ti, ti.executor)
+                self.log.warning(
+                    "Cannot clean up task instance without heartbeat %r with non-existent executor %s",
+                    ti,
+                    ti.executor,
+                )
                continue
            executor.change_state(ti.key, TaskInstanceState.FAILED, remove_running=True)
-            Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
+            Stats.incr(
+                "task_instances_without_heartbeats_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id}
+            )

-    # [END find_and_purge_zombies]
+    # [END find_and_purge_task_instances_without_heartbeats]

     @staticmethod
-    def _generate_zombie_message_details(ti: TI) -> dict[str, Any]:
-        zombie_message_details = {
+    def _generate_task_instance_heartbeat_timeout_message_details(ti: TI) -> dict[str, Any]:
+        task_instance_heartbeat_timeout_message_details = {
             "DAG Id": ti.dag_id,
             "Task Id": ti.task_id,
             "Run Id": ti.run_id,
         }

         if ti.map_index != -1:
-            zombie_message_details["Map Index"] = ti.map_index
+            task_instance_heartbeat_timeout_message_details["Map Index"] = ti.map_index
         if ti.hostname:
-            zombie_message_details["Hostname"] = ti.hostname
+            task_instance_heartbeat_timeout_message_details["Hostname"] = ti.hostname
         if ti.external_executor_id:
-            zombie_message_details["External Executor Id"] = ti.external_executor_id
+            task_instance_heartbeat_timeout_message_details["External Executor Id"] = ti.external_executor_id

-        return zombie_message_details
+        return task_instance_heartbeat_timeout_message_details

     @provide_session
     def _update_asset_orphanage(self, session: Session = NEW_SESSION) -> None:
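The core of the new detection path is the heartbeat cutoff: any running task instance whose last heartbeat is older than now minus the configured timeout is failed. A standalone sketch of just that arithmetic, with an assumed timeout of 300 seconds (the default shown in config.yml above):

from datetime import timedelta

from airflow.utils import timezone

# Assumed value for illustration; the scheduler reads it from
# [scheduler] task_instance_heartbeat_timeout (default 300 seconds).
task_instance_heartbeat_timeout_secs = 300

# Same cutoff as _find_task_instances_without_heartbeats: a heartbeat older
# than limit_dttm means the task instance is considered timed out.
limit_dttm = timezone.utcnow() - timedelta(seconds=task_instance_heartbeat_timeout_secs)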

airflow/models/taskinstance.py

Lines changed: 1 addition & 1 deletion
@@ -3110,7 +3110,7 @@ def fetch_handle_failure_context(

         ti.clear_next_method_args()

-        # In extreme cases (zombie in case of dag with parse error) we might _not_ have a Task.
+        # In extreme cases (task instance heartbeat timeout in case of dag with parse error) we might _not_ have a Task.
         if context is None and getattr(ti, "task", None):
             context = ti.get_template_context(session)

docs/apache-airflow/administration-and-deployment/logging-monitoring/metrics.rst

Lines changed: 1 addition & 1 deletion
@@ -157,7 +157,7 @@ Name                                           Description
 ``ti_failures``                                Overall task instances failures. Metric with dag_id and task_id tagging.
 ``ti_successes``                               Overall task instances successes. Metric with dag_id and task_id tagging.
 ``previously_succeeded``                       Number of previously succeeded task instances. Metric with dag_id and task_id tagging.
-``zombies_killed``                             Zombie tasks killed. Metric with dag_id and task_id tagging.
+``task_instances_without_heartbeats_killed``   Task instances without heartbeats killed. Metric with dag_id and task_id tagging.
 ``scheduler_heartbeat``                        Scheduler heartbeats
 ``dag_processor_heartbeat``                    Standalone DAG processor heartbeats
 ``dag_processing.processes``                   Relative number of currently running DAG parsing processes (ie this delta
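Deployments that chart or alert on the old zombies_killed counter will need to follow the rename. A minimal sketch of the emitting call, lifted from the scheduler change above with illustrative tag values:

from airflow.stats import Stats

# Emitted once per task instance the scheduler fails for a heartbeat timeout;
# dashboards and alerts watching "zombies_killed" should track this name instead.
Stats.incr(
    "task_instances_without_heartbeats_killed",
    tags={"dag_id": "example_dag", "task_id": "example_task"},
)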

docs/apache-airflow/core-concepts/tasks.rst

Lines changed: 13 additions & 13 deletions
@@ -165,35 +165,35 @@ If you want to control your task's state from within custom Task/Operator code,

 These can be useful if your code has extra knowledge about its environment and wants to fail/skip faster - e.g., skipping when it knows there's no data available, or fast-failing when it detects its API key is invalid (as that will not be fixed by a retry).

-.. _concepts:zombies:
+.. _concepts:task-instance-heartbeat-timeout:

-Zombie Tasks
-------------
+Task Instance Heartbeat Timeout
+-------------------------------

 No system runs perfectly, and task instances are expected to die once in a while.

-*Zombie tasks* are ``TaskInstances`` stuck in a ``running`` state despite their associated jobs being inactive
-(e.g. their process did not send a recent heartbeat as it got killed, or the machine died). Airflow will find these
-periodically, clean them up, and either fail or retry the task depending on its settings. Tasks can become zombies for
+``TaskInstances`` may get stuck in a ``running`` state despite their associated jobs being inactive
+(for example if the ``TaskInstance``'s worker ran out of memory). Such tasks were formerly known as zombie tasks. Airflow will find these
+periodically, clean them up, and mark the ``TaskInstance`` as failed or retry it if it has available retries. The ``TaskInstance``'s heartbeat can timeout for
 many reasons, including:

 * The Airflow worker ran out of memory and was OOMKilled.
 * The Airflow worker failed its liveness probe, so the system (for example, Kubernetes) restarted the worker.
 * The system (for example, Kubernetes) scaled down and moved an Airflow worker from one node to another.


-Reproducing zombie tasks locally
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Reproducing task instance heartbeat timeouts locally
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

-If you'd like to reproduce zombie tasks for development/testing processes, follow the steps below:
+If you'd like to reproduce task instance heartbeat timeouts for development/testing processes, follow the steps below:

 1. Set the below environment variables for your local Airflow setup (alternatively you could tweak the corresponding config values in airflow.cfg)

 .. code-block:: bash

-    export AIRFLOW__SCHEDULER__LOCAL_TASK_JOB_HEARTBEAT_SEC=600
-    export AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=2
-    export AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL=5
+    export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_SEC=600
+    export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT=2
+    export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT_DETECTION_INTERVAL=5


 2. Have a DAG with a task that takes about 10 minutes to complete(i.e. a long-running task). For example, you could use the below DAG:
@@ -216,7 +216,7 @@ If you'd like to reproduce zombie tasks for development/testing processes, follo
     sleep_dag()


-Run the above DAG and wait for a while. You should see the task instance becoming a zombie task and then being killed by the scheduler.
+Run the above DAG and wait for a while. The ``TaskInstance`` will be marked failed after <task_instance_heartbeat_timeout> seconds.

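The DAG body for step 2 is collapsed in the diff above (only the closing sleep_dag() call is visible). A minimal sketch of a matching long-running DAG, assuming the TaskFlow API and an illustrative task name:

import time
from datetime import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def sleep_dag():
    @task
    def sleep_for_a_while():
        # Sleep long enough for the shortened heartbeat timeout configured above to trip.
        time.sleep(600)

    sleep_for_a_while()


sleep_dag()

With the values from step 1, the task heartbeats only every 600 seconds while the timeout is 2 seconds, so the scheduler's 5-second detection sweep fails the task instance almost immediately.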
docs/apache-airflow/faq.rst

Lines changed: 1 addition & 1 deletion
@@ -453,7 +453,7 @@ Why did my task fail with no logs in the UI?
 Logs are :ref:`typically served when a task reaches a terminal state <serving-worker-trigger-logs>`. Sometimes, a task's normal lifecycle is disrupted, and the task's
 worker is unable to write the task's logs. This typically happens for one of two reasons:

-1. :ref:`Zombie tasks <concepts:zombies>`.
+1. :ref:`Task Instance Heartbeat Timeout <concepts:task-instance-heartbeat-timeout>`.
 2. Tasks failed after getting stuck in queued (Airflow 2.6.0+). Tasks that are in queued for longer than :ref:`scheduler.task_queued_timeout <config:scheduler__task_queued_timeout>` will be marked as failed, and there will be no task logs in the Airflow UI.

 Setting retries for each task drastically reduces the chance that either of these problems impact a workflow.

docs/apache-airflow/static/redirects.js

Lines changed: 2 additions & 1 deletion
@@ -19,7 +19,8 @@

 document.addEventListener("DOMContentLoaded", function () {
   const redirects = {
-    "zombie-undead-tasks": "zombie-tasks",
+    "zombie-undead-tasks": "task-instance-heartbeat-timeout",
+    "zombie-tasks": "task-instance-heartbeat-timeout",
   };
   const fragment = window.location.hash.substring(1);
   if (redirects[fragment]) {

docs/apache-airflow/troubleshooting.rst

Lines changed: 2 additions & 2 deletions
@@ -32,7 +32,7 @@ Below are some example scenarios that could cause a task's state to change by a

 - If a task's DAG failed to parse on the worker, the scheduler may mark the task as failed. If confirmed, consider increasing :ref:`core.dagbag_import_timeout <config:core__dagbag_import_timeout>` and :ref:`dag_processor.dag_file_processor_timeout <config:dag_processor__dag_file_processor_timeout>`.
 - The scheduler will mark a task as failed if the task has been queued for longer than :ref:`scheduler.task_queued_timeout <config:scheduler__task_queued_timeout>`.
-- If a task becomes a :ref:`zombie <concepts:zombies>`, it will be marked failed by the scheduler.
+- If a :ref:`task instance's heartbeat times out <concepts:task-instance-heartbeat-timeout>`, it will be marked failed by the scheduler.
 - A user marked the task as successful or failed in the Airflow UI.
 - An external script or process used the :doc:`Airflow REST API <stable-rest-api-ref>` to change the state of a task.

@@ -45,4 +45,4 @@ Here are some examples that could cause such an event:

 - A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition.
 - An Airflow worker running out of memory
-  - Usually, Airflow workers that run out of memory receive a SIGKILL and are marked as a zombie and failed by the scheduler. However, in some scenarios, Airflow kills the task before that happens.
+  - Usually, Airflow workers that run out of memory receive a SIGKILL, and the scheduler will fail the corresponding task instance for not having a heartbeat. However, in some scenarios, Airflow kills the task before that happens.
