Skip to content

Commit 29dfd56

Browse files
ibashaabmass
andauthored
Fix tracing of async cursors for psycopg (#3324)
* Fix tracing of async cursors for psycopg This copies the traced_execution of AsyncCursorTracer except query_method is awaited within the span. Fixes #2486 * Adding a regression test for asynccursor bug Adds a test to check that the async tracer is actually awaiting cursor.execute. On my machine, with the bug present, the duration is 14000ns. With the bug patched the the duration is orders of magnitude larger. * Make return type of query_method an awaitable * Move implementation to traced_execution_async --------- Co-authored-by: Aaron Abbott <[email protected]>
1 parent aa0579d commit 29dfd56

File tree

3 files changed

+73
-5
lines changed

3 files changed

+73
-5
lines changed

instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import functools
4343
import logging
4444
import re
45-
from typing import Any, Callable, Generic, TypeVar
45+
from typing import Any, Awaitable, Callable, Generic, TypeVar
4646

4747
import wrapt
4848
from wrapt import wrap_function_wrapper
@@ -596,6 +596,44 @@ def traced_execution(
596596
self._populate_span(span, cursor, *args)
597597
return query_method(*args, **kwargs)
598598

599+
async def traced_execution_async(
600+
self,
601+
cursor: CursorT,
602+
query_method: Callable[..., Awaitable[Any]],
603+
*args: tuple[Any, ...],
604+
**kwargs: dict[Any, Any],
605+
):
606+
name = self.get_operation_name(cursor, args)
607+
if not name:
608+
name = (
609+
self._db_api_integration.database
610+
if self._db_api_integration.database
611+
else self._db_api_integration.name
612+
)
613+
614+
with self._db_api_integration._tracer.start_as_current_span(
615+
name, kind=SpanKind.CLIENT
616+
) as span:
617+
if span.is_recording():
618+
if args and self._commenter_enabled:
619+
if self._enable_attribute_commenter:
620+
# sqlcomment is added to executed query and db.statement span attribute
621+
args = self._update_args_with_added_sql_comment(
622+
args, cursor
623+
)
624+
self._populate_span(span, cursor, *args)
625+
else:
626+
# sqlcomment is only added to executed query
627+
# so db.statement is set before add_sql_comment
628+
self._populate_span(span, cursor, *args)
629+
args = self._update_args_with_added_sql_comment(
630+
args, cursor
631+
)
632+
else:
633+
# no sqlcomment anywhere
634+
self._populate_span(span, cursor, *args)
635+
return await query_method(*args, **kwargs)
636+
599637

600638
# pylint: disable=abstract-method
601639
class TracedCursorProxy(wrapt.ObjectProxy, Generic[CursorT]):

instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,17 +399,17 @@ def _new_cursor_async_factory(
399399

400400
class TracedCursorAsyncFactory(base_factory):
401401
async def execute(self, *args: Any, **kwargs: Any):
402-
return await _cursor_tracer.traced_execution(
402+
return await _cursor_tracer.traced_execution_async(
403403
self, super().execute, *args, **kwargs
404404
)
405405

406406
async def executemany(self, *args: Any, **kwargs: Any):
407-
return await _cursor_tracer.traced_execution(
407+
return await _cursor_tracer.traced_execution_async(
408408
self, super().executemany, *args, **kwargs
409409
)
410410

411411
async def callproc(self, *args: Any, **kwargs: Any):
412-
return await _cursor_tracer.traced_execution(
412+
return await _cursor_tracer.traced_execution_async(
413413
self, super().callproc, *args, **kwargs
414414
)
415415

instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import asyncio
1516
import types
1617
from unittest import IsolatedAsyncioTestCase, mock
1718

@@ -50,10 +51,15 @@ def __init__(self, *args, **kwargs):
5051
pass
5152

5253
# pylint: disable=unused-argument, no-self-use
53-
async def execute(self, query, params=None, throw_exception=False):
54+
async def execute(
55+
self, query, params=None, throw_exception=False, delay=0.0
56+
):
5457
if throw_exception:
5558
raise psycopg.Error("Test Exception")
5659

60+
if delay:
61+
await asyncio.sleep(delay)
62+
5763
# pylint: disable=unused-argument, no-self-use
5864
async def executemany(self, query, params=None, throw_exception=False):
5965
if throw_exception:
@@ -492,3 +498,27 @@ async def test_not_recording_async(self):
492498
self.assertFalse(mock_span.set_status.called)
493499

494500
PsycopgInstrumentor().uninstrument()
501+
502+
async def test_tracing_is_async(self):
503+
PsycopgInstrumentor().instrument()
504+
505+
# before this async fix cursor.execute would take 14000 ns, delaying for
506+
# 100,000ns
507+
delay = 0.0001
508+
509+
async def test_async_connection():
510+
acnx = await psycopg.AsyncConnection.connect("test")
511+
async with acnx as cnx:
512+
async with cnx.cursor() as cursor:
513+
await cursor.execute("SELECT * FROM test", delay=delay)
514+
515+
await test_async_connection()
516+
spans_list = self.memory_exporter.get_finished_spans()
517+
self.assertEqual(len(spans_list), 1)
518+
span = spans_list[0]
519+
520+
# duration is nanoseconds
521+
duration = span.end_time - span.start_time
522+
self.assertGreater(duration, delay * 1e9)
523+
524+
PsycopgInstrumentor().uninstrument()

0 commit comments

Comments
 (0)