Skip to content

Commit 7190cb2

Browse files
Use LoggingMixin for DagProcessorManager (apache#46408)
Since we always have a standalone DAG processor now, we no longer need the special logging config to have a separate log file for the DagProcessorManager. Using LoggingMixin instead, all of the logs end up on stdout, just like other Airflow components.
1 parent ee76b4d commit 7190cb2

File tree

5 files changed

+56
-110
lines changed

5 files changed

+56
-110
lines changed

airflow/cli/commands/remote_commands/config_command.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,27 +84,30 @@ class ConfigChange:
8484
:param config: The configuration parameter being changed.
8585
:param suggestion: A suggestion for replacing or handling the removed configuration.
8686
:param renamed_to: The new section and option if the configuration is renamed.
87+
:param was_deprecated: If the config is removed, whether the old config was deprecated.
8788
"""
8889

8990
config: ConfigParameter
9091
suggestion: str = ""
9192
renamed_to: ConfigParameter | None = None
93+
was_deprecated: bool = True
9294

9395
@property
9496
def message(self) -> str:
9597
"""Generate a message for this configuration change."""
9698
if self.renamed_to:
9799
if self.config.section != self.renamed_to.section:
98100
return (
99-
f"`{self.config.option}` configuration parameter moved from `{self.config.section}` section to `"
100-
f"{self.renamed_to.section}` section as `{self.renamed_to.option}`."
101+
f"`{self.config.option}` configuration parameter moved from `{self.config.section}` section to "
102+
f"`{self.renamed_to.section}` section as `{self.renamed_to.option}`."
101103
)
102104
return (
103105
f"`{self.config.option}` configuration parameter renamed to `{self.renamed_to.option}` "
104106
f"in the `{self.config.section}` section."
105107
)
106108
return (
107-
f"Removed deprecated `{self.config.option}` configuration parameter from `{self.config.section}` section. "
109+
f"Removed{' deprecated' if self.was_deprecated else ''} `{self.config.option}` configuration parameter "
110+
f"from `{self.config.section}` section. "
108111
f"{self.suggestion}"
109112
)
110113

@@ -203,6 +206,12 @@ def message(self) -> str:
203206
config=ConfigParameter("core", "dag_file_processor_timeout"),
204207
renamed_to=ConfigParameter("dag_processor", "dag_file_processor_timeout"),
205208
),
209+
ConfigChange(
210+
config=ConfigParameter("core", "dag_processor_manager_log_location"),
211+
),
212+
ConfigChange(
213+
config=ConfigParameter("core", "log_processor_filename_template"),
214+
),
206215
# api
207216
ConfigChange(
208217
config=ConfigParameter("api", "access_control_allow_origin"),
@@ -218,6 +227,18 @@ def message(self) -> str:
218227
suggestion="Remove TaskContextLogger: Replaced by the Log table for better handling of task log "
219228
"messages outside the execution context.",
220229
),
230+
ConfigChange(
231+
config=ConfigParameter("logging", "dag_processor_manager_log_location"),
232+
was_deprecated=False,
233+
),
234+
ConfigChange(
235+
config=ConfigParameter("logging", "dag_processor_manager_log_stdout"),
236+
was_deprecated=False,
237+
),
238+
ConfigChange(
239+
config=ConfigParameter("logging", "log_processor_filename_template"),
240+
was_deprecated=False,
241+
),
221242
# metrics
222243
ConfigChange(
223244
config=ConfigParameter("metrics", "metrics_use_pattern_match"),

airflow/config_templates/airflow_local_settings.py

Lines changed: 0 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
from __future__ import annotations
2121

2222
import os
23-
from pathlib import Path
2423
from typing import Any
2524
from urllib.parse import urlsplit
2625

@@ -53,17 +52,6 @@
5352

5453
PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")
5554

56-
DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
57-
"logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
58-
)
59-
60-
DAG_PROCESSOR_MANAGER_LOG_STDOUT: str = conf.get_mandatory_value(
61-
"logging", "DAG_PROCESSOR_MANAGER_LOG_STDOUT"
62-
)
63-
64-
65-
PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")
66-
6755
DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
6856
"version": 1,
6957
"disable_existing_loggers": False,
@@ -99,27 +87,8 @@
9987
"base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
10088
"filters": ["mask_secrets"],
10189
},
102-
"processor": {
103-
"class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
104-
"formatter": "airflow",
105-
"base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
106-
"filename_template": PROCESSOR_FILENAME_TEMPLATE,
107-
"filters": ["mask_secrets"],
108-
},
109-
"processor_to_stdout": {
110-
"class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
111-
"formatter": "source_processor",
112-
"stream": "sys.stdout",
113-
"filters": ["mask_secrets"],
114-
},
11590
},
11691
"loggers": {
117-
"airflow.processor": {
118-
"handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
119-
"level": LOG_LEVEL,
120-
# Set to true here (and reset via set_context) so that if no file is configured we still get logs!
121-
"propagate": True,
122-
},
12392
"airflow.task": {
12493
"handlers": ["task"],
12594
"level": LOG_LEVEL,
@@ -152,54 +121,6 @@
152121
}
153122
DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
154123

155-
DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
156-
"handlers": {
157-
"processor_manager": {
158-
"class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
159-
"formatter": "airflow",
160-
"filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
161-
"mode": "a",
162-
"maxBytes": 104857600, # 100MB
163-
"backupCount": 5,
164-
}
165-
},
166-
"loggers": {
167-
"airflow.processor_manager": {
168-
"handlers": ["processor_manager"],
169-
"level": LOG_LEVEL,
170-
"propagate": False,
171-
}
172-
},
173-
}
174-
175-
if DAG_PROCESSOR_MANAGER_LOG_STDOUT == "True":
176-
DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"].update(
177-
{
178-
"console": {
179-
"class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
180-
"formatter": "airflow",
181-
"stream": "sys.stdout",
182-
"filters": ["mask_secrets"],
183-
}
184-
}
185-
)
186-
DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"]["airflow.processor_manager"]["handlers"].append("console")
187-
188-
# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
189-
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
190-
# in multiple processes.
191-
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
192-
DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
193-
DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])
194-
195-
# Manually create log directory for processor_manager handler as RotatingFileHandler
196-
# will only create file but not the directory.
197-
processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
198-
"processor_manager"
199-
]
200-
directory: str = os.path.dirname(processor_manager_handler_config["filename"])
201-
Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)
202-
203124
##################
204125
# Remote logging #
205126
##################

airflow/config_templates/config.yml

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -912,28 +912,6 @@ logging:
912912
default: "dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/\
913913
{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}\
914914
attempt={{ try_number|default(ti.try_number) }}.log"
915-
log_processor_filename_template:
916-
description: |
917-
Formatting for how airflow generates file names for log
918-
version_added: 2.0.0
919-
type: string
920-
example: ~
921-
is_template: true
922-
default: "{{ filename }}.log"
923-
dag_processor_manager_log_location:
924-
description: |
925-
Full path of dag_processor_manager logfile.
926-
version_added: 2.0.0
927-
type: string
928-
example: ~
929-
default: "{AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log"
930-
dag_processor_manager_log_stdout:
931-
description: |
932-
Whether DAG processor manager will write logs to stdout
933-
version_added: 2.9.0
934-
type: boolean
935-
example: ~
936-
default: "False"
937915
task_log_reader:
938916
description: |
939917
Name of handler to read task instance logs.

airflow/dag_processing/manager.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
from airflow.traces.tracer import Trace
6464
from airflow.utils import timezone
6565
from airflow.utils.file import list_py_file_paths, might_contain_dag
66+
from airflow.utils.log.logging_mixin import LoggingMixin
6667
from airflow.utils.net import get_hostname
6768
from airflow.utils.process_utils import (
6869
kill_child_processes_by_pids,
@@ -97,9 +98,6 @@ class DagFileStat:
9798
last_num_of_db_queries: int = 0
9899

99100

100-
log = logging.getLogger("airflow.processor_manager")
101-
102-
103101
@dataclass(frozen=True)
104102
class DagFileInfo:
105103
"""Information about a DAG file."""
@@ -135,7 +133,7 @@ def _resolve_path(instance: Any, attribute: attrs.Attribute, val: str | os.PathL
135133

136134

137135
@attrs.define
138-
class DagFileProcessorManager:
136+
class DagFileProcessorManager(LoggingMixin):
139137
"""
140138
Manage processes responsible for parsing DAGs.
141139
@@ -167,8 +165,6 @@ class DagFileProcessorManager:
167165
factory=_config_int_factory("dag_processor", "stale_dag_threshold")
168166
)
169167

170-
log: logging.Logger = attrs.field(default=log, init=False)
171-
172168
_last_deactivate_stale_dags_time: float = attrs.field(default=0, init=False)
173169
print_stats_interval: float = attrs.field(
174170
factory=_config_int_factory("dag_processor", "print_stats_interval")
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
DAG processor related config options removed
2+
3+
The following configuration options have been removed:
4+
5+
- ``[logging] dag_processor_manager_log_location``
6+
- ``[logging] dag_processor_manager_log_stdout``
7+
- ``[logging] log_processor_filename_template``
8+
9+
If these config options are still present, they will no longer have any effect.
10+
11+
* Types of change
12+
13+
* [ ] Dag changes
14+
* [x] Config changes
15+
* [ ] API changes
16+
* [ ] CLI changes
17+
* [ ] Behaviour changes
18+
* [ ] Plugin changes
19+
* [ ] Dependency changes
20+
* [ ] Code interface changes
21+
22+
.. List the migration rules needed for this change (see https://github.com/apache/airflow/issues/41641)
23+
24+
* Migration rules needed
25+
26+
* ``airflow config lint``
27+
28+
* Remove ``[logging] dag_processor_manager_log_location``
29+
* Remove ``[logging] dag_processor_manager_log_stdout``
30+
* Remove ``[logging] log_processor_filename_template``

0 commit comments

Comments
 (0)