
Commit f8c6e81

Add decorator to optionally enable Memray-based memory tracing (apache#56821)
1 parent 5a5dc4f commit f8c6e81

15 files changed: +638 −2 lines changed

airflow-core/docs/extra-packages-ref.rst

Lines changed: 2 additions & 0 deletions
@@ -92,6 +92,8 @@ python dependencies for the provided package. The same extras are available as `
 +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
 | kerberos | ``pip install 'apache-airflow[kerberos]'`` | Kerberos integration for Kerberized services (Hadoop, Presto, Trino) |
 +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
+| memray | ``pip install 'apache-airflow[memray]'`` | Required for memory profiling with memray |
++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
 | otel | ``pip install 'apache-airflow[otel]'`` | Required for OpenTelemetry metrics |
 +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+
 | sentry | ``pip install 'apache-airflow[sentry]'`` | Sentry service for application logging and monitoring |

airflow-core/docs/howto/index.rst

Lines changed: 1 addition & 0 deletions
@@ -56,3 +56,4 @@ configuring an Airflow environment.
     dynamic-dag-generation
     docker-compose/index
     run-with-self-signed-certificate
+    memory-profiling
airflow-core/docs/howto/memory-profiling.rst

Lines changed: 235 additions & 0 deletions

@@ -0,0 +1,235 @@
.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership.  The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied.  See the License for the
   specific language governing permissions and limitations
   under the License.

.. _memory-profiling:

Memory Profiling with Memray
============================

Airflow integrates `Memray <https://bloomberg.github.io/memray/>`__, a memory profiler for Python,
to help you diagnose memory usage patterns and identify potential memory leaks in Airflow components.
This guide walks you through profiling memory usage in key Airflow components such as the
scheduler, API server, and DAG processor.

.. note::

    Memory profiling is an expensive operation and should generally only be used for debugging purposes
    in development. It is not recommended for production use.
    See :ref:`memory-profiling-precautions` for important precautions.

Prerequisites
-------------

Before you can use memory profiling, you need to install Airflow with the ``memray`` extra:

.. code-block:: bash

    pip install 'apache-airflow[memray]'

Alternatively, if you have an existing Airflow installation:

.. code-block:: bash

    pip install 'memray>=1.19.0'

.. note::

    For more information about supported environments, see the
    `Memray supported environments documentation <https://bloomberg.github.io/memray/supported_environments.html>`__.

Configuring Memory Profiling
----------------------------

Memory profiling is controlled through Airflow's configuration. You can enable it for specific
components by setting the ``memray_trace_components`` option in the ``[profiling]`` section of your
``airflow.cfg`` file or through environment variables.

Configuration Options
^^^^^^^^^^^^^^^^^^^^^

Add the following to your ``airflow.cfg`` file:

.. code-block:: ini

    [profiling]
    # Comma-separated list of Airflow components to profile with memray
    # Valid components: scheduler, api, dag_processor
    # Invalid component names will be ignored
    memray_trace_components = scheduler,dag_processor,api

Or set it via environment variable:

.. code-block:: bash

    export AIRFLOW__PROFILING__MEMRAY_TRACE_COMPONENTS="scheduler,dag_processor,api"

.. note::

    To disable memory profiling after you've completed your analysis, set
    ``memray_trace_components`` to an empty string (or unset the environment variable)
    and restart the affected components.

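To make the ignore-invalid-names behavior described above concrete, here is a minimal, hypothetical parsing sketch. The names ``parse_trace_components`` and ``VALID_COMPONENTS`` are illustrative only, not Airflow's actual implementation:

```python
# Hypothetical sketch: interpret a comma-separated memray_trace_components
# value, silently dropping component names that are not recognized.
VALID_COMPONENTS = {"scheduler", "api", "dag_processor"}


def parse_trace_components(raw: str) -> list[str]:
    """Split the option value, keeping only valid component names."""
    return [c.strip() for c in raw.split(",") if c.strip() in VALID_COMPONENTS]


print(parse_trace_components("scheduler,dag_processor,bogus"))
# → ['scheduler', 'dag_processor']
```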
Step-by-Step Profiling Guide
----------------------------

This section provides a practical walkthrough of how to profile memory usage in your Airflow deployment.

Step 1: Enable Memory Profiling
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

First, decide which component you want to profile. For this example, let's profile the scheduler.

Edit your ``airflow.cfg``:

.. code-block:: ini

    [profiling]
    memray_trace_components = scheduler

Or set the environment variable:

.. code-block:: bash

    export AIRFLOW__PROFILING__MEMRAY_TRACE_COMPONENTS=scheduler

Step 2: Restart the Component
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

After enabling profiling for a component, you need to restart it for the changes to take effect.

.. code-block:: bash

    # If running standalone
    airflow scheduler

    # If running with systemd
    sudo systemctl restart airflow-scheduler

    # If running with Docker Compose
    docker-compose restart airflow-scheduler

Step 3: Run Your Workload
^^^^^^^^^^^^^^^^^^^^^^^^^

Let Airflow run normally and perform the operations you want to profile. For example:

- Let the scheduler run for a period of time
- Process specific DAG files that may be problematic

The longer you let it run, the more data you'll collect. However, keep in mind that memory
profiling adds overhead, so a few minutes to an hour is usually sufficient for diagnosis.

Step 4: Retrieve the Profile File
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Memray will automatically generate a binary profile file in your ``$AIRFLOW_HOME`` directory.
The filename follows the pattern ``<component>_memory.bin``:

.. code-block:: bash

    # Default locations
    $AIRFLOW_HOME/scheduler_memory.bin
    $AIRFLOW_HOME/api_memory.bin
    $AIRFLOW_HOME/dag_processor_memory.bin

If running in a containerized environment, you may need to copy the file from the container:

.. code-block:: bash

    # Docker
    docker cp <container_name>:/path/to/airflow/home/scheduler_memory.bin .

    # Kubernetes
    kubectl cp <namespace>/<pod_name>:/path/to/airflow/home/scheduler_memory.bin ./scheduler_memory.bin

Step 5: Analyze the Profile
^^^^^^^^^^^^^^^^^^^^^^^^^^^

Once you have the profile file, use Memray's analysis tools to visualize and understand the memory usage.

Generate a Flamegraph
"""""""""""""""""""""

The flamegraph is the most common way to visualize memory allocations:

.. code-block:: bash

    memray flamegraph scheduler_memory.bin

This will generate an HTML file (``memray-flamegraph-scheduler_memory.html``) that you can open in a web browser.

.. image:: ../img/memray-flamegraph.png
    :alt: Example Memray flamegraph showing memory allocations

The flamegraph shows the call stack, with the width of each box representing the amount of memory allocated
by that function. Functions at the top of the graph are leaf functions that directly allocate memory.

Other Analysis Methods
""""""""""""""""""""""

Memray provides several other ways to analyze memory profiles, including table reports, summary statistics,
live monitoring, and more. For detailed information on all available analysis commands and options,
refer to the `Memray documentation on analyzing results <https://bloomberg.github.io/memray/run.html>`__.

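If you want to try the capture-then-analyze workflow outside Airflow first, a small standalone script can produce a ``.bin`` file suitable for ``memray flamegraph``. This is only a sketch assuming ``memray>=1.19.0`` is available; it degrades gracefully when the package is not installed:

```python
import importlib.util
import tempfile


def demo_capture() -> str:
    """Write a small Memray capture file and return a hint for analyzing it."""
    if importlib.util.find_spec("memray") is None:
        return "memray is not installed; run: pip install 'memray>=1.19.0'"
    import memray

    path = tempfile.mktemp(suffix="_memory.bin")  # Tracker requires a fresh path
    # Tracker records every allocation made inside the with-block
    with memray.Tracker(path):
        buffers = [bytearray(1024) for _ in range(1024)]  # ~1 MiB of allocations
    return f"analyze with: memray flamegraph {path}"


print(demo_capture())
```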
Interpreting Results
--------------------

When analyzing your memory profile, look for:

**High Memory Allocation Functions**
    Functions that allocate large amounts of memory or are called frequently. These are the widest
    bars in the flamegraph or the top entries in the table report.

**Memory Retention Patterns**
    If you see certain functions consistently holding memory over time, this could indicate a memory leak.

**Unexpected Allocations**
    Look for memory allocations in places you wouldn't expect, which might indicate inefficient code
    or unnecessary data structures.

**Third-Party Library Usage**
    Sometimes memory issues are caused by how third-party libraries are used. The flamegraph will
    show you if a particular library is responsible for high memory usage.

.. _memory-profiling-precautions:

Precautions
-----------

1. **Profile in Non-Production Environments**
   Memory profiling adds significant overhead, including increased memory usage and performance
   degradation. Use it in development environments that mirror your production setup.

2. **Use Representative Workloads**
   Ensure the workload you're profiling is representative of your actual use case.

3. **Manage Profile File Sizes**
   Profile files can grow very large (hundreds of MB to several GB) and may consume significant
   disk space. Monitor available disk space during profiling sessions and regularly clean up
   old profile files after analysis to prevent storage issues.

Further Reading
---------------

For more detailed information about Memray and its capabilities, refer to the official documentation:

- `Memray Documentation <https://bloomberg.github.io/memray/>`__
- `Memray Getting Started Guide <https://bloomberg.github.io/memray/getting_started.html>`__
- `Understanding Memray Output <https://bloomberg.github.io/memray/run.html>`__
- `Memray API Reference <https://bloomberg.github.io/memray/api.html>`__
airflow-core/docs/img/memray-flamegraph.png

725 KB (binary image file, not rendered)

airflow-core/pyproject.toml

Lines changed: 3 additions & 0 deletions
@@ -171,6 +171,9 @@ dependencies = [
     "requests-kerberos>=0.14.0",
     "thrift-sasl>=0.4.2",
 ]
+"memray" = [
+    "memray>=1.19.0",
+]
 "otel" = [
     "opentelemetry-exporter-prometheus>=0.47b0",
 ]

airflow-core/src/airflow/cli/commands/api_server_command.py

Lines changed: 2 additions & 0 deletions
@@ -33,6 +33,7 @@
 from airflow.exceptions import AirflowConfigException
 from airflow.typing_compat import ParamSpec
 from airflow.utils import cli as cli_utils
+from airflow.utils.memray_utils import MemrayTraceComponents, enable_memray_trace
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded

 PS = ParamSpec("PS")
@@ -48,6 +49,7 @@
 # more info here: https://github.com/benoitc/gunicorn/issues/1877#issuecomment-1911136399


+@enable_memray_trace(component=MemrayTraceComponents.api)
 def _run_api_server(args, apps: str, num_workers: int, worker_timeout: int, proxy_headers: bool):
     """Run the API server."""
     log.info(

airflow-core/src/airflow/cli/commands/dag_processor_command.py

Lines changed: 2 additions & 0 deletions
@@ -26,6 +26,7 @@
 from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
 from airflow.jobs.job import Job, run_job
 from airflow.utils import cli as cli_utils
+from airflow.utils.memray_utils import MemrayTraceComponents, enable_memray_trace
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded

 log = logging.getLogger(__name__)
@@ -44,6 +45,7 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
     )


+@enable_memray_trace(component=MemrayTraceComponents.dag_processor)
 @cli_utils.action_cli
 @providers_configuration_loaded
 def dag_processor(args):

airflow-core/src/airflow/cli/commands/scheduler_command.py

Lines changed: 2 additions & 0 deletions
@@ -30,12 +30,14 @@
 from airflow.jobs.job import Job, run_job
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.utils import cli as cli_utils
+from airflow.utils.memray_utils import MemrayTraceComponents, enable_memray_trace
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
 from airflow.utils.scheduler_health import serve_health_check

 log = logging.getLogger(__name__)


+@enable_memray_trace(component=MemrayTraceComponents.scheduler)
 def _run_scheduler_job(args) -> None:
     job_runner = SchedulerJobRunner(job=Job(), num_runs=args.num_runs)
     enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")

airflow-core/src/airflow/config_templates/config.yml

Lines changed: 23 additions & 0 deletions
@@ -2561,3 +2561,26 @@ dag_processor:
       type: boolean
       example: ~
       default: "True"
+profiling:
+  description: |
+    Configuration for memory profiling in Airflow components.
+    Currently, profiling is provided via Memray; additional tools may be added in the future.
+    Also, see the guide in Link (TBD)
+  options:
+    memray_trace_components:
+      description: |
+        Comma-separated list of Airflow components to profile with memray.
+        Valid components are: scheduler, api, dag_processor
+
+        This option only takes effect when it is not set to blank (the default).
+        When set, the listed components start tracing memory allocations and store the metrics
+        in "$AIRFLOW_HOME/<component>_memory.bin".
+        To generate an analyzed view, run this command in the base directory where the bin file is generated:
+        ```
+        # see also https://bloomberg.github.io/memray/run.html#aggregated-capture-files
+        memray flamegraph $AIRFLOW_HOME/<component>_memory.bin
+        ```
+        This is an expensive operation and generally should not be used except for debugging purposes.
+      version_added: 3.1.2
+      type: string
+      example: "scheduler,api,dag_processor"
+      default: ""

airflow-core/src/airflow/configuration.py

Lines changed: 43 additions & 1 deletion
@@ -36,10 +36,11 @@
 from configparser import ConfigParser, NoOptionError, NoSectionError
 from contextlib import contextmanager
 from copy import deepcopy
+from enum import Enum
 from io import StringIO
 from json.decoder import JSONDecodeError
 from re import Pattern
-from typing import IO, TYPE_CHECKING, Any
+from typing import IO, TYPE_CHECKING, Any, TypeVar
 from urllib.parse import urlsplit

 from packaging.version import parse as parse_version
@@ -1248,6 +1249,47 @@ def getlist(self, section: str, key: str, delimiter=",", **kwargs):
                 f'Current value: "{val}".'
             )

+    E = TypeVar("E", bound=Enum)
+
+    def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E:
+        val = self.get(section, key, **kwargs)
+        enum_names = [enum_item.name for enum_item in enum_class]
+
+        if val is None:
+            raise AirflowConfigException(
+                f'Failed to convert value. Please check "{key}" key in "{section}" section. '
+                f'Current value: "{val}" and it must be one of {", ".join(enum_names)}'
+            )
+
+        try:
+            return enum_class[val]
+        except KeyError:
+            if "fallback" in kwargs and kwargs["fallback"] in enum_names:
+                return enum_class[kwargs["fallback"]]
+            raise AirflowConfigException(
+                f'Failed to convert value. Please check "{key}" key in "{section}" section. '
+                f"the value must be one of {', '.join(enum_names)}"
+            )
+
+    def getenumlist(self, section: str, key: str, enum_class: type[E], delimiter=",", **kwargs) -> list[E]:
+        string_list = self.getlist(section, key, delimiter, **kwargs)
+        enum_names = [enum_item.name for enum_item in enum_class]
+        enum_list = []
+
+        for val in string_list:
+            try:
+                enum_list.append(enum_class[val])
+            except KeyError:
+                log.warning(
+                    "Failed to convert value. Please check %s key in %s section. "
+                    "it must be one of %s, if not the value is ignored",
+                    key,
+                    section,
+                    ", ".join(enum_names),
+                )
+
+        return enum_list
+
     def getimport(self, section: str, key: str, **kwargs) -> Any:
         """
         Read options, import the full qualified name, and return the object.
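The new ``getenum`` above looks values up by Enum *name* (``enum_class[val]``), so matching is case-sensitive, and a ``fallback`` kwarg naming a valid member is honored on a miss. Here is a standalone sketch of those semantics outside ``AirflowConfigParser``; the ``Color`` enum and ``getenum_sketch`` helper are illustrative names for demonstration only:

```python
from enum import Enum


class Color(Enum):
    red = 1
    green = 2


def getenum_sketch(val, enum_class, **kwargs):
    """Look a value up by Enum *name*, honoring a valid fallback kwarg."""
    enum_names = [member.name for member in enum_class]
    try:
        return enum_class[val]  # name-based lookup, as in getenum above
    except KeyError:
        fallback = kwargs.get("fallback")
        if fallback in enum_names:
            return enum_class[fallback]
        raise ValueError(f"value must be one of {', '.join(enum_names)}")


print(getenum_sketch("green", Color).name)                 # → green
print(getenum_sketch("blue", Color, fallback="red").name)  # → red
```

``getenumlist`` differs only in that an unmatched name is logged and skipped rather than raising, which is why invalid entries in ``memray_trace_components`` are silently ignored.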
