Skip to content

Commit 818f722

Browse files
authored
Provide descriptive names for asyncio.Task (#5727)
Reflex internally spawns async tasks for background event processing, emitting websocket data, lifespan, telementry, and resolving async computed vars. Now these tasks all have descriptive names that include the event being processed, the token, and a timestamp of when the task started. This extra information in the task name allows users to better identify where potentially problems in the app are hiding.
1 parent bc99f35 commit 818f722

File tree

5 files changed

+32
-10
lines changed

5 files changed

+32
-10
lines changed

reflex/app.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io
1313
import json
1414
import sys
15+
import time
1516
import traceback
1617
import urllib.parse
1718
from collections.abc import (
@@ -1582,7 +1583,10 @@ async def _coro():
15821583
sid=state.router.session.session_id,
15831584
)
15841585

1585-
task = asyncio.create_task(_coro())
1586+
task = asyncio.create_task(
1587+
_coro(),
1588+
name=f"reflex_background_task|{event.name}|{time.time()}|{event.token}",
1589+
)
15861590
self._background_tasks.add(task)
15871591
# Clean up task from background_tasks set when complete.
15881592
task.add_done_callback(self._background_tasks.discard)
@@ -1727,7 +1731,8 @@ async def process(
17271731
"reload",
17281732
data=event,
17291733
to=sid,
1730-
)
1734+
),
1735+
name=f"reflex_emit_reload|{event.name}|{time.time()}|{event.token}",
17311736
)
17321737
return
17331738
# re-assign only when the value is different
@@ -2028,7 +2033,8 @@ def on_disconnect(self, sid: str):
20282033
if disconnect_token:
20292034
# Use async cleanup through token manager
20302035
task = asyncio.create_task(
2031-
self._token_manager.disconnect_token(disconnect_token, sid)
2036+
self._token_manager.disconnect_token(disconnect_token, sid),
2037+
name=f"reflex_disconnect_token|{disconnect_token}|{time.time()}",
20322038
)
20332039
# Don't await to avoid blocking disconnect, but handle potential errors
20342040
task.add_done_callback(
@@ -2047,12 +2053,14 @@ async def emit_update(self, update: StateUpdate, sid: str) -> None:
20472053
# If the sid is None, we are not connected to a client. Prevent sending
20482054
# updates to all clients.
20492055
return
2050-
if sid not in self.sid_to_token:
2056+
token = self.sid_to_token.get(sid)
2057+
if token is None:
20512058
console.warn(f"Attempting to send delta to disconnected websocket {sid}")
20522059
return
20532060
# Creating a task prevents the update from being blocked behind other coroutines.
20542061
await asyncio.create_task(
2055-
self.emit(str(constants.SocketEvent.EVENT), update, to=sid)
2062+
self.emit(str(constants.SocketEvent.EVENT), update, to=sid),
2063+
name=f"reflex_emit_event|{token}|{sid}|{time.time()}",
20562064
)
20572065

20582066
async def on_event(self, sid: str, data: Any):

reflex/app_mixins/lifespan.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import dataclasses
88
import functools
99
import inspect
10+
import time
1011
from collections.abc import Callable, Coroutine
1112

1213
from starlette.applications import Starlette
@@ -36,6 +37,7 @@ async def _run_lifespan_tasks(self, app: Starlette):
3637
if isinstance(task, asyncio.Task):
3738
running_tasks.append(task)
3839
else:
40+
task_name = task.__name__
3941
signature = inspect.signature(task)
4042
if "app" in signature.parameters:
4143
task = functools.partial(task, app=app)
@@ -44,7 +46,10 @@ async def _run_lifespan_tasks(self, app: Starlette):
4446
await stack.enter_async_context(_t)
4547
console.debug(run_msg.format(type="asynccontextmanager"))
4648
elif isinstance(_t, Coroutine):
47-
task_ = asyncio.create_task(_t)
49+
task_ = asyncio.create_task(
50+
_t,
51+
name=f"reflex_lifespan_task|{task_name}|{time.time()}",
52+
)
4853
task_.add_done_callback(lambda t: t.result())
4954
running_tasks.append(task_)
5055
console.debug(run_msg.format(type="coroutine"))
@@ -70,9 +75,10 @@ def register_lifespan_task(self, task: Callable | asyncio.Task, **task_kwargs):
7075
msg = f"Task {task.__name__} of type generator must be decorated with contextlib.asynccontextmanager."
7176
raise InvalidLifespanTaskTypeError(msg)
7277

78+
task_name = task.__name__ # pyright: ignore [reportAttributeAccessIssue]
7379
if task_kwargs:
7480
original_task = task
7581
task = functools.partial(task, **task_kwargs) # pyright: ignore [reportArgumentType]
7682
functools.update_wrapper(task, original_task) # pyright: ignore [reportArgumentType]
7783
self.lifespan_tasks.add(task)
78-
console.debug(f"Registered lifespan task: {task.__name__}") # pyright: ignore [reportAttributeAccessIssue]
84+
console.debug(f"Registered lifespan task: {task_name}")

reflex/istate/manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,8 @@ async def set_state(
667667
_substate_key(client_token, substate),
668668
substate,
669669
lock_id,
670-
)
670+
),
671+
name=f"reflex_set_state|{client_token}|{substate.get_full_name()}",
671672
)
672673
for substate in state.substates.values()
673674
]

reflex/state.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import inspect
1212
import pickle
1313
import sys
14+
import time
1415
import typing
1516
import warnings
1617
from collections.abc import AsyncIterator, Callable, Sequence
@@ -284,7 +285,10 @@ async def _resolve_delta(delta: Delta) -> Delta:
284285
for state_name, state_delta in delta.items():
285286
for var_name, value in state_delta.items():
286287
if asyncio.iscoroutine(value):
287-
tasks[state_name, var_name] = asyncio.create_task(value)
288+
tasks[state_name, var_name] = asyncio.create_task(
289+
value,
290+
name=f"reflex_resolve_delta|{state_name}|{var_name}|{time.time()}",
291+
)
288292
for (state_name, var_name), task in tasks.items():
289293
delta[state_name][var_name] = await task
290294
return delta

reflex/utils/telemetry.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,10 @@ async def async_send(event: str, telemetry_enabled: bool | None, **kwargs):
353353

354354
try:
355355
# Within an event loop context, send the event asynchronously.
356-
task = asyncio.create_task(async_send(event, telemetry_enabled, **kwargs))
356+
task = asyncio.create_task(
357+
async_send(event, telemetry_enabled, **kwargs),
358+
name=f"reflex_send_telemetry_event|{event}",
359+
)
357360
background_tasks.add(task)
358361
task.add_done_callback(background_tasks.discard)
359362
except RuntimeError:

0 commit comments

Comments
 (0)