Skip to content

Commit 58c6a49

Browse files
authored
Remove the old task run commands and LocalTaskJob (apache#47453)
1 parent 073353b commit 58c6a49

File tree

12 files changed

+17
-3178
lines changed

12 files changed

+17
-3178
lines changed

.github/boring-cyborg.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,6 @@ labelPRBasedOnFilePath:
357357

358358
area:Scheduler:
359359
- airflow/jobs/scheduler_job_runner.py
360-
- airflow/task/standard_task_runner.py
361360
- docs/apache-airflow/administration-and-deployment/scheduler.rst
362361
- tests/jobs/test_scheduler_job.py
363362

airflow/cli/cli_config.py

Lines changed: 2 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -544,51 +544,8 @@ def string_lower_type(val):
544544
ARG_KERBEROS_ONE_TIME_MODE = Arg(
545545
("-o", "--one-time"), help="Run airflow kerberos one time instead of forever", action="store_true"
546546
)
547-
# run
548-
ARG_INTERACTIVE = Arg(
549-
("-N", "--interactive"),
550-
help="Do not capture standard output and error streams (useful for interactive debugging)",
551-
action="store_true",
552-
)
553-
# TODO(aoen): "force" is a poor choice of name here since it implies it overrides
554-
# all dependencies (not just past success), e.g. the ignore_depends_on_past
555-
# dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
556-
# the "ignore_all_dependencies" command should be called the"force" command
557-
# instead.
558-
ARG_FORCE = Arg(
559-
("-f", "--force"),
560-
help="Ignore previous task instance state, rerun regardless if task already succeeded/failed",
561-
action="store_true",
562-
)
563-
ARG_RAW = Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true")
564-
ARG_IGNORE_ALL_DEPENDENCIES = Arg(
565-
("-A", "--ignore-all-dependencies"),
566-
help="Ignores all non-critical dependencies, including ignore_ti_state and ignore_task_deps",
567-
action="store_true",
568-
)
569-
# TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
570-
# vague (e.g. a task being in the appropriate state to be run is also a dependency
571-
# but is not ignored by this flag), the name 'ignore_task_dependencies' is
572-
# slightly better (as it ignores all dependencies that are specific to the task),
573-
# so deprecate the old command name and use this instead.
574-
ARG_IGNORE_DEPENDENCIES = Arg(
575-
("-i", "--ignore-dependencies"),
576-
help="Ignore task-specific dependencies, e.g. upstream, depends_on_past, and retry delay dependencies",
577-
action="store_true",
578-
)
579-
ARG_DEPENDS_ON_PAST = Arg(
580-
("-d", "--depends-on-past"),
581-
help="Determine how Airflow should deal with past dependencies. The default action is `check`, Airflow "
582-
"will check if the past dependencies are met for the tasks having `depends_on_past=True` before run "
583-
"them, if `ignore` is provided, the past dependencies will be ignored, if `wait` is provided and "
584-
"`depends_on_past=True`, Airflow will wait the past dependencies until they are met before running or "
585-
"skipping the task",
586-
choices={"check", "ignore", "wait"},
587-
default="check",
588-
)
589-
ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
547+
# tasks
590548
ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task index")
591-
ARG_READ_FROM_DB = Arg(("--read-from-db",), help="Read dag from DB instead of dag file", action="store_true")
592549

593550

594551
# database
@@ -838,7 +795,7 @@ def string_lower_type(val):
838795
# jobs check
839796
ARG_JOB_TYPE_FILTER = Arg(
840797
("--job-type",),
841-
choices=("LocalTaskJob", "SchedulerJob", "TriggererJob", "DagProcessorJob"),
798+
choices=("SchedulerJob", "TriggererJob", "DagProcessorJob"),
842799
action="store",
843800
help="The type of job(s) that will be checked.",
844801
)
@@ -1253,31 +1210,6 @@ class GroupCommand(NamedTuple):
12531210
ARG_MAP_INDEX,
12541211
),
12551212
),
1256-
ActionCommand(
1257-
name="run",
1258-
help="Run a single task instance",
1259-
func=lazy_load_command("airflow.cli.commands.remote_commands.task_command.task_run"),
1260-
args=(
1261-
ARG_DAG_ID,
1262-
ARG_TASK_ID,
1263-
ARG_LOGICAL_DATE_OR_RUN_ID,
1264-
ARG_SUBDIR,
1265-
ARG_MARK_SUCCESS,
1266-
ARG_FORCE,
1267-
ARG_POOL,
1268-
ARG_CFG_PATH,
1269-
ARG_LOCAL,
1270-
ARG_RAW,
1271-
ARG_IGNORE_ALL_DEPENDENCIES,
1272-
ARG_IGNORE_DEPENDENCIES,
1273-
ARG_DEPENDS_ON_PAST,
1274-
ARG_INTERACTIVE,
1275-
ARG_SHUT_DOWN_LOGGING,
1276-
ARG_MAP_INDEX,
1277-
ARG_VERBOSE,
1278-
ARG_READ_FROM_DB,
1279-
),
1280-
),
12811213
ActionCommand(
12821214
name="test",
12831215
help="Test a task instance",

airflow/cli/commands/remote_commands/task_command.py

Lines changed: 3 additions & 245 deletions
Original file line numberDiff line numberDiff line change
@@ -24,42 +24,28 @@
2424
import json
2525
import logging
2626
import os
27-
import sys
2827
import textwrap
29-
from collections.abc import Generator
30-
from contextlib import contextmanager, redirect_stderr, redirect_stdout, suppress
31-
from pathlib import Path
28+
from contextlib import redirect_stdout
3229
from typing import TYPE_CHECKING, Protocol, cast
3330

3431
from airflow import settings
3532
from airflow.cli.simple_table import AirflowConsole
3633
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
37-
from airflow.configuration import conf
38-
from airflow.exceptions import AirflowException, DagRunNotFound, TaskDeferred, TaskInstanceNotFound
39-
from airflow.executors.executor_loader import ExecutorLoader
40-
from airflow.jobs.job import Job, run_job
41-
from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
42-
from airflow.listeners.listener import get_listener_manager
34+
from airflow.exceptions import DagRunNotFound, TaskDeferred, TaskInstanceNotFound
4335
from airflow.models import TaskInstance
4436
from airflow.models.dag import DAG, _run_inline_trigger
4537
from airflow.models.dagrun import DagRun
46-
from airflow.models.taskinstance import TaskReturnCode
4738
from airflow.sdk.definitions.param import ParamsDict
4839
from airflow.sdk.execution_time.secrets_masker import RedactedIO
49-
from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD
5040
from airflow.ti_deps.dep_context import DepContext
5141
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
5242
from airflow.utils import cli as cli_utils, timezone
5343
from airflow.utils.cli import (
5444
get_dag,
5545
get_dag_by_file_location,
5646
get_dags,
57-
should_ignore_depends_on_past,
5847
suppress_logs_and_warning,
5948
)
60-
from airflow.utils.log.file_task_handler import _set_task_deferred_context_var
61-
from airflow.utils.log.logging_mixin import StreamLogWriter
62-
from airflow.utils.net import get_hostname
6349
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
6450
from airflow.utils.session import NEW_SESSION, create_session, provide_session
6551
from airflow.utils.state import DagRunState
@@ -123,8 +109,7 @@ def _get_dag_run(
123109
return dag_run, False
124110
elif not create_if_necessary:
125111
raise DagRunNotFound(
126-
f"DagRun for {dag.dag_id} with run_id or logical_date "
127-
f"of {logical_date_or_run_id!r} not found"
112+
f"DagRun for {dag.dag_id} with run_id or logical_date of {logical_date_or_run_id!r} not found"
128113
)
129114

130115
dag_run_logical_date = timezone.coerce_datetime(logical_date)
@@ -227,237 +212,10 @@ def _get_ti(
227212
return ti, dr_created
228213

229214

230-
def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None | TaskReturnCode:
231-
"""
232-
Run the task based on a mode.
233-
234-
Any of the 3 modes are available:
235-
236-
- using LocalTaskJob
237-
- as raw task
238-
- by executor
239-
"""
240-
if args.local:
241-
return _run_task_by_local_task_job(args, ti)
242-
if args.raw:
243-
return _run_raw_task(args, ti)
244-
_run_task_by_executor(args, dag, ti)
245-
return None
246-
247-
248-
def _run_task_by_executor(args, dag: DAG, ti: TaskInstance) -> None:
249-
"""
250-
Send the task to the executor for execution.
251-
252-
This can result in the task being started by another host if the executor implementation does.
253-
"""
254-
from airflow.executors.base_executor import BaseExecutor
255-
256-
if ti.executor:
257-
executor = ExecutorLoader.load_executor(ti.executor)
258-
else:
259-
executor = ExecutorLoader.get_default_executor()
260-
executor.job_id = None
261-
executor.start()
262-
print("Sending to executor.")
263-
264-
# TODO: Task-SDK: this is temporary while we migrate the other executors over
265-
if executor.queue_workload.__func__ is not BaseExecutor.queue_workload: # type: ignore[attr-defined]
266-
from airflow.executors import workloads
267-
268-
if TYPE_CHECKING:
269-
assert dag.relative_fileloc
270-
workload = workloads.ExecuteTask.make(ti, dag_rel_path=Path(dag.relative_fileloc))
271-
with create_session() as session:
272-
executor.queue_workload(workload, session)
273-
else:
274-
executor.queue_task_instance(
275-
ti,
276-
mark_success=args.mark_success,
277-
ignore_all_deps=args.ignore_all_dependencies,
278-
ignore_depends_on_past=should_ignore_depends_on_past(args),
279-
wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"),
280-
ignore_task_deps=args.ignore_dependencies,
281-
ignore_ti_state=args.force,
282-
pool=args.pool,
283-
)
284-
executor.heartbeat()
285-
executor.end()
286-
287-
288-
def _run_task_by_local_task_job(args, ti: TaskInstance) -> TaskReturnCode | None:
289-
"""Run LocalTaskJob, which monitors the raw task execution process."""
290-
job_runner = LocalTaskJobRunner(
291-
job=Job(dag_id=ti.dag_id),
292-
task_instance=ti,
293-
mark_success=args.mark_success,
294-
ignore_all_deps=args.ignore_all_dependencies,
295-
ignore_depends_on_past=should_ignore_depends_on_past(args),
296-
wait_for_past_depends_before_skipping=(args.depends_on_past == "wait"),
297-
ignore_task_deps=args.ignore_dependencies,
298-
ignore_ti_state=args.force,
299-
pool=args.pool,
300-
external_executor_id=_extract_external_executor_id(args),
301-
)
302-
try:
303-
ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
304-
finally:
305-
if args.shut_down_logging:
306-
logging.shutdown()
307-
with suppress(ValueError):
308-
return TaskReturnCode(ret)
309-
return None
310-
311-
312-
RAW_TASK_UNSUPPORTED_OPTION = [
313-
"ignore_all_dependencies",
314-
"ignore_dependencies",
315-
"force",
316-
]
317-
318-
319-
def _run_raw_task(args, ti: TaskInstance) -> None | TaskReturnCode:
320-
"""Run the main task handling code."""
321-
return ti._run_raw_task(
322-
mark_success=args.mark_success,
323-
pool=args.pool,
324-
)
325-
326-
327-
def _extract_external_executor_id(args) -> str | None:
328-
if hasattr(args, "external_executor_id"):
329-
return getattr(args, "external_executor_id")
330-
return os.environ.get("external_executor_id", None)
331-
332-
333-
@contextmanager
334-
def _move_task_handlers_to_root(ti: TaskInstance) -> Generator[None, None, None]:
335-
"""
336-
Move handlers for task logging to root logger.
337-
338-
We want anything logged during task run to be propagated to task log handlers.
339-
If running in a k8s executor pod, also keep the stream handler on root logger
340-
so that logs are still emitted to stdout.
341-
"""
342-
# nothing to do
343-
if not ti.log.handlers or settings.DONOT_MODIFY_HANDLERS:
344-
yield
345-
return
346-
347-
# Move task handlers to root and reset task logger and restore original logger settings after exit.
348-
# If k8s executor, we need to ensure that root logger has a console handler, so that
349-
# task logs propagate to stdout (this is how webserver retrieves them while task is running).
350-
root_logger = logging.getLogger()
351-
console_handler = next((h for h in root_logger.handlers if h.name == "console"), None)
352-
with LoggerMutationHelper(root_logger), LoggerMutationHelper(ti.log) as task_helper:
353-
task_helper.move(root_logger)
354-
if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
355-
if console_handler and console_handler not in root_logger.handlers:
356-
root_logger.addHandler(console_handler)
357-
yield
358-
359-
360-
@contextmanager
361-
def _redirect_stdout_to_ti_log(ti: TaskInstance) -> Generator[None, None, None]:
362-
"""
363-
Redirect stdout to ti logger.
364-
365-
Redirect stdout and stderr to the task instance log as INFO and WARNING
366-
level messages, respectively.
367-
368-
If stdout already redirected (possible when task running with option
369-
`--local`), don't redirect again.
370-
"""
371-
# if sys.stdout is StreamLogWriter, it means we already redirected
372-
# likely before forking in LocalTaskJob
373-
if not isinstance(sys.stdout, StreamLogWriter):
374-
info_writer = StreamLogWriter(ti.log, logging.INFO)
375-
warning_writer = StreamLogWriter(ti.log, logging.WARNING)
376-
with redirect_stdout(info_writer), redirect_stderr(warning_writer):
377-
yield
378-
else:
379-
yield
380-
381-
382215
class TaskCommandMarker:
383216
"""Marker for listener hooks, to properly detect from which component they are called."""
384217

385218

386-
@cli_utils.action_cli(check_db=False)
387-
def task_run(args, dag: DAG | None = None) -> TaskReturnCode | None:
388-
"""
389-
Run a single task instance.
390-
391-
Note that there must be at least one DagRun for this to start,
392-
i.e. it must have been scheduled and/or triggered previously.
393-
Alternatively, if you just need to run it for testing then use
394-
"airflow tasks test ..." command instead.
395-
"""
396-
# Load custom airflow config
397-
398-
if args.local and args.raw:
399-
raise AirflowException(
400-
"Option --raw and --local are mutually exclusive. "
401-
"Please remove one option to execute the command."
402-
)
403-
404-
if args.raw:
405-
unsupported_options = [o for o in RAW_TASK_UNSUPPORTED_OPTION if getattr(args, o)]
406-
407-
if unsupported_options:
408-
unsupported_raw_task_flags = ", ".join(f"--{o}" for o in RAW_TASK_UNSUPPORTED_OPTION)
409-
unsupported_flags = ", ".join(f"--{o}" for o in unsupported_options)
410-
raise AirflowException(
411-
"Option --raw does not work with some of the other options on this command. "
412-
"You can't use --raw option and the following options: "
413-
f"{unsupported_raw_task_flags}. "
414-
f"You provided the option {unsupported_flags}. "
415-
"Delete it to execute the command."
416-
)
417-
418-
if args.cfg_path:
419-
with open(args.cfg_path) as conf_file:
420-
conf_dict = json.load(conf_file)
421-
422-
if os.path.exists(args.cfg_path):
423-
os.remove(args.cfg_path)
424-
425-
conf.read_dict(conf_dict, source=args.cfg_path)
426-
settings.configure_vars()
427-
428-
settings.MASK_SECRETS_IN_LOGS = True
429-
430-
get_listener_manager().hook.on_starting(component=TaskCommandMarker())
431-
432-
if not dag:
433-
_dag = get_dag(args.subdir, args.dag_id, args.read_from_db)
434-
else:
435-
_dag = dag
436-
task = _dag.get_task(task_id=args.task_id)
437-
ti, _ = _get_ti(task, args.map_index, logical_date_or_run_id=args.logical_date_or_run_id, pool=args.pool)
438-
ti.init_run_context(raw=args.raw)
439-
440-
hostname = get_hostname()
441-
442-
log.info("Running %s on host %s", ti, hostname)
443-
444-
task_return_code = None
445-
try:
446-
if args.interactive:
447-
task_return_code = _run_task_by_selected_method(args, _dag, ti)
448-
else:
449-
with _move_task_handlers_to_root(ti), _redirect_stdout_to_ti_log(ti):
450-
task_return_code = _run_task_by_selected_method(args, _dag, ti)
451-
if task_return_code == TaskReturnCode.DEFERRED:
452-
_set_task_deferred_context_var()
453-
finally:
454-
try:
455-
get_listener_manager().hook.before_stopping(component=TaskCommandMarker())
456-
except Exception:
457-
pass
458-
return task_return_code
459-
460-
461219
@cli_utils.action_cli(check_db=False)
462220
@providers_configuration_loaded
463221
def task_failed_deps(args) -> None:

0 commit comments

Comments
 (0)