From 71f50a772164b9e8813feac9612c20e213325c4e Mon Sep 17 00:00:00 2001 From: Islam Sharabash Date: Mon, 3 Mar 2025 17:50:57 -0800 Subject: [PATCH 1/4] Fix tracing of async cursors for psycopg This copies the traced_execution of AsyncCursorTracer except query_method is awaited within the span. Fixes https://github.com/open-telemetry/opentelemetry-python-contrib/issues/2486 --- .../instrumentation/psycopg/__init__.py | 44 ++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py index 38a6264c6d..19f3576c32 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py @@ -149,7 +149,7 @@ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.psycopg.package import _instruments from opentelemetry.instrumentation.psycopg.version import __version__ -from opentelemetry.trace import TracerProvider +from opentelemetry.trace import SpanKind, TracerProvider _logger = logging.getLogger(__name__) _OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory" @@ -381,6 +381,46 @@ def callproc(self, *args: Any, **kwargs: Any): return TracedCursorFactory +class AsyncCursorTracer(CursorTracer): + async def traced_execution( + self, + cursor: dbapi.CursorT, + query_method: Callable[..., Any], + *args: tuple[Any, ...], + **kwargs: dict[Any, Any], + ): + name = self.get_operation_name(cursor, args) + if not name: + name = ( + self._db_api_integration.database + if self._db_api_integration.database + else self._db_api_integration.name + ) + + with self._db_api_integration._tracer.start_as_current_span( + name, kind=SpanKind.CLIENT + ) as span: + if span.is_recording(): + if args and self._commenter_enabled: + if self._enable_attribute_commenter: + # sqlcomment is added to executed query and db.statement span attribute + args = self._update_args_with_added_sql_comment( + args, cursor + ) + self._populate_span(span, cursor, *args) + else: + # sqlcomment is only added to executed query + # so db.statement is set before add_sql_comment + self._populate_span(span, cursor, *args) + args = self._update_args_with_added_sql_comment( + args, cursor + ) + else: + # no sqlcomment anywhere + self._populate_span(span, cursor, *args) + return await query_method(*args, **kwargs) + + def _new_cursor_async_factory( db_api: DatabaseApiAsyncIntegration | None = None, base_factory: type[psycopg.AsyncCursor] | None = None, @@ -395,7 +435,7 @@ def _new_cursor_async_factory( tracer_provider=tracer_provider, ) base_factory = base_factory or psycopg.AsyncCursor - _cursor_tracer = CursorTracer(db_api) + _cursor_tracer = AsyncCursorTracer(db_api) class TracedCursorAsyncFactory(base_factory): async def execute(self, *args: Any, **kwargs: Any): From 0d8cd7417d8382a0b342888c33b86381b46c3903 Mon Sep 17 00:00:00 2001 From: Islam Sharabash Date: Mon, 7 Apr 2025 10:13:59 -0700 Subject: [PATCH 2/4] Adding a regression test for asynccursor bug Adds a test to check that the async tracer is actually awaiting cursor.execute. On my machine, with the bug present, the duration is 14000ns. With the bug patched the the duration is orders of magnitude larger. --- .../tests/test_psycopg_integration.py | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py index 6c9bcf2d4b..0987d3e932 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import types from unittest import IsolatedAsyncioTestCase, mock @@ -50,10 +51,15 @@ def __init__(self, *args, **kwargs): pass # pylint: disable=unused-argument, no-self-use - async def execute(self, query, params=None, throw_exception=False): + async def execute( + self, query, params=None, throw_exception=False, delay=0.0 + ): if throw_exception: raise psycopg.Error("Test Exception") + if delay: + await asyncio.sleep(delay) + # pylint: disable=unused-argument, no-self-use async def executemany(self, query, params=None, throw_exception=False): if throw_exception: @@ -492,3 +498,27 @@ async def test_not_recording_async(self): self.assertFalse(mock_span.set_status.called) PsycopgInstrumentor().uninstrument() + + async def test_tracing_is_async(self): + PsycopgInstrumentor().instrument() + + # before this async fix cursor.execute would take 14000 ns, delaying for + # 100,000ns + delay = 0.0001 + + async def test_async_connection(): + acnx = await psycopg.AsyncConnection.connect("test") + async with acnx as cnx: + async with cnx.cursor() as cursor: + await cursor.execute("SELECT * FROM test", delay=delay) + + await test_async_connection() + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + # duration is nanoseconds + duration = span.end_time - span.start_time + self.assertGreater(duration, delay * 1e9) + + PsycopgInstrumentor().uninstrument() From f9405565baedebca37fd424b4085a09d0af177a8 Mon Sep 17 00:00:00 2001 From: Islam Sharabash Date: Wed, 9 Jul 2025 11:16:10 -0700 Subject: [PATCH 3/4] Make return type of query_method an awaitable --- .../src/opentelemetry/instrumentation/psycopg/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py index 19f3576c32..51d3f87d52 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py @@ -140,7 +140,7 @@ from __future__ import annotations import logging -from typing import Any, Callable, Collection, TypeVar +from typing import Any, Awaitable, Callable, Collection, TypeVar import psycopg # pylint: disable=import-self from psycopg.sql import Composed # pylint: disable=no-name-in-module @@ -385,7 +385,7 @@ class AsyncCursorTracer(CursorTracer): async def traced_execution( self, cursor: dbapi.CursorT, - query_method: Callable[..., Any], + query_method: Callable[..., Awaitable[Any]], *args: tuple[Any, ...], **kwargs: dict[Any, Any], ): From cc08874c4e18f34808ee934bdb1604f8cd2a614f Mon Sep 17 00:00:00 2001 From: Islam Sharabash Date: Mon, 21 Jul 2025 12:19:29 -0700 Subject: [PATCH 4/4] Move implementation to traced_execution_async --- .../instrumentation/dbapi/__init__.py | 40 +++++++++++++- .../instrumentation/psycopg/__init__.py | 52 +++---------------- 2 files changed, 45 insertions(+), 47 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py b/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py index c7b1dee3b2..e5f3843d74 100644 --- a/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py @@ -42,7 +42,7 @@ import functools import logging import re -from typing import Any, Callable, Generic, TypeVar +from typing import Any, Awaitable, Callable, Generic, TypeVar import wrapt from wrapt import wrap_function_wrapper @@ -587,6 +587,44 @@ def traced_execution( self._populate_span(span, cursor, *args) return query_method(*args, **kwargs) + async def traced_execution_async( + self, + cursor: CursorT, + query_method: Callable[..., Awaitable[Any]], + *args: tuple[Any, ...], + **kwargs: dict[Any, Any], + ): + name = self.get_operation_name(cursor, args) + if not name: + name = ( + self._db_api_integration.database + if self._db_api_integration.database + else self._db_api_integration.name + ) + + with self._db_api_integration._tracer.start_as_current_span( + name, kind=SpanKind.CLIENT + ) as span: + if span.is_recording(): + if args and self._commenter_enabled: + if self._enable_attribute_commenter: + # sqlcomment is added to executed query and db.statement span attribute + args = self._update_args_with_added_sql_comment( + args, cursor + ) + self._populate_span(span, cursor, *args) + else: + # sqlcomment is only added to executed query + # so db.statement is set before add_sql_comment + self._populate_span(span, cursor, *args) + args = self._update_args_with_added_sql_comment( + args, cursor + ) + else: + # no sqlcomment anywhere + self._populate_span(span, cursor, *args) + return await query_method(*args, **kwargs) + # pylint: disable=abstract-method class TracedCursorProxy(wrapt.ObjectProxy, Generic[CursorT]): diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py index 51d3f87d52..ede035e079 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py @@ -140,7 +140,7 @@ from __future__ import annotations import logging -from typing import Any, Awaitable, Callable, Collection, TypeVar +from typing import Any, Callable, Collection, TypeVar import psycopg # pylint: disable=import-self from psycopg.sql import Composed # pylint: disable=no-name-in-module @@ -149,7 +149,7 @@ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.psycopg.package import _instruments from opentelemetry.instrumentation.psycopg.version import __version__ -from opentelemetry.trace import SpanKind, TracerProvider +from opentelemetry.trace import TracerProvider _logger = logging.getLogger(__name__) _OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory" @@ -381,46 +381,6 @@ def callproc(self, *args: Any, **kwargs: Any): return TracedCursorFactory -class AsyncCursorTracer(CursorTracer): - async def traced_execution( - self, - cursor: dbapi.CursorT, - query_method: Callable[..., Awaitable[Any]], - *args: tuple[Any, ...], - **kwargs: dict[Any, Any], - ): - name = self.get_operation_name(cursor, args) - if not name: - name = ( - self._db_api_integration.database - if self._db_api_integration.database - else self._db_api_integration.name - ) - - with self._db_api_integration._tracer.start_as_current_span( - name, kind=SpanKind.CLIENT - ) as span: - if span.is_recording(): - if args and self._commenter_enabled: - if self._enable_attribute_commenter: - # sqlcomment is added to executed query and db.statement span attribute - args = self._update_args_with_added_sql_comment( - args, cursor - ) - self._populate_span(span, cursor, *args) - else: - # sqlcomment is only added to executed query - # so db.statement is set before add_sql_comment - self._populate_span(span, cursor, *args) - args = self._update_args_with_added_sql_comment( - args, cursor - ) - else: - # no sqlcomment anywhere - self._populate_span(span, cursor, *args) - return await query_method(*args, **kwargs) - - def _new_cursor_async_factory( db_api: DatabaseApiAsyncIntegration | None = None, base_factory: type[psycopg.AsyncCursor] | None = None, @@ -435,21 +395,21 @@ def _new_cursor_async_factory( tracer_provider=tracer_provider, ) base_factory = base_factory or psycopg.AsyncCursor - _cursor_tracer = AsyncCursorTracer(db_api) + _cursor_tracer = CursorTracer(db_api) class TracedCursorAsyncFactory(base_factory): async def execute(self, *args: Any, **kwargs: Any): - return await _cursor_tracer.traced_execution( + return await _cursor_tracer.traced_execution_async( self, super().execute, *args, **kwargs ) async def executemany(self, *args: Any, **kwargs: Any): - return await _cursor_tracer.traced_execution( + return await _cursor_tracer.traced_execution_async( self, super().executemany, *args, **kwargs ) async def callproc(self, *args: Any, **kwargs: Any): - return await _cursor_tracer.traced_execution( + return await _cursor_tracer.traced_execution_async( self, super().callproc, *args, **kwargs )