Skip to content
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Fixed

- `opentelemetry-instrumentation-asyncpg`: Hydrate span attributes before creation so samplers can filter on database details
([#3643](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3643))

## Version 1.39.0/0.60b0 (2025-12-03)

### Added
Expand Down Expand Up @@ -73,7 +78,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3936](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3936))
- `opentelemetry-instrumentation-aiohttp-client`: Update instrumentor to respect suppressing http instrumentation
([#3957](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3957))

## Version 1.38.0/0.59b0 (2025-10-16)

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,16 @@ async def _do_execute(self, func, instance, args, kwargs):
except IndexError:
name = ""

# Hydrate attributes before span creation to enable filtering
span_attributes = _hydrate_span_from_args(
instance,
args[0],
args[1:] if self.capture_parameters else None,
)

with self._tracer.start_as_current_span(
name, kind=SpanKind.CLIENT
name, kind=SpanKind.CLIENT, attributes=span_attributes
) as span:
if span.is_recording():
span_attributes = _hydrate_span_from_args(
instance,
args[0],
args[1:] if self.capture_parameters else None,
)
for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
Expand Down Expand Up @@ -217,20 +215,19 @@ async def _do_cursor_execute(self, func, instance, args, kwargs):
except IndexError:
name = ""

# Hydrate attributes before span creation to enable filtering
span_attributes = _hydrate_span_from_args(
instance._connection,
instance._query,
instance._args if self.capture_parameters else None,
)

stop = False
with self._tracer.start_as_current_span(
f"CURSOR: {name}",
kind=SpanKind.CLIENT,
attributes=span_attributes,
) as span:
if span.is_recording():
span_attributes = _hydrate_span_from_args(
instance._connection,
instance._query,
instance._args if self.capture_parameters else None,
)
for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except StopAsyncIteration:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@

import asyncpg

from opentelemetry import trace
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import StatusCode
Expand Down Expand Up @@ -425,3 +431,100 @@ def test_instrumented_fetch_method_broken_asyncpg(self, *_, **__):
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.assertEqual(spans[0].name, "postgresql")
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")


class _FakeParams:
def __init__(self, database="testdb", user="dbuser"):
self.database = database
self.user = user


class _FakeConnection:
def __init__(self):
self._params = _FakeParams()
self._addr = ("db.example.com", 5432)


class _FakeCursor:
def __init__(self, connection, query, args):
self._connection = connection
self._query = query
self._args = args


class TestAsyncPGSamplingAttributes(TestBase):
def setUp(self):
super().setUp()
self.connection = _FakeConnection()
self.exporter = InMemorySpanExporter()
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(self.exporter))
self.tracer = tracer_provider.get_tracer(__name__)
self.instrumentor = AsyncPGInstrumentor(capture_parameters=True)
self.instrumentor._tracer = self.tracer

def test_attributes_available_when_span_not_recording(self):
async def _fake_execute(*args, **kwargs):
return "ok"

async_call(
self.instrumentor._do_execute(
_fake_execute,
self.connection,
("SELECT $1", "42"),
{},
)
)

spans = self.exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(
span.attributes,
{
SpanAttributes.DB_SYSTEM: "postgresql",
SpanAttributes.DB_NAME: "testdb",
SpanAttributes.DB_USER: "dbuser",
SpanAttributes.NET_PEER_NAME: "db.example.com",
SpanAttributes.NET_PEER_PORT: 5432,
SpanAttributes.NET_TRANSPORT: "ip_tcp",
SpanAttributes.DB_STATEMENT: "SELECT $1",
"db.statement.parameters": "('42',)",
},
)
self.assertEqual(span.kind, trace.SpanKind.CLIENT)
self.assertEqual(span.name, "SELECT")

def test_cursor_attributes_available_when_span_not_recording(self):
async def _fake_cursor_execute(*args, **kwargs):
return "ok"

cursor = _FakeCursor(self.connection, "SELECT $1", ("99",))

async_call(
self.instrumentor._do_cursor_execute(
_fake_cursor_execute,
cursor,
(),
{},
)
)

spans = self.exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(
span.attributes,
{
SpanAttributes.DB_SYSTEM: "postgresql",
SpanAttributes.DB_NAME: "testdb",
SpanAttributes.DB_USER: "dbuser",
SpanAttributes.NET_PEER_NAME: "db.example.com",
SpanAttributes.NET_PEER_PORT: 5432,
SpanAttributes.NET_TRANSPORT: "ip_tcp",
SpanAttributes.DB_STATEMENT: "SELECT $1",
"db.statement.parameters": "('99',)",
},
)
self.assertEqual(span.kind, trace.SpanKind.CLIENT)
self.assertEqual(span.name, "CURSOR: SELECT")