Skip to content

Commit 606538a

Browse files
Lendemormasenf
andauthored
allow leak and block detection on handlers (#5620)
* allow leak and block detection on handlers * optional deps and simpler file tree * avoid repeating imports * fix * Handle async gen and regular gen Add overloads to `monitor_leaks` to avoid pyright ignores Remove extra layer of callable in `monitor_leaks` * temporarily disable thread leak detection * temporarily disable task leak detection * move monitoring to utils folder and fix precommit * adjust default threshold * rename decorator * prevent nested monitoring --------- Co-authored-by: Masen Furer <[email protected]>
1 parent 81b1b56 commit 606538a

File tree

4 files changed

+207
-1
lines changed

4 files changed

+207
-1
lines changed

pyproject.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ dependencies = [
3939
"typing_extensions >=4.13.0",
4040
"wrapt >=1.17.0,<2.0",
4141
]
42+
4243
classifiers = [
4344
"Development Status :: 4 - Beta",
4445
"License :: OSI Approved :: Apache Software License",
@@ -49,6 +50,10 @@ classifiers = [
4950
"Programming Language :: Python :: 3.13",
5051
]
5152

53+
[project.optional-dependencies]
54+
monitoring = [
55+
"pyleak >=0.1.14,<1.0",
56+
]
5257

5358
[project.urls]
5459
homepage = "https://reflex.dev"
@@ -74,6 +79,7 @@ dev = [
7479
"pre-commit",
7580
"psutil",
7681
"psycopg[binary]",
82+
"pyleak >=0.1.14,<1.0",
7783
"pyright",
7884
"pytest-asyncio",
7985
"pytest-benchmark",

reflex/config.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
from reflex.utils import console
3030
from reflex.utils.exceptions import ConfigError
3131

32+
if TYPE_CHECKING:
33+
from pyleak.base import LeakAction
34+
3235

3336
@dataclasses.dataclass(kw_only=True)
3437
class DBConfig:
@@ -186,6 +189,18 @@ class BaseConfig:
186189
# Telemetry opt-in.
187190
telemetry_enabled: bool = True
188191

192+
# PyLeak monitoring configuration for detecting event loop blocking and resource leaks.
193+
enable_pyleak_monitoring: bool = False
194+
195+
# Threshold in seconds for detecting event loop blocking operations.
196+
pyleak_blocking_threshold: float = 0.1
197+
198+
# Grace period in seconds for thread leak detection cleanup.
199+
pyleak_thread_grace_period: float = 0.2
200+
201+
# Action to take when PyLeak detects issues
202+
pyleak_action: "LeakAction | None" = None
203+
189204
# The bun path
190205
bun_path: ExistingPath = constants.Bun.DEFAULT_PATH
191206

reflex/state.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
)
6161
from reflex.utils.exceptions import ImmutableStateError as ImmutableStateError
6262
from reflex.utils.exec import is_testing_env
63+
from reflex.utils.monitoring import is_pyleak_enabled, monitor_loopblocks
6364
from reflex.utils.types import _isinstance, is_union, value_inside_optional
6465
from reflex.vars import Field, VarData, field
6566
from reflex.vars.base import (
@@ -1784,7 +1785,11 @@ async def _process_event(
17841785
from reflex.utils import telemetry
17851786

17861787
# Get the function to process the event.
1787-
fn = functools.partial(handler.fn, state)
1788+
if is_pyleak_enabled():
1789+
console.debug(f"Monitoring leaks for handler: {handler.fn.__qualname__}")
1790+
fn = functools.partial(monitor_loopblocks(handler.fn), state)
1791+
else:
1792+
fn = functools.partial(handler.fn, state)
17881793

17891794
try:
17901795
type_hints = typing.get_type_hints(handler.fn)

reflex/utils/monitoring.py

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
"""PyLeak integration for monitoring event loop blocking and resource leaks in Reflex applications."""
2+
3+
import asyncio
4+
import contextlib
5+
import functools
6+
import inspect
7+
import threading
8+
from collections.abc import AsyncGenerator, Awaitable, Callable, Generator
9+
from typing import TypeVar, overload
10+
11+
from reflex.config import get_config
12+
13+
try:
14+
from pyleak import no_event_loop_blocking, no_task_leaks, no_thread_leaks
15+
from pyleak.base import LeakAction
16+
17+
PYLEAK_AVAILABLE = True
18+
except ImportError:
19+
PYLEAK_AVAILABLE = False
20+
no_event_loop_blocking = no_task_leaks = no_thread_leaks = None # pyright: ignore[reportAssignmentType]
21+
LeakAction = None # pyright: ignore[reportAssignmentType]
22+
23+
24+
# Thread-local storage to track if monitoring is already active
25+
_thread_local = threading.local()
26+
27+
28+
def is_pyleak_enabled() -> bool:
29+
"""Check if PyLeak monitoring is enabled and available.
30+
31+
Returns:
32+
True if PyLeak monitoring is enabled in config and PyLeak is available.
33+
"""
34+
if not PYLEAK_AVAILABLE:
35+
return False
36+
config = get_config()
37+
return config.enable_pyleak_monitoring
38+
39+
40+
@contextlib.contextmanager
41+
def monitor_sync():
42+
"""Sync context manager for PyLeak monitoring.
43+
44+
Yields:
45+
None: Context for monitoring sync operations.
46+
"""
47+
if not is_pyleak_enabled():
48+
yield
49+
return
50+
51+
# Check if monitoring is already active in this thread
52+
if getattr(_thread_local, "monitoring_active", False):
53+
yield
54+
return
55+
56+
config = get_config()
57+
action = config.pyleak_action or LeakAction.WARN # pyright: ignore[reportOptionalMemberAccess]
58+
59+
# Mark monitoring as active
60+
_thread_local.monitoring_active = True
61+
try:
62+
with contextlib.ExitStack() as stack:
63+
# Thread leak detection has issues with background tasks (no_thread_leaks)
64+
stack.enter_context(
65+
no_event_loop_blocking( # pyright: ignore[reportOptionalCall]
66+
action=action,
67+
threshold=config.pyleak_blocking_threshold,
68+
)
69+
)
70+
yield
71+
finally:
72+
_thread_local.monitoring_active = False
73+
74+
75+
@contextlib.asynccontextmanager
76+
async def monitor_async():
77+
"""Async context manager for PyLeak monitoring.
78+
79+
Yields:
80+
None: Context for monitoring async operations.
81+
"""
82+
if not is_pyleak_enabled():
83+
yield
84+
return
85+
86+
# Check if monitoring is already active in this thread
87+
if getattr(_thread_local, "monitoring_active", False):
88+
yield
89+
return
90+
91+
config = get_config()
92+
action = config.pyleak_action or LeakAction.WARN # pyright: ignore[reportOptionalMemberAccess]
93+
94+
# Mark monitoring as active
95+
_thread_local.monitoring_active = True
96+
try:
97+
async with contextlib.AsyncExitStack() as stack:
98+
# Thread leak detection has issues with background tasks (no_thread_leaks)
99+
# Re-add thread leak later.
100+
101+
# Block detection for event loops
102+
stack.enter_context(
103+
no_event_loop_blocking( # pyright: ignore[reportOptionalCall]
104+
action=action,
105+
threshold=config.pyleak_blocking_threshold,
106+
)
107+
)
108+
# Task leak detection has issues with background tasks (no_task_leaks)
109+
110+
yield
111+
finally:
112+
_thread_local.monitoring_active = False
113+
114+
115+
YieldType = TypeVar("YieldType")
116+
SendType = TypeVar("SendType")
117+
ReturnType = TypeVar("ReturnType")
118+
119+
120+
@overload
121+
def monitor_loopblocks(
122+
func: Callable[..., AsyncGenerator[YieldType, ReturnType]],
123+
) -> Callable[..., AsyncGenerator[YieldType, ReturnType]]: ...
124+
125+
126+
@overload
127+
def monitor_loopblocks(
128+
func: Callable[..., Generator[YieldType, SendType, ReturnType]],
129+
) -> Callable[..., Generator[YieldType, SendType, ReturnType]]: ...
130+
131+
132+
@overload
133+
def monitor_loopblocks(
134+
func: Callable[..., Awaitable[ReturnType]],
135+
) -> Callable[..., Awaitable[ReturnType]]: ...
136+
137+
138+
def monitor_loopblocks(func: Callable) -> Callable:
139+
"""Framework decorator using the monitoring module's context manager.
140+
141+
Args:
142+
func: The function to be monitored for leaks.
143+
144+
Returns:
145+
Decorator function that applies PyLeak monitoring to sync/async functions.
146+
"""
147+
if inspect.isasyncgenfunction(func):
148+
149+
@functools.wraps(func)
150+
async def async_gen_wrapper(*args, **kwargs):
151+
async with monitor_async():
152+
async for item in func(*args, **kwargs):
153+
yield item
154+
155+
return async_gen_wrapper
156+
157+
if asyncio.iscoroutinefunction(func):
158+
159+
@functools.wraps(func)
160+
async def async_wrapper(*args, **kwargs):
161+
async with monitor_async():
162+
return await func(*args, **kwargs)
163+
164+
return async_wrapper
165+
166+
if inspect.isgeneratorfunction(func):
167+
168+
@functools.wraps(func)
169+
def gen_wrapper(*args, **kwargs):
170+
with monitor_sync():
171+
yield from func(*args, **kwargs)
172+
173+
return gen_wrapper
174+
175+
@functools.wraps(func)
176+
def sync_wrapper(*args, **kwargs):
177+
with monitor_sync():
178+
return func(*args, **kwargs)
179+
180+
return sync_wrapper # pyright: ignore[reportReturnType]

0 commit comments

Comments
 (0)