Skip to content

Commit 4b6f0b8

Browse files
committed
Merge remote-tracking branch 'origin/fix/fastapi-memory-leak-only' into fix/fastapi-memory-leak-only
2 parents 7be0d5b + fc60799 commit 4b6f0b8

File tree

3 files changed

+73
-5
lines changed

3 files changed

+73
-5
lines changed

instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import functools
4343
import logging
4444
import re
45-
from typing import Any, Callable, Generic, TypeVar
45+
from typing import Any, Awaitable, Callable, Generic, TypeVar
4646

4747
import wrapt
4848
from wrapt import wrap_function_wrapper
@@ -596,6 +596,44 @@ def traced_execution(
596596
self._populate_span(span, cursor, *args)
597597
return query_method(*args, **kwargs)
598598

599+
async def traced_execution_async(
600+
self,
601+
cursor: CursorT,
602+
query_method: Callable[..., Awaitable[Any]],
603+
*args: tuple[Any, ...],
604+
**kwargs: dict[Any, Any],
605+
):
606+
name = self.get_operation_name(cursor, args)
607+
if not name:
608+
name = (
609+
self._db_api_integration.database
610+
if self._db_api_integration.database
611+
else self._db_api_integration.name
612+
)
613+
614+
with self._db_api_integration._tracer.start_as_current_span(
615+
name, kind=SpanKind.CLIENT
616+
) as span:
617+
if span.is_recording():
618+
if args and self._commenter_enabled:
619+
if self._enable_attribute_commenter:
620+
# sqlcomment is added to executed query and db.statement span attribute
621+
args = self._update_args_with_added_sql_comment(
622+
args, cursor
623+
)
624+
self._populate_span(span, cursor, *args)
625+
else:
626+
# sqlcomment is only added to executed query
627+
# so db.statement is set before add_sql_comment
628+
self._populate_span(span, cursor, *args)
629+
args = self._update_args_with_added_sql_comment(
630+
args, cursor
631+
)
632+
else:
633+
# no sqlcomment anywhere
634+
self._populate_span(span, cursor, *args)
635+
return await query_method(*args, **kwargs)
636+
599637

600638
# pylint: disable=abstract-method
601639
class TracedCursorProxy(wrapt.ObjectProxy, Generic[CursorT]):

instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,17 +399,17 @@ def _new_cursor_async_factory(
399399

400400
class TracedCursorAsyncFactory(base_factory):
401401
async def execute(self, *args: Any, **kwargs: Any):
402-
return await _cursor_tracer.traced_execution(
402+
return await _cursor_tracer.traced_execution_async(
403403
self, super().execute, *args, **kwargs
404404
)
405405

406406
async def executemany(self, *args: Any, **kwargs: Any):
407-
return await _cursor_tracer.traced_execution(
407+
return await _cursor_tracer.traced_execution_async(
408408
self, super().executemany, *args, **kwargs
409409
)
410410

411411
async def callproc(self, *args: Any, **kwargs: Any):
412-
return await _cursor_tracer.traced_execution(
412+
return await _cursor_tracer.traced_execution_async(
413413
self, super().callproc, *args, **kwargs
414414
)
415415

instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import asyncio
1516
import types
1617
from unittest import IsolatedAsyncioTestCase, mock
1718

@@ -50,10 +51,15 @@ def __init__(self, *args, **kwargs):
5051
pass
5152

5253
# pylint: disable=unused-argument, no-self-use
53-
async def execute(self, query, params=None, throw_exception=False):
54+
async def execute(
55+
self, query, params=None, throw_exception=False, delay=0.0
56+
):
5457
if throw_exception:
5558
raise psycopg.Error("Test Exception")
5659

60+
if delay:
61+
await asyncio.sleep(delay)
62+
5763
# pylint: disable=unused-argument, no-self-use
5864
async def executemany(self, query, params=None, throw_exception=False):
5965
if throw_exception:
@@ -492,3 +498,27 @@ async def test_not_recording_async(self):
492498
self.assertFalse(mock_span.set_status.called)
493499

494500
PsycopgInstrumentor().uninstrument()
501+
502+
async def test_tracing_is_async(self):
503+
PsycopgInstrumentor().instrument()
504+
505+
# before this async fix cursor.execute would take 14000 ns, delaying for
506+
# 100,000ns
507+
delay = 0.0001
508+
509+
async def test_async_connection():
510+
acnx = await psycopg.AsyncConnection.connect("test")
511+
async with acnx as cnx:
512+
async with cnx.cursor() as cursor:
513+
await cursor.execute("SELECT * FROM test", delay=delay)
514+
515+
await test_async_connection()
516+
spans_list = self.memory_exporter.get_finished_spans()
517+
self.assertEqual(len(spans_list), 1)
518+
span = spans_list[0]
519+
520+
# duration is nanoseconds
521+
duration = span.end_time - span.start_time
522+
self.assertGreater(duration, delay * 1e9)
523+
524+
PsycopgInstrumentor().uninstrument()

0 commit comments

Comments
 (0)