Skip to content

Commit 0059e86

Browse files
committed
fixed: asyncpg connection params are a namedtuple
Follow-up on the apparently abbandonned #2114. The asyncpg instrumentation attempts to fall back on using the database name as the span name in case the first argument to the instrumented method is falsey. This has probably never worked since asyncpg defines the `_params` attribute as an instance of `ConnectionParams` (https://github.com/MagicStack/asyncpg/blob/master/asyncpg/connection.py#L62) which is a NamedTuple instance and thus don't define `get`. The proper way of safely accessing properties on a NamedTuple is using `getattr`. The only case that I've actually found which triggers this branch is if the supplied query is an empty string. This is something that causes an `AttributeError` for `Connection.execute` but is fine for `fetch()`, `fetchval()`, `fetchrow()` and `executemany()`. The tests have been expanded to check these cases. Also, more status code validation has been added where it was missing.
1 parent c0bc2c9 commit 0059e86

File tree

3 files changed

+146
-6
lines changed

3 files changed

+146
-6
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414
### Fixed
1515
- `opentelemetry-instrumentation-redis` Add missing entry in doc string for `def _instrument`
1616
([#3247](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3247))
17+
- `opentelemetry-instrumentation-asyncpg` Fix fallback for empty queries.
18+
([#3253](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3253))
1719

1820
## Version 1.30.0/0.51b0 (2025-02-03)
1921

instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,10 @@ def _uninstrument(self, **__):
150150

151151
async def _do_execute(self, func, instance, args, kwargs):
152152
exception = None
153-
params = getattr(instance, "_params", {})
154-
name = args[0] if args[0] else params.get("database", "postgresql")
153+
params = getattr(instance, "_params", None)
154+
name = (
155+
args[0] if args[0] else getattr(params, "database", "postgresql")
156+
)
155157

156158
try:
157159
# Strip leading comments so we get the operation name.
@@ -185,11 +187,11 @@ async def _do_execute(self, func, instance, args, kwargs):
185187
async def _do_cursor_execute(self, func, instance, args, kwargs):
186188
"""Wrap cursor based functions. For every call this will generate a new span."""
187189
exception = None
188-
params = getattr(instance._connection, "_params", {})
190+
params = getattr(instance._connection, "_params", None)
189191
name = (
190192
instance._query
191193
if instance._query
192-
else params.get("database", "postgresql")
194+
else getattr(params, "database", "postgresql")
193195
)
194196

195197
try:

tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py

Lines changed: 138 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,137 @@ def test_instrumented_execute_method_without_arguments(self, *_, **__):
6767
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
6868
)
6969

70+
def test_instrumented_execute_method_error(self, *_, **__):
71+
with self.assertRaises(AttributeError):
72+
async_call(self._connection.execute(""))
73+
spans = self.memory_exporter.get_finished_spans()
74+
self.assertEqual(len(spans), 1)
75+
self.assertIs(StatusCode.ERROR, spans[0].status.status_code)
76+
self.check_span(spans[0])
77+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
78+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
79+
7080
def test_instrumented_fetch_method_without_arguments(self, *_, **__):
7181
async_call(self._connection.fetch("SELECT 42;"))
7282
spans = self.memory_exporter.get_finished_spans()
7383
self.assertEqual(len(spans), 1)
84+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
85+
self.check_span(spans[0])
86+
self.assertEqual(spans[0].name, "SELECT")
87+
self.assertEqual(
88+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
89+
)
90+
91+
def test_instrumented_fetch_method_empty_query(self, *_, **__):
92+
async_call(self._connection.fetch(""))
93+
spans = self.memory_exporter.get_finished_spans()
94+
self.assertEqual(len(spans), 1)
95+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
96+
self.check_span(spans[0])
97+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
98+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
99+
100+
def test_instrumented_fetchval_method_without_arguments(self, *_, **__):
101+
async_call(self._connection.fetchval("SELECT 42;"))
102+
spans = self.memory_exporter.get_finished_spans()
103+
self.assertEqual(len(spans), 1)
104+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
74105
self.check_span(spans[0])
75106
self.assertEqual(spans[0].name, "SELECT")
76107
self.assertEqual(
77108
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
78109
)
79110

111+
def test_instrumented_fetchval_method_empty_query(self, *_, **__):
112+
async_call(self._connection.fetchval(""))
113+
spans = self.memory_exporter.get_finished_spans()
114+
self.assertEqual(len(spans), 1)
115+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
116+
self.check_span(spans[0])
117+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
118+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
119+
120+
def test_instrumented_fetchrow_method_without_arguments(self, *_, **__):
121+
async_call(self._connection.fetchval("SELECT 42;"))
122+
spans = self.memory_exporter.get_finished_spans()
123+
self.assertEqual(len(spans), 1)
124+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
125+
self.check_span(spans[0])
126+
self.assertEqual(spans[0].name, "SELECT")
127+
self.assertEqual(
128+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
129+
)
130+
131+
def test_instrumented_fetchrow_method_empty_query(self, *_, **__):
132+
async_call(self._connection.fetchrow(""))
133+
spans = self.memory_exporter.get_finished_spans()
134+
self.assertEqual(len(spans), 1)
135+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
136+
self.check_span(spans[0])
137+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
138+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
139+
140+
def test_instrumented_cursor_execute_method_without_arguments(
141+
self, *_, **__
142+
):
143+
async def _cursor_execute():
144+
async with self._connection.transaction():
145+
async for record in self._connection.cursor(
146+
"SELECT generate_series(0, 5);"
147+
):
148+
pass
149+
150+
async_call(_cursor_execute())
151+
spans = self.memory_exporter.get_finished_spans()
152+
153+
self.check_span(spans[0])
154+
self.assertEqual(spans[0].name, "BEGIN;")
155+
self.assertEqual(
156+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "BEGIN;"
157+
)
158+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
159+
160+
for span in spans[1:-1]:
161+
self.check_span(span)
162+
self.assertEqual(span.name, "CURSOR: SELECT")
163+
self.assertEqual(
164+
span.attributes[SpanAttributes.DB_STATEMENT],
165+
"SELECT generate_series(0, 5);",
166+
)
167+
self.assertIs(StatusCode.UNSET, span.status.status_code)
168+
169+
self.check_span(spans[-1])
170+
self.assertEqual(spans[-1].name, "COMMIT;")
171+
self.assertEqual(
172+
spans[-1].attributes[SpanAttributes.DB_STATEMENT], "COMMIT;"
173+
)
174+
175+
def test_instrumented_cursor_execute_method_empty_query(self, *_, **__):
176+
async def _cursor_execute():
177+
async with self._connection.transaction():
178+
async for record in self._connection.cursor(""):
179+
pass
180+
181+
async_call(_cursor_execute())
182+
spans = self.memory_exporter.get_finished_spans()
183+
self.assertEqual(len(spans), 3)
184+
185+
self.check_span(spans[0])
186+
self.assertEqual(
187+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "BEGIN;"
188+
)
189+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
190+
191+
self.check_span(spans[1])
192+
self.assertEqual(spans[1].name, f"CURSOR: {POSTGRES_DB_NAME}")
193+
self.assertEqual(spans[1].attributes[SpanAttributes.DB_STATEMENT], "")
194+
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)
195+
196+
self.check_span(spans[2])
197+
self.assertEqual(
198+
spans[2].attributes[SpanAttributes.DB_STATEMENT], "COMMIT;"
199+
)
200+
80201
def test_instrumented_remove_comments(self, *_, **__):
81202
async_call(self._connection.fetch("/* leading comment */ SELECT 42;"))
82203
async_call(
@@ -88,18 +209,21 @@ def test_instrumented_remove_comments(self, *_, **__):
88209
spans = self.memory_exporter.get_finished_spans()
89210
self.assertEqual(len(spans), 3)
90211
self.check_span(spans[0])
212+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
91213
self.assertEqual(spans[0].name, "SELECT")
92214
self.assertEqual(
93215
spans[0].attributes[SpanAttributes.DB_STATEMENT],
94216
"/* leading comment */ SELECT 42;",
95217
)
96218
self.check_span(spans[1])
219+
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)
97220
self.assertEqual(spans[1].name, "SELECT")
98221
self.assertEqual(
99222
spans[1].attributes[SpanAttributes.DB_STATEMENT],
100223
"/* leading comment */ SELECT 42; /* trailing comment */",
101224
)
102225
self.check_span(spans[2])
226+
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)
103227
self.assertEqual(spans[2].name, "SELECT")
104228
self.assertEqual(
105229
spans[2].attributes[SpanAttributes.DB_STATEMENT],
@@ -245,7 +369,7 @@ def test_instrumented_executemany_method_with_arguments(self, *_, **__):
245369
async_call(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
246370
spans = self.memory_exporter.get_finished_spans()
247371
self.assertEqual(len(spans), 1)
248-
372+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
249373
self.check_span(spans[0])
250374
self.assertEqual(
251375
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT $1;"
@@ -259,11 +383,23 @@ def test_instrumented_execute_interface_error_method(self, *_, **__):
259383
async_call(self._connection.execute("SELECT 42;", 1, 2, 3))
260384
spans = self.memory_exporter.get_finished_spans()
261385
self.assertEqual(len(spans), 1)
262-
386+
self.assertIs(StatusCode.ERROR, spans[0].status.status_code)
263387
self.check_span(spans[0])
264388
self.assertEqual(
265389
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
266390
)
267391
self.assertEqual(
268392
spans[0].attributes["db.statement.parameters"], "(1, 2, 3)"
269393
)
394+
395+
def test_instrumented_executemany_method_empty_query(self, *_, **__):
396+
async_call(self._connection.executemany("", []))
397+
spans = self.memory_exporter.get_finished_spans()
398+
self.assertEqual(len(spans), 1)
399+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
400+
self.check_span(spans[0])
401+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
402+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
403+
self.assertEqual(
404+
spans[0].attributes["db.statement.parameters"], "([],)"
405+
)

0 commit comments

Comments
 (0)