Skip to content

Commit 0795b06

Browse files
committed
Asyncio
1 parent 8d5dc18 commit 0795b06

File tree

3 files changed

+316
-2
lines changed

3 files changed

+316
-2
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
from .pstats_collector import PstatsCollector
2+
3+
4+
class AsyncPstatsCollector(PstatsCollector):
5+
def __init__(self, sample_interval_usec, unwinder):
6+
super().__init__(sample_interval_usec)
7+
self.unwinder = unwinder
8+
self.async_sample_count = 0
9+
10+
def collect(self, stack_frames):
11+
# First collect regular stack frames
12+
super().collect(stack_frames)
13+
14+
# Now collect async task information
15+
try:
16+
awaited_info = self.unwinder.get_all_awaited_by()
17+
if awaited_info:
18+
self._process_async_stats(awaited_info)
19+
except Exception:
20+
# If async collection fails, continue with regular profiling
21+
pass
22+
23+
def _process_async_stats(self, awaited_info):
24+
"""Process the async task information and update stats."""
25+
# Build a map of task_id to TaskInfo for easy lookup
26+
task_map = {}
27+
28+
for thread_info in awaited_info:
29+
if thread_info.thread_id == 0:
30+
continue # Skip empty thread info
31+
32+
for task_info in thread_info.awaited_by:
33+
task_map[task_info.task_id] = task_info
34+
35+
# For each task, reconstruct the full async stack and update stats
36+
for thread_info in awaited_info:
37+
if thread_info.thread_id == 0:
38+
continue
39+
40+
for task_info in thread_info.awaited_by:
41+
full_stacks = self._reconstruct_task_stacks(task_info, task_map)
42+
for stack in full_stacks:
43+
self._update_stats_from_stack(stack)
44+
self.async_sample_count += 1
45+
46+
def _reconstruct_task_stacks(self, task_info, task_map, visited=None):
47+
"""Recursively reconstruct full async stacks for a task."""
48+
if visited is None:
49+
visited = set()
50+
51+
if task_info.task_id in visited:
52+
return [] # Avoid infinite recursion
53+
54+
visited.add(task_info.task_id)
55+
full_stacks = []
56+
57+
# Get the current task's stack from its coroutine_stack
58+
for coro_info in task_info.coroutine_stack:
59+
current_stack = []
60+
61+
# Add a pseudo-frame for the task name if it's not a generic name
62+
if task_info.task_name and not task_info.task_name.startswith('Task-'):
63+
# Add task name as a pseudo-frame
64+
current_stack.append(("<task>", 0, f"[Task:{task_info.task_name}]"))
65+
66+
for frame in coro_info.call_stack:
67+
# Convert FrameInfo to tuple format (filename, lineno, funcname)
68+
# Add [async] prefix to function name for async frames
69+
current_stack.append((frame.filename, frame.lineno, f"[async] {frame.funcname}"))
70+
71+
# If this task is awaited by others, append their stacks
72+
if task_info.awaited_by:
73+
for awaiter_coro in task_info.awaited_by:
74+
# The task_name in CoroInfo is actually the task_id of the awaiting task
75+
awaiter_task_id = awaiter_coro.task_name
76+
77+
if awaiter_task_id in task_map:
78+
awaiter_task = task_map[awaiter_task_id]
79+
# Recursively get the awaiter's full stacks
80+
awaiter_stacks = self._reconstruct_task_stacks(
81+
awaiter_task, task_map, visited
82+
)
83+
if awaiter_stacks:
84+
# Combine current stack with each awaiter stack
85+
for awaiter_stack in awaiter_stacks:
86+
# Current stack is the leaf, awaiter stack is towards root
87+
full_stack = current_stack + awaiter_stack
88+
full_stacks.append(full_stack)
89+
else:
90+
# Awaiter has no further awaiters, use its coroutine stack
91+
for awaiter_task_coro in awaiter_task.coroutine_stack:
92+
awaiter_frames = []
93+
# Add awaiter task name if meaningful
94+
if awaiter_task.task_name and not awaiter_task.task_name.startswith('Task-'):
95+
awaiter_frames.append(("<task>", 0, f"[Task:{awaiter_task.task_name}]"))
96+
for frame in awaiter_task_coro.call_stack:
97+
awaiter_frames.append((frame.filename, frame.lineno, f"[async] {frame.funcname}"))
98+
full_stack = current_stack + awaiter_frames
99+
full_stacks.append(full_stack)
100+
else:
101+
# Can't find awaiter task, just use the awaiter coroutine's stack
102+
awaiter_frames = []
103+
for frame in awaiter_coro.call_stack:
104+
awaiter_frames.append((frame.filename, frame.lineno, f"[async] {frame.funcname}"))
105+
full_stack = current_stack + awaiter_frames
106+
full_stacks.append(full_stack)
107+
else:
108+
# No awaiters, this is a top-level task
109+
full_stacks.append(current_stack)
110+
111+
visited.remove(task_info.task_id)
112+
return full_stacks
113+
114+
def _update_stats_from_stack(self, stack):
115+
"""Update stats from a reconstructed async stack."""
116+
if not stack:
117+
return
118+
119+
# Process the stack similar to regular stack processing
120+
# The first frame (index 0) is the currently executing function (leaf)
121+
for i, frame in enumerate(stack):
122+
func_key = frame # (filename, lineno, funcname)
123+
124+
# Count direct calls (only for the first frame - the leaf)
125+
if i == 0:
126+
if func_key not in self.stats:
127+
self.stats[func_key] = [0, 0, 0.0, 0.0, {}]
128+
self.stats[func_key][0] += 1 # direct calls
129+
self.stats[func_key][2] += self.sample_interval_sec # total time
130+
131+
# Count cumulative calls (for all frames in stack)
132+
if func_key not in self.stats:
133+
self.stats[func_key] = [0, 0, 0.0, 0.0, {}]
134+
self.stats[func_key][1] += 1 # cumulative calls
135+
self.stats[func_key][3] += self.sample_interval_sec # cumulative time
136+
137+
# Track caller relationships (frame i is called by frame i+1)
138+
if i < len(stack) - 1:
139+
caller = stack[i + 1]
140+
if caller not in self.stats[func_key][4]:
141+
self.stats[func_key][4][caller] = [0, 0, 0.0, 0.0]
142+
self.stats[func_key][4][caller][0] += 1
143+
self.stats[func_key][4][caller][2] += self.sample_interval_sec
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import collections
2+
import os
3+
from .stack_collectors import StackTraceCollector
4+
5+
6+
class AsyncStackCollector(StackTraceCollector):
7+
def __init__(self, unwinder):
8+
super().__init__()
9+
self.unwinder = unwinder
10+
self.async_stacks = []
11+
12+
def collect(self, stack_frames):
13+
# First collect regular stack frames
14+
super().collect(stack_frames)
15+
16+
# Now collect async task information
17+
try:
18+
awaited_info = self.unwinder.get_all_awaited_by()
19+
if awaited_info:
20+
self._process_async_stacks(awaited_info)
21+
except Exception:
22+
# If async collection fails, continue with regular profiling
23+
pass
24+
25+
def _process_async_stacks(self, awaited_info):
26+
"""Process the async task information and reconstruct full stacks."""
27+
# Build a map of task_id to TaskInfo for easy lookup
28+
task_map = {}
29+
30+
for thread_info in awaited_info:
31+
if thread_info.thread_id == 0:
32+
continue # Skip empty thread info
33+
34+
for task_info in thread_info.awaited_by:
35+
task_map[task_info.task_id] = task_info
36+
37+
# For each task, reconstruct the full async stack
38+
for thread_info in awaited_info:
39+
if thread_info.thread_id == 0:
40+
continue
41+
42+
for task_info in thread_info.awaited_by:
43+
full_stacks = self._reconstruct_task_stacks(task_info, task_map)
44+
self.async_stacks.extend(full_stacks)
45+
46+
def _reconstruct_task_stacks(self, task_info, task_map, visited=None):
47+
"""Recursively reconstruct full async stacks for a task."""
48+
if visited is None:
49+
visited = set()
50+
51+
if task_info.task_id in visited:
52+
return [] # Avoid infinite recursion
53+
54+
visited.add(task_info.task_id)
55+
full_stacks = []
56+
57+
# Get the current task's stack from its coroutine_stack
58+
for coro_info in task_info.coroutine_stack:
59+
current_stack = []
60+
for frame in coro_info.call_stack:
61+
# Convert FrameInfo to tuple format (filename, lineno, funcname)
62+
current_stack.append((frame.filename, frame.lineno, frame.funcname))
63+
64+
# If this task is awaited by others, we need to connect to awaiter stacks
65+
if task_info.awaited_by:
66+
for awaiter_coro in task_info.awaited_by:
67+
# The task_name in CoroInfo is actually the task_id of the awaiting task
68+
awaiter_task_id = awaiter_coro.task_name
69+
70+
if awaiter_task_id in task_map:
71+
awaiter_task = task_map[awaiter_task_id]
72+
# Recursively get the awaiter's full stacks
73+
awaiter_stacks = self._reconstruct_task_stacks(
74+
awaiter_task, task_map, visited
75+
)
76+
if awaiter_stacks:
77+
# For each awaiter stack, create a combined stack
78+
for awaiter_stack in awaiter_stacks:
79+
# Build: current_stack + [current_task_name] + awaiter_stack
80+
full_stack = current_stack[:]
81+
# Insert task name as transition between this task and its awaiter
82+
if task_info.task_name and not task_info.task_name.startswith('Task-'):
83+
full_stack.append(("", 0, f"[Task:{task_info.task_name}]"))
84+
full_stack.extend(awaiter_stack)
85+
full_stacks.append(full_stack)
86+
else:
87+
# Awaiter has no further awaiters, build final stack
88+
full_stack = current_stack[:]
89+
# Add current task name
90+
if task_info.task_name and not task_info.task_name.startswith('Task-'):
91+
full_stack.append(("", 0, f"[Task:{task_info.task_name}]"))
92+
# Add awaiter's coroutine stack
93+
for awaiter_task_coro in awaiter_task.coroutine_stack:
94+
for frame in awaiter_task_coro.call_stack:
95+
full_stack.append((frame.filename, frame.lineno, frame.funcname))
96+
# Add awaiter task name at the end if it's a leaf
97+
if awaiter_task.task_name and not awaiter_task.task_name.startswith('Task-'):
98+
full_stack.append(("", 0, f"[Task:{awaiter_task.task_name}]"))
99+
full_stacks.append(full_stack)
100+
else:
101+
# Can't find awaiter task in map, use the coroutine info directly
102+
full_stack = current_stack[:]
103+
# Add current task name
104+
if task_info.task_name and not task_info.task_name.startswith('Task-'):
105+
full_stack.append(("", 0, f"[Task:{task_info.task_name}]"))
106+
# Add awaiter coroutine's stack
107+
for frame in awaiter_coro.call_stack:
108+
full_stack.append((frame.filename, frame.lineno, frame.funcname))
109+
full_stacks.append(full_stack)
110+
else:
111+
# No awaiters, this is a leaf task - just add task name at the end
112+
full_stack = current_stack[:]
113+
if task_info.task_name and not task_info.task_name.startswith('Task-'):
114+
full_stack.append(("", 0, f"[Task:{task_info.task_name}]"))
115+
full_stacks.append(full_stack)
116+
117+
visited.remove(task_info.task_id)
118+
return full_stacks
119+
120+
def export(self, filename):
121+
"""Export both regular and async stacks in collapsed format."""
122+
stack_counter = collections.Counter()
123+
124+
# Add regular stacks
125+
for call_tree in self.call_trees:
126+
stack_str = ";".join(
127+
f"{os.path.basename(f[0])}:{f[2]}:{f[1]}" for f in call_tree
128+
)
129+
stack_counter[stack_str] += 1
130+
131+
# Add async stacks with [async] prefix
132+
for async_stack in self.async_stacks:
133+
# Reverse to get root->leaf order
134+
async_stack_reversed = list(reversed(async_stack))
135+
stack_parts = ["[async]"]
136+
for f in async_stack_reversed:
137+
if f[0] == "" and f[1] == 0 and f[2].startswith("[Task:"):
138+
# This is a task name pseudo-frame
139+
stack_parts.append(f[2])
140+
else:
141+
# Regular frame
142+
stack_parts.append(f"{os.path.basename(f[0])}:{f[2]}:{f[1]}")
143+
stack_str = ";".join(stack_parts)
144+
stack_counter[stack_str] += 1
145+
146+
with open(filename, "w") as f:
147+
for stack, count in stack_counter.items():
148+
f.write(f"{stack} {count}\n")
149+
150+
total_regular = len(self.call_trees)
151+
total_async = len(self.async_stacks)
152+
print(f"Collapsed stack output written to {filename}")
153+
print(f" Regular stacks: {total_regular}")
154+
print(f" Async stacks: {total_async}")

Lib/profile/sample.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
from .pstats_collector import PstatsCollector
1212
from .stack_collectors import CollapsedStackCollector
13+
from .async_stack_collector import AsyncStackCollector
14+
from .async_pstats_collector import AsyncPstatsCollector
1315

1416
FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") is not None
1517

@@ -460,6 +462,7 @@ def sample(
460462
show_summary=True,
461463
output_format="pstats",
462464
realtime_stats=False,
465+
asyncio=False,
463466
):
464467
profiler = SampleProfiler(
465468
pid, sample_interval_usec, all_threads=all_threads
@@ -469,9 +472,15 @@ def sample(
469472
collector = None
470473
match output_format:
471474
case "pstats":
472-
collector = PstatsCollector(sample_interval_usec)
475+
if asyncio:
476+
collector = AsyncPstatsCollector(sample_interval_usec, profiler.unwinder)
477+
else:
478+
collector = PstatsCollector(sample_interval_usec)
473479
case "collapsed":
474-
collector = CollapsedStackCollector()
480+
if asyncio:
481+
collector = AsyncStackCollector(profiler.unwinder)
482+
else:
483+
collector = CollapsedStackCollector()
475484
filename = filename or f"collapsed.{pid}.txt"
476485
case _:
477486
raise ValueError(f"Invalid output format: {output_format}")
@@ -615,6 +624,13 @@ def main():
615624
"or saves to collapsed.<pid>.txt for collapsed format)",
616625
)
617626

627+
# Async options
628+
sampling_group.add_argument(
629+
"--asyncio",
630+
action="store_true",
631+
help="Collect and display async task stacks (experimental)",
632+
)
633+
618634
# pstats-specific options
619635
pstats_group = parser.add_argument_group("pstats format options")
620636
sort_group = pstats_group.add_mutually_exclusive_group()
@@ -700,6 +716,7 @@ def main():
700716
show_summary=not args.no_summary,
701717
output_format=args.format,
702718
realtime_stats=args.realtime_stats,
719+
asyncio=args.asyncio,
703720
)
704721

705722

0 commit comments

Comments
 (0)