Skip to content

Commit 29571ee

Browse files
Support Async Generators in Transaction Decorators (#1543)
* Add support for AsyncGenerators in transaction wrappers * Add tests for async_generator_wrapper * Add tests for async_generator_proxy * Cleanup test names * Add large explanatory comments * Silence pytest warning for TestException containing Test * Correct async generator aclose behavior and tests * Correct tests for pypy garbage collection --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent fb099e8 commit 29571ee

File tree

5 files changed

+558
-26
lines changed

5 files changed

+558
-26
lines changed

newrelic/common/async_proxy.py

Lines changed: 101 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
import logging
1616
import time
1717

18-
from newrelic.common.coroutine import is_asyncio_coroutine, is_coroutine_callable, is_generator_function
18+
from newrelic.common.coroutine import (
19+
is_async_generator_function,
20+
is_asyncio_coroutine,
21+
is_coroutine_callable,
22+
is_generator_function,
23+
)
1924
from newrelic.common.object_wrapper import ObjectProxy
2025
from newrelic.core.trace_cache import trace_cache
2126

@@ -29,6 +34,11 @@ def __init__(self, transaction_init):
2934
self.enter_time = None
3035
self.transaction = None
3136
self.transaction_init = transaction_init
37+
# This flag is used to tell if the top level coroutine is an async generator.
38+
# Depending on the conditions, the behavior when encountering
39+
# StopIteration vs StopAsyncIteration is different. See the comments in __exit__
40+
# for details on the expected behavior.
41+
self.is_async_generator = False
3242

3343
def pre_close(self):
3444
# If close is called prior to the start of the coroutine do not create
@@ -90,11 +100,63 @@ def __exit__(self, exc, value, tb):
90100
except:
91101
CancelledError = GeneratorExit
92102

93-
# case: coroutine completed or cancelled
94-
if exc is StopIteration or exc is GeneratorExit or exc is CancelledError:
103+
# There are 3 separate cases we need to consider for when attempting to exit this context manager.
104+
#
105+
# When running as a generator, the execution path is simple. When StopIteration or GeneratorExit
106+
# is raised, the generator has completed normally so we complete the transaction. Other exceptions
107+
# are treated as errors and the transaction is closed with the exception details.
108+
#
109+
# When running as a coroutine, the execution path is only slightly changed. Calling __await__ on
110+
# the coroutine will return a generator which is driven to completion. Since CoroutineProxy wraps
111+
# this generator in a GeneratorProxy, the execution path ends up as the same as a generator, with
112+
# the additional consideration of asyncio.CancelledError which is raised if the coroutine is cancelled.
113+
# This is treated the same as GeneratorExit, completing the transaction without error.
114+
#
115+
# When running as an async generator, the execution path is considerably more complex. All of the
116+
# interfaces for the async generator (__anext__, asend, athrow, aclose) are coroutines which themselves
117+
# require context management. These coroutines are awaited, which again creates a generator that must also
118+
# be tracked. This means that the TransactionContext will be entered and exited multiple times with
119+
# StopIteration being thrown in an underlying generator for each item that's yielded from the async generator.
120+
# We therefore need to avoid completing the transaction when StopIteration is raised, and only complete
121+
# the transaction when StopAsyncIteration is raised which indicates the async generator has completed normally.
122+
# GeneratorExit and asyncio.CancelledError are treated the same as the other cases, completing the transaction
123+
# without error.
124+
#
125+
# The is_async_generator flag is used to tell if the top level coroutine is an async generator, which will
126+
# then change the behavior with respect to StopIteration. As this TransactionContext object is shared
127+
# between all the wrappers for async generators, the coroutines they create, and the generators created
128+
# by awaiting those coroutines, we have to change the behavior of all of these types of wrappers when
129+
# run under an async generator to avoid completing the transaction early.
130+
131+
if exc is StopAsyncIteration:
132+
# If an async generator completes normally, complete the transaction without error.
133+
# This condition is also manually run when an async generator is closed via AsyncGeneratorProxy.aclose().
134+
if self.is_async_generator:
135+
self.transaction.__exit__(None, None, None)
136+
# If a non-async generator reaches this, complete the transaction and report as an error.
137+
else:
138+
self.transaction.__exit__(exc, value, tb)
139+
140+
elif exc is StopIteration:
141+
# If a non-async generator completes normally, complete the transaction without error.
142+
if not self.is_async_generator:
143+
self.transaction.__exit__(None, None, None)
144+
145+
# If an async generator reaches this, don't complete the transaction as this is
146+
# caused by yielding an item from the generator. This is due to completing the
147+
# underlying coroutine which is a generator internally and will raise StopIteration.
148+
# We need to wait until the async generator itself completes normally with
149+
# a final StopAsyncIteration.
150+
151+
# Note: This block is equivalent to "else: pass".
152+
# If this code block is unnested in the future, that should be made explicit.
153+
154+
# If coroutine was cancelled, either by asyncio.CancelledError, .close(),
155+
# complete the transaction without error.
156+
elif exc in (GeneratorExit, CancelledError):
95157
self.transaction.__exit__(None, None, None)
96158

97-
# case: coroutine completed because of error
159+
# Unexpected exception, complete the transaction and report as an error.
98160
elif exc:
99161
self.transaction.__exit__(exc, value, tb)
100162

@@ -151,9 +213,44 @@ def __await__(self):
151213
return GeneratorProxy(self.__wrapped__, self._nr_context)
152214

153215

216+
class AsyncGeneratorProxy(ObjectProxy):
217+
def __init__(self, wrapped, context):
218+
super().__init__(wrapped)
219+
self._nr_context = context
220+
# Set this flag to indicate that the top level coroutine is an async generator,
221+
# which will change the behavior of __exit__ in TransactionContext for all the
222+
# lower level wrappers.
223+
self._nr_context.is_async_generator = True
224+
225+
def __aiter__(self):
226+
return self
227+
228+
async def __anext__(self):
229+
return await self.asend(None)
230+
231+
async def asend(self, value):
232+
return await CoroutineProxy(self.__wrapped__.asend(value), self._nr_context)
233+
234+
async def athrow(self, *args, **kwargs):
235+
return await CoroutineProxy(self.__wrapped__.athrow(*args, **kwargs), self._nr_context)
236+
237+
async def aclose(self):
238+
try:
239+
return await CoroutineProxy(self.__wrapped__.aclose(), self._nr_context)
240+
finally:
241+
# There is nothing further down that can tell correctly that the async generator
242+
# is being closed, so we call __exit__ with StopAsyncIteration manually to ensure
243+
# the transaction is correctly completed without error. Otherwise this would rely on
244+
# the garbage collector deleting the async generator to complete the transaction by
245+
# calling Transaction.__del__, which will cause a hanging transaction and race conditions.
246+
self._nr_context.__exit__(StopAsyncIteration, None, None)
247+
248+
154249
def async_proxy(wrapped):
155250
if is_coroutine_callable(wrapped):
156251
return CoroutineProxy
252+
elif is_async_generator_function(wrapped):
253+
return AsyncGeneratorProxy
157254
elif is_generator_function(wrapped):
158255
if is_asyncio_coroutine(wrapped):
159256
return AwaitableGeneratorProxy

tests/agent_features/test_async_generator_trace.py renamed to tests/agent_features/test_async_generator_trace_wrapper.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# limitations under the License.
1414

1515
import functools
16+
import gc
17+
import sys
1618
import time
1719

1820
import pytest
@@ -84,7 +86,7 @@ class MyException(Exception):
8486
scoped_metrics=[("Function/agen", 1)],
8587
rollup_metrics=[("Function/agen", 1)],
8688
)
87-
@validate_transaction_errors(errors=["test_async_generator_trace:MyException"])
89+
@validate_transaction_errors(errors=["test_async_generator_trace_wrapper:MyException"])
8890
def test_async_generator_error(event_loop):
8991
@function_trace(name="agen")
9092
async def agen():
@@ -182,30 +184,36 @@ async def _test():
182184

183185

184186
@validate_transaction_metrics(
185-
"test_async_generator_close_ends_trace",
187+
"test_async_generator_aclose_ends_trace",
186188
background_task=True,
187189
scoped_metrics=[("Function/agen", 1)],
188190
rollup_metrics=[("Function/agen", 1)],
189191
)
190-
def test_async_generator_close_ends_trace(event_loop):
192+
def test_async_generator_aclose_ends_trace(event_loop):
191193
@function_trace(name="agen")
192194
async def agen():
193195
yield
194196

195-
@background_task(name="test_async_generator_close_ends_trace")
196-
async def _test():
197-
gen = agen()
197+
# Save a reference to the generator and run the validations before that
198+
# is garbage collected to avoid this test becoming a duplicate
199+
# of the test "test_incomplete_async_generator"
200+
gen = agen()
198201

199-
# kickstart the coroutine
200-
await gen.asend(None)
202+
@background_task(name="test_async_generator_aclose_ends_trace")
203+
def _test_async_generator_aclose_ends_trace():
204+
async def _test():
205+
# kickstart the coroutine
206+
await gen.asend(None)
201207

202-
# trace should be ended/recorded by close
203-
await gen.aclose()
208+
# trace should be ended/recorded by close
209+
await gen.aclose()
204210

205-
# We may call gen.close as many times as we want
206-
await gen.aclose()
211+
# We may call gen.close as many times as we want
212+
await gen.aclose()
207213

208-
event_loop.run_until_complete(_test())
214+
event_loop.run_until_complete(_test())
215+
216+
_test_async_generator_aclose_ends_trace()
209217

210218

211219
@validate_tt_parenting(("TransactionNode", [("FunctionNode", [("FunctionNode", [])])]))
@@ -493,6 +501,10 @@ async def _test():
493501
async for _ in c:
494502
break
495503

504+
# This test differs from the test for async_proxy in that the generator
505+
# going out of scope does not immediately close the trace. Instead, it's
506+
# the transaction ending that closes the trace. No need to call gc.collect().
507+
496508
if nr_transaction:
497509
_test = background_task(name="test_incomplete_async_generator")(_test)
498510

0 commit comments

Comments
 (0)