Skip to content

Commit e6f0404

Browse files
committed
fixed: asyncpg connection params are a namedtuple
Follow-up on the apparently abbandonned #2114. The asyncpg instrumentation attempts to fall back on using the database name as the span name in case the first argument to the instrumented method is falsey. This has probably never worked since asyncpg defines the `_params` attribute as an instance of `ConnectionParams` (https://github.com/MagicStack/asyncpg/blob/master/asyncpg/connection.py#L62) which is a NamedTuple instance and thus don't define `get`. The proper way of safely accessing properties on a NamedTuple is using `getattr`. The only case that I've actually found which triggers this branch is if the supplied query is an empty string. This is something that causes an `AttributeError` for `Connection.execute` but is fine for `fetch()`, `fetchval()`, `fetchrow()` and `executemany()`. The tests have been expanded to check these cases. Also, more status code validation has been added where it was missing.
1 parent c0bc2c9 commit e6f0404

File tree

3 files changed

+145
-6
lines changed

3 files changed

+145
-6
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414
### Fixed
1515
- `opentelemetry-instrumentation-redis` Add missing entry in doc string for `def _instrument`
1616
([#3247](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3247))
17+
- `opentelemetry-instrumentation-asyncpg` Fix fallback for empty queries.
18+
([#3253](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3253))
1719

1820
## Version 1.30.0/0.51b0 (2025-02-03)
1921

instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,10 @@ def _uninstrument(self, **__):
150150

151151
async def _do_execute(self, func, instance, args, kwargs):
152152
exception = None
153-
params = getattr(instance, "_params", {})
154-
name = args[0] if args[0] else params.get("database", "postgresql")
153+
params = getattr(instance, "_params", None)
154+
name = (
155+
args[0] if args[0] else getattr(params, "database", "postgresql")
156+
)
155157

156158
try:
157159
# Strip leading comments so we get the operation name.
@@ -185,11 +187,11 @@ async def _do_execute(self, func, instance, args, kwargs):
185187
async def _do_cursor_execute(self, func, instance, args, kwargs):
186188
"""Wrap cursor based functions. For every call this will generate a new span."""
187189
exception = None
188-
params = getattr(instance._connection, "_params", {})
190+
params = getattr(instance._connection, "_params", None)
189191
name = (
190192
instance._query
191193
if instance._query
192-
else params.get("database", "postgresql")
194+
else getattr(params, "database", "postgresql")
193195
)
194196

195197
try:

tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py

Lines changed: 137 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,136 @@ def test_instrumented_execute_method_without_arguments(self, *_, **__):
6767
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
6868
)
6969

70+
def test_instrumented_execute_method_error(self, *_, **__):
71+
with self.assertRaises(AttributeError):
72+
async_call(self._connection.execute(""))
73+
spans = self.memory_exporter.get_finished_spans()
74+
self.assertEqual(len(spans), 1)
75+
self.assertIs(StatusCode.ERROR, spans[0].status.status_code)
76+
self.check_span(spans[0])
77+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
78+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
79+
7080
def test_instrumented_fetch_method_without_arguments(self, *_, **__):
7181
async_call(self._connection.fetch("SELECT 42;"))
7282
spans = self.memory_exporter.get_finished_spans()
7383
self.assertEqual(len(spans), 1)
84+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
85+
self.check_span(spans[0])
86+
self.assertEqual(spans[0].name, "SELECT")
87+
self.assertEqual(
88+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
89+
)
90+
91+
def test_instrumented_fetch_method_empty_query(self, *_, **__):
92+
async_call(self._connection.fetch(""))
93+
spans = self.memory_exporter.get_finished_spans()
94+
self.assertEqual(len(spans), 1)
95+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
96+
self.check_span(spans[0])
97+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
98+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
99+
100+
def test_instrumented_fetchval_method_without_arguments(self, *_, **__):
101+
async_call(self._connection.fetchval("SELECT 42;"))
102+
spans = self.memory_exporter.get_finished_spans()
103+
self.assertEqual(len(spans), 1)
104+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
74105
self.check_span(spans[0])
75106
self.assertEqual(spans[0].name, "SELECT")
76107
self.assertEqual(
77108
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
78109
)
79110

111+
def test_instrumented_fetchval_method_empty_query(self, *_, **__):
112+
async_call(self._connection.fetchval(""))
113+
spans = self.memory_exporter.get_finished_spans()
114+
self.assertEqual(len(spans), 1)
115+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
116+
self.check_span(spans[0])
117+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
118+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
119+
120+
def test_instrumented_fetchrow_method_without_arguments(self, *_, **__):
121+
async_call(self._connection.fetchval("SELECT 42;"))
122+
spans = self.memory_exporter.get_finished_spans()
123+
self.assertEqual(len(spans), 1)
124+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
125+
self.check_span(spans[0])
126+
self.assertEqual(spans[0].name, "SELECT")
127+
self.assertEqual(
128+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
129+
)
130+
131+
def test_instrumented_fetchrow_method_empty_query(self, *_, **__):
132+
async_call(self._connection.fetchrow(""))
133+
spans = self.memory_exporter.get_finished_spans()
134+
self.assertEqual(len(spans), 1)
135+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
136+
self.check_span(spans[0])
137+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
138+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
139+
140+
def test_instrumented_cursor_execute_method_without_arguments(
141+
self, *_, **__
142+
):
143+
async def _cursor_execute():
144+
async with self._connection.transaction():
145+
async for record in self._connection.cursor(
146+
"SELECT generate_series(0, 5);"
147+
):
148+
pass
149+
150+
async_call(_cursor_execute())
151+
spans = self.memory_exporter.get_finished_spans()
152+
153+
self.check_span(spans[0])
154+
self.assertEqual(spans[0].name, "BEGIN;")
155+
self.assertEqual(
156+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "BEGIN;"
157+
)
158+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
159+
160+
for span in spans[1:-1]:
161+
self.check_span(span)
162+
self.assertEqual(span.name, "CURSOR: SELECT")
163+
self.assertEqual(
164+
span.attributes[SpanAttributes.DB_STATEMENT],
165+
"SELECT generate_series(0, 5);",
166+
)
167+
self.assertIs(StatusCode.UNSET, span.status.status_code)
168+
169+
self.check_span(spans[-1])
170+
self.assertEqual(spans[-1].name, "COMMIT;")
171+
self.assertEqual(
172+
spans[-1].attributes[SpanAttributes.DB_STATEMENT], "COMMIT;"
173+
)
174+
175+
def test_instrumented_cursor_execute_method_empty_query(self, *_, **__):
176+
async def _cursor_execute():
177+
async with self._connection.transaction():
178+
async for record in self._connection.cursor(""):
179+
pass
180+
181+
async_call(_cursor_execute())
182+
spans = self.memory_exporter.get_finished_spans()
183+
self.assertEqual(len(spans), 3)
184+
185+
self.check_span(spans[0])
186+
self.assertEqual(
187+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "BEGIN;"
188+
)
189+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
190+
191+
self.check_span(spans[1])
192+
self.assertEqual(spans[1].attributes[SpanAttributes.DB_STATEMENT], "")
193+
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)
194+
195+
self.check_span(spans[2])
196+
self.assertEqual(
197+
spans[2].attributes[SpanAttributes.DB_STATEMENT], "COMMIT;"
198+
)
199+
80200
def test_instrumented_remove_comments(self, *_, **__):
81201
async_call(self._connection.fetch("/* leading comment */ SELECT 42;"))
82202
async_call(
@@ -88,18 +208,21 @@ def test_instrumented_remove_comments(self, *_, **__):
88208
spans = self.memory_exporter.get_finished_spans()
89209
self.assertEqual(len(spans), 3)
90210
self.check_span(spans[0])
211+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
91212
self.assertEqual(spans[0].name, "SELECT")
92213
self.assertEqual(
93214
spans[0].attributes[SpanAttributes.DB_STATEMENT],
94215
"/* leading comment */ SELECT 42;",
95216
)
96217
self.check_span(spans[1])
218+
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)
97219
self.assertEqual(spans[1].name, "SELECT")
98220
self.assertEqual(
99221
spans[1].attributes[SpanAttributes.DB_STATEMENT],
100222
"/* leading comment */ SELECT 42; /* trailing comment */",
101223
)
102224
self.check_span(spans[2])
225+
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)
103226
self.assertEqual(spans[2].name, "SELECT")
104227
self.assertEqual(
105228
spans[2].attributes[SpanAttributes.DB_STATEMENT],
@@ -245,7 +368,7 @@ def test_instrumented_executemany_method_with_arguments(self, *_, **__):
245368
async_call(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
246369
spans = self.memory_exporter.get_finished_spans()
247370
self.assertEqual(len(spans), 1)
248-
371+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
249372
self.check_span(spans[0])
250373
self.assertEqual(
251374
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT $1;"
@@ -259,11 +382,23 @@ def test_instrumented_execute_interface_error_method(self, *_, **__):
259382
async_call(self._connection.execute("SELECT 42;", 1, 2, 3))
260383
spans = self.memory_exporter.get_finished_spans()
261384
self.assertEqual(len(spans), 1)
262-
385+
self.assertIs(StatusCode.ERROR, spans[0].status.status_code)
263386
self.check_span(spans[0])
264387
self.assertEqual(
265388
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
266389
)
267390
self.assertEqual(
268391
spans[0].attributes["db.statement.parameters"], "(1, 2, 3)"
269392
)
393+
394+
def test_instrumented_executemany_method_empty_query(self, *_, **__):
395+
async_call(self._connection.executemany("", []))
396+
spans = self.memory_exporter.get_finished_spans()
397+
self.assertEqual(len(spans), 1)
398+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
399+
self.check_span(spans[0])
400+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
401+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
402+
self.assertEqual(
403+
spans[0].attributes["db.statement.parameters"], "([],)"
404+
)

0 commit comments

Comments
 (0)