Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.psycopg.package import _instruments
from opentelemetry.instrumentation.psycopg.version import __version__
from opentelemetry.trace import TracerProvider
from opentelemetry.trace import SpanKind, TracerProvider

_logger = logging.getLogger(__name__)
_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory"
Expand Down Expand Up @@ -381,6 +381,46 @@ def callproc(self, *args: Any, **kwargs: Any):
return TracedCursorFactory


class AsyncCursorTracer(CursorTracer):
async def traced_execution(
self,
cursor: dbapi.CursorT,
query_method: Callable[..., Any],
*args: tuple[Any, ...],
**kwargs: dict[Any, Any],
):
name = self.get_operation_name(cursor, args)
if not name:
name = (
self._db_api_integration.database
if self._db_api_integration.database
else self._db_api_integration.name
)

with self._db_api_integration._tracer.start_as_current_span(
name, kind=SpanKind.CLIENT
) as span:
if span.is_recording():
if args and self._commenter_enabled:
if self._enable_attribute_commenter:
# sqlcomment is added to executed query and db.statement span attribute
args = self._update_args_with_added_sql_comment(
args, cursor
)
self._populate_span(span, cursor, *args)
else:
# sqlcomment is only added to executed query
# so db.statement is set before add_sql_comment
self._populate_span(span, cursor, *args)
args = self._update_args_with_added_sql_comment(
args, cursor
)
else:
# no sqlcomment anywhere
self._populate_span(span, cursor, *args)
return await query_method(*args, **kwargs)


def _new_cursor_async_factory(
db_api: DatabaseApiAsyncIntegration | None = None,
base_factory: type[psycopg.AsyncCursor] | None = None,
Expand All @@ -395,7 +435,7 @@ def _new_cursor_async_factory(
tracer_provider=tracer_provider,
)
base_factory = base_factory or psycopg.AsyncCursor
_cursor_tracer = CursorTracer(db_api)
_cursor_tracer = AsyncCursorTracer(db_api)

class TracedCursorAsyncFactory(base_factory):
async def execute(self, *args: Any, **kwargs: Any):
Expand Down
Loading