1+ # Copyright The OpenTelemetry Authors
2+ #
3+ # Licensed under the Apache License, Version 2.0 (the "License");
4+ # you may not use this file except in compliance with the License.
5+ # You may obtain a copy of the License at
6+ #
7+ # http://www.apache.org/licenses/LICENSE-2.0
8+ #
9+ # Unless required by applicable law or agreed to in writing, software
10+ # distributed under the License is distributed on an "AS IS" BASIS,
11+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+ # See the License for the specific language governing permissions and
13+ # limitations under the License.
14+
15+ """
16+ A general test verifying that when the same Future objects (or coroutines) are
17+ repeatedly instrumented (for example, via `trace_future`), callback references
18+ do not leak. In this example, we mimic a typical scenario where a small set of
19+ Futures might be reused throughout an application's lifecycle.
20+ """
21+
22+ import asyncio
23+
24+ from opentelemetry .test .test_base import TestBase
25+
26+ from opentelemetry .instrumentation .asyncio import AsyncioInstrumentor
27+
28+
29+ class MockSubscription :
30+ """
31+ Example class holding an unsubscribe_future, similar to something like
32+ aiokafka's subscription.
33+ """
34+ def __init__ (self ):
35+ self .unsubscribe_future = asyncio .Future ()
36+
37+
38+ class MockGroupCoordinator :
39+ """
40+ Example class modeling repeated instrumentation of the same Future objects.
41+ """
42+ def __init__ (self ):
43+ self ._closing = asyncio .Future ()
44+ self .subscription = MockSubscription ()
45+ self ._rejoin_needed_fut = asyncio .Future ()
46+
47+ async def run_routine (self , instrumentor ):
48+ """
49+ Each time this routine is called, the same 3 Futures are 'traced' again.
50+ In a real-life scenario, there's often a loop reusing these objects.
51+ """
52+ instrumentor .trace_future (self ._closing )
53+ instrumentor .trace_future (self .subscription .unsubscribe_future )
54+ instrumentor .trace_future (self ._rejoin_needed_fut )
55+
56+
57+ class TestAsyncioDuplicateInstrument (TestBase ):
58+ """
59+ Tests whether repeated instrumentation of the same Futures leads to
60+ exponential callback growth (potential memory leak).
61+ """
62+
63+ def setUp (self ):
64+ super ().setUp ()
65+ self .instrumentor = AsyncioInstrumentor ()
66+ self .instrumentor .instrument ()
67+
68+ def tearDown (self ):
69+ self .instrumentor .uninstrument ()
70+ super ().tearDown ()
71+
72+ def test_duplicate_instrumentation_of_futures (self ):
73+ """
74+ If instrumentor.trace_future is called multiple times on the same Future,
75+ we should NOT see an unbounded accumulation of callbacks.
76+ """
77+ coordinator = MockGroupCoordinator ()
78+
79+ # Simulate calling the routine multiple times
80+ num_iterations = 10
81+ for _ in range (num_iterations ):
82+ asyncio .run (coordinator .run_routine (self .instrumentor ))
83+
84+ # Check for callback accumulation
85+ closing_cb_count = len (coordinator ._closing ._callbacks )
86+ unsub_cb_count = len (coordinator .subscription .unsubscribe_future ._callbacks )
87+ rejoin_cb_count = len (coordinator ._rejoin_needed_fut ._callbacks )
88+
89+ # If instrumentation is properly deduplicated, each Future might have ~1-2 callbacks.
90+ max_expected_callbacks = 2
91+ self .assertLessEqual (
92+ closing_cb_count ,
93+ max_expected_callbacks ,
94+ f"_closing Future has { closing_cb_count } callbacks. Potential leak!" ,
95+ )
96+ self .assertLessEqual (
97+ unsub_cb_count ,
98+ max_expected_callbacks ,
99+ f"unsubscribe_future has { unsub_cb_count } callbacks. Potential leak!" ,
100+ )
101+ self .assertLessEqual (
102+ rejoin_cb_count ,
103+ max_expected_callbacks ,
104+ f"_rejoin_needed_fut has { rejoin_cb_count } callbacks. Potential leak!" ,
105+ )
0 commit comments