Skip to content

Commit 6b8142b

Browse files
authored
Merge branch 'main' into botocore-bedrock-system-events
2 parents 7dc907e + 1623dc0 commit 6b8142b

File tree

3 files changed

+203
-41
lines changed

3 files changed

+203
-41
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2222

2323
- `opentelemetry-instrumentation-redis` Add missing entry in doc string for `def _instrument`
2424
([#3247](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3247))
25+
- `opentelemetry-instrumentation-asyncpg` Fix fallback for empty queries.
26+
([#3253](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3253))
2527

2628
## Version 1.30.0/0.51b0 (2025-02-03)
2729

instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,10 @@ def _uninstrument(self, **__):
150150

151151
async def _do_execute(self, func, instance, args, kwargs):
152152
exception = None
153-
params = getattr(instance, "_params", {})
154-
name = args[0] if args[0] else params.get("database", "postgresql")
153+
params = getattr(instance, "_params", None)
154+
name = (
155+
args[0] if args[0] else getattr(params, "database", "postgresql")
156+
)
155157

156158
try:
157159
# Strip leading comments so we get the operation name.
@@ -185,11 +187,11 @@ async def _do_execute(self, func, instance, args, kwargs):
185187
async def _do_cursor_execute(self, func, instance, args, kwargs):
186188
"""Wrap cursor based functions. For every call this will generate a new span."""
187189
exception = None
188-
params = getattr(instance._connection, "_params", {})
190+
params = getattr(instance._connection, "_params", None)
189191
name = (
190192
instance._query
191193
if instance._query
192-
else params.get("database", "postgresql")
194+
else getattr(params, "database", "postgresql")
193195
)
194196

195197
try:

tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py

Lines changed: 195 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import asyncio
22
import os
3+
from collections import namedtuple
4+
from unittest.mock import patch
35

46
import asyncpg
57

@@ -20,7 +22,26 @@ def async_call(coro):
2022
return loop.run_until_complete(coro)
2123

2224

23-
class TestFunctionalAsyncPG(TestBase):
25+
class CheckSpanMixin:
26+
def check_span(self, span, expected_db_name=POSTGRES_DB_NAME):
27+
self.assertEqual(
28+
span.attributes[SpanAttributes.DB_SYSTEM], "postgresql"
29+
)
30+
self.assertEqual(
31+
span.attributes[SpanAttributes.DB_NAME], expected_db_name
32+
)
33+
self.assertEqual(
34+
span.attributes[SpanAttributes.DB_USER], POSTGRES_USER
35+
)
36+
self.assertEqual(
37+
span.attributes[SpanAttributes.NET_PEER_NAME], POSTGRES_HOST
38+
)
39+
self.assertEqual(
40+
span.attributes[SpanAttributes.NET_PEER_PORT], POSTGRES_PORT
41+
)
42+
43+
44+
class TestFunctionalAsyncPG(TestBase, CheckSpanMixin):
2445
def setUp(self):
2546
super().setUp()
2647
self._tracer = self.tracer_provider.get_tracer(__name__)
@@ -39,25 +60,54 @@ def tearDown(self):
3960
AsyncPGInstrumentor().uninstrument()
4061
super().tearDown()
4162

42-
def check_span(self, span):
43-
self.assertEqual(
44-
span.attributes[SpanAttributes.DB_SYSTEM], "postgresql"
45-
)
46-
self.assertEqual(
47-
span.attributes[SpanAttributes.DB_NAME], POSTGRES_DB_NAME
48-
)
49-
self.assertEqual(
50-
span.attributes[SpanAttributes.DB_USER], POSTGRES_USER
51-
)
63+
def test_instrumented_execute_method_without_arguments(self, *_, **__):
64+
"""Should create a span for execute()."""
65+
async_call(self._connection.execute("SELECT 42;"))
66+
spans = self.memory_exporter.get_finished_spans()
67+
self.assertEqual(len(spans), 1)
68+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
69+
self.check_span(spans[0])
70+
self.assertEqual(spans[0].name, "SELECT")
5271
self.assertEqual(
53-
span.attributes[SpanAttributes.NET_PEER_NAME], POSTGRES_HOST
72+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
5473
)
74+
75+
def test_instrumented_execute_method_error(self, *_, **__):
76+
"""Should create an error span for execute() with the database name as the span name."""
77+
with self.assertRaises(AttributeError):
78+
async_call(self._connection.execute(""))
79+
spans = self.memory_exporter.get_finished_spans()
80+
self.assertEqual(len(spans), 1)
81+
self.assertIs(StatusCode.ERROR, spans[0].status.status_code)
82+
self.check_span(spans[0])
83+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
84+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
85+
86+
def test_instrumented_fetch_method_without_arguments(self, *_, **__):
87+
"""Should create a span from fetch()."""
88+
async_call(self._connection.fetch("SELECT 42;"))
89+
spans = self.memory_exporter.get_finished_spans()
90+
self.assertEqual(len(spans), 1)
91+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
92+
self.check_span(spans[0])
93+
self.assertEqual(spans[0].name, "SELECT")
5594
self.assertEqual(
56-
span.attributes[SpanAttributes.NET_PEER_PORT], POSTGRES_PORT
95+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
5796
)
5897

59-
def test_instrumented_execute_method_without_arguments(self, *_, **__):
60-
async_call(self._connection.execute("SELECT 42;"))
98+
def test_instrumented_fetch_method_empty_query(self, *_, **__):
99+
"""Should create an error span for fetch() with the database name as the span name."""
100+
async_call(self._connection.fetch(""))
101+
spans = self.memory_exporter.get_finished_spans()
102+
self.assertEqual(len(spans), 1)
103+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
104+
self.check_span(spans[0])
105+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
106+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
107+
108+
def test_instrumented_fetchval_method_without_arguments(self, *_, **__):
109+
"""Should create a span for fetchval()."""
110+
async_call(self._connection.fetchval("SELECT 42;"))
61111
spans = self.memory_exporter.get_finished_spans()
62112
self.assertEqual(len(spans), 1)
63113
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
@@ -67,17 +117,105 @@ def test_instrumented_execute_method_without_arguments(self, *_, **__):
67117
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
68118
)
69119

70-
def test_instrumented_fetch_method_without_arguments(self, *_, **__):
71-
async_call(self._connection.fetch("SELECT 42;"))
120+
def test_instrumented_fetchval_method_empty_query(self, *_, **__):
121+
"""Should create an error span for fetchval() with the database name as the span name."""
122+
async_call(self._connection.fetchval(""))
72123
spans = self.memory_exporter.get_finished_spans()
73124
self.assertEqual(len(spans), 1)
125+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
126+
self.check_span(spans[0])
127+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
128+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
129+
130+
def test_instrumented_fetchrow_method_without_arguments(self, *_, **__):
131+
"""Should create a span for fetchrow()."""
132+
async_call(self._connection.fetchrow("SELECT 42;"))
133+
spans = self.memory_exporter.get_finished_spans()
134+
self.assertEqual(len(spans), 1)
135+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
74136
self.check_span(spans[0])
75137
self.assertEqual(spans[0].name, "SELECT")
76138
self.assertEqual(
77139
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
78140
)
79141

142+
def test_instrumented_fetchrow_method_empty_query(self, *_, **__):
143+
"""Should create an error span for fetchrow() with the database name as the span name."""
144+
async_call(self._connection.fetchrow(""))
145+
spans = self.memory_exporter.get_finished_spans()
146+
self.assertEqual(len(spans), 1)
147+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
148+
self.check_span(spans[0])
149+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
150+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
151+
152+
def test_instrumented_cursor_execute_method_without_arguments(
153+
self, *_, **__
154+
):
155+
"""Should create spans for the transaction as well as the cursor fetches."""
156+
157+
async def _cursor_execute():
158+
async with self._connection.transaction():
159+
async for record in self._connection.cursor(
160+
"SELECT generate_series(0, 5);"
161+
):
162+
pass
163+
164+
async_call(_cursor_execute())
165+
spans = self.memory_exporter.get_finished_spans()
166+
167+
self.check_span(spans[0])
168+
self.assertEqual(spans[0].name, "BEGIN;")
169+
self.assertEqual(
170+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "BEGIN;"
171+
)
172+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
173+
174+
for span in spans[1:-1]:
175+
self.check_span(span)
176+
self.assertEqual(span.name, "CURSOR: SELECT")
177+
self.assertEqual(
178+
span.attributes[SpanAttributes.DB_STATEMENT],
179+
"SELECT generate_series(0, 5);",
180+
)
181+
self.assertIs(StatusCode.UNSET, span.status.status_code)
182+
183+
self.check_span(spans[-1])
184+
self.assertEqual(spans[-1].name, "COMMIT;")
185+
self.assertEqual(
186+
spans[-1].attributes[SpanAttributes.DB_STATEMENT], "COMMIT;"
187+
)
188+
189+
def test_instrumented_cursor_execute_method_empty_query(self, *_, **__):
190+
"""Should create spans for the transaction and cursor fetches with the database name as the span name."""
191+
192+
async def _cursor_execute():
193+
async with self._connection.transaction():
194+
async for record in self._connection.cursor(""):
195+
pass
196+
197+
async_call(_cursor_execute())
198+
spans = self.memory_exporter.get_finished_spans()
199+
self.assertEqual(len(spans), 3)
200+
201+
self.check_span(spans[0])
202+
self.assertEqual(
203+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "BEGIN;"
204+
)
205+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
206+
207+
self.check_span(spans[1])
208+
self.assertEqual(spans[1].name, f"CURSOR: {POSTGRES_DB_NAME}")
209+
self.assertEqual(spans[1].attributes[SpanAttributes.DB_STATEMENT], "")
210+
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)
211+
212+
self.check_span(spans[2])
213+
self.assertEqual(
214+
spans[2].attributes[SpanAttributes.DB_STATEMENT], "COMMIT;"
215+
)
216+
80217
def test_instrumented_remove_comments(self, *_, **__):
218+
"""Should remove comments from the query and set the span name correctly."""
81219
async_call(self._connection.fetch("/* leading comment */ SELECT 42;"))
82220
async_call(
83221
self._connection.fetch(
@@ -88,25 +226,30 @@ def test_instrumented_remove_comments(self, *_, **__):
88226
spans = self.memory_exporter.get_finished_spans()
89227
self.assertEqual(len(spans), 3)
90228
self.check_span(spans[0])
229+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
91230
self.assertEqual(spans[0].name, "SELECT")
92231
self.assertEqual(
93232
spans[0].attributes[SpanAttributes.DB_STATEMENT],
94233
"/* leading comment */ SELECT 42;",
95234
)
96235
self.check_span(spans[1])
236+
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)
97237
self.assertEqual(spans[1].name, "SELECT")
98238
self.assertEqual(
99239
spans[1].attributes[SpanAttributes.DB_STATEMENT],
100240
"/* leading comment */ SELECT 42; /* trailing comment */",
101241
)
102242
self.check_span(spans[2])
243+
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)
103244
self.assertEqual(spans[2].name, "SELECT")
104245
self.assertEqual(
105246
spans[2].attributes[SpanAttributes.DB_STATEMENT],
106247
"SELECT 42; /* trailing comment */",
107248
)
108249

109250
def test_instrumented_transaction_method(self, *_, **__):
251+
"""Should create spans for the transaction and the inner execute()."""
252+
110253
async def _transaction_execute():
111254
async with self._connection.transaction():
112255
await self._connection.execute("SELECT 42;")
@@ -134,6 +277,8 @@ async def _transaction_execute():
134277
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)
135278

136279
def test_instrumented_failed_transaction_method(self, *_, **__):
280+
"""Should create spans for the transaction as well as an error span for execute()."""
281+
137282
async def _transaction_execute():
138283
async with self._connection.transaction():
139284
await self._connection.execute("SELECT 42::uuid;")
@@ -164,6 +309,7 @@ async def _transaction_execute():
164309
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)
165310

166311
def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
312+
"""Should not capture parameters when capture_parameters is False."""
167313
async_call(self._connection.execute("SELECT $1;", "1"))
168314
spans = self.memory_exporter.get_finished_spans()
169315
self.assertEqual(len(spans), 1)
@@ -174,7 +320,7 @@ def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
174320
)
175321

176322

177-
class TestFunctionalAsyncPG_CaptureParameters(TestBase):
323+
class TestFunctionalAsyncPG_CaptureParameters(TestBase, CheckSpanMixin):
178324
def setUp(self):
179325
super().setUp()
180326
self._tracer = self.tracer_provider.get_tracer(__name__)
@@ -195,24 +341,8 @@ def tearDown(self):
195341
AsyncPGInstrumentor().uninstrument()
196342
super().tearDown()
197343

198-
def check_span(self, span):
199-
self.assertEqual(
200-
span.attributes[SpanAttributes.DB_SYSTEM], "postgresql"
201-
)
202-
self.assertEqual(
203-
span.attributes[SpanAttributes.DB_NAME], POSTGRES_DB_NAME
204-
)
205-
self.assertEqual(
206-
span.attributes[SpanAttributes.DB_USER], POSTGRES_USER
207-
)
208-
self.assertEqual(
209-
span.attributes[SpanAttributes.NET_PEER_NAME], POSTGRES_HOST
210-
)
211-
self.assertEqual(
212-
span.attributes[SpanAttributes.NET_PEER_PORT], POSTGRES_PORT
213-
)
214-
215344
def test_instrumented_execute_method_with_arguments(self, *_, **__):
345+
"""Should create a span for execute() with captured parameters."""
216346
async_call(self._connection.execute("SELECT $1;", "1"))
217347
spans = self.memory_exporter.get_finished_spans()
218348
self.assertEqual(len(spans), 1)
@@ -228,6 +358,7 @@ def test_instrumented_execute_method_with_arguments(self, *_, **__):
228358
)
229359

230360
def test_instrumented_fetch_method_with_arguments(self, *_, **__):
361+
"""Should create a span for fetch() with captured parameters."""
231362
async_call(self._connection.fetch("SELECT $1;", "1"))
232363
spans = self.memory_exporter.get_finished_spans()
233364
self.assertEqual(len(spans), 1)
@@ -242,10 +373,11 @@ def test_instrumented_fetch_method_with_arguments(self, *_, **__):
242373
)
243374

244375
def test_instrumented_executemany_method_with_arguments(self, *_, **__):
376+
"""Should create a span for executemany with captured parameters."""
245377
async_call(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
246378
spans = self.memory_exporter.get_finished_spans()
247379
self.assertEqual(len(spans), 1)
248-
380+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
249381
self.check_span(spans[0])
250382
self.assertEqual(
251383
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT $1;"
@@ -255,15 +387,41 @@ def test_instrumented_executemany_method_with_arguments(self, *_, **__):
255387
)
256388

257389
def test_instrumented_execute_interface_error_method(self, *_, **__):
390+
"""Should create an error span for execute() with captured parameters."""
258391
with self.assertRaises(asyncpg.InterfaceError):
259392
async_call(self._connection.execute("SELECT 42;", 1, 2, 3))
260393
spans = self.memory_exporter.get_finished_spans()
261394
self.assertEqual(len(spans), 1)
262-
395+
self.assertIs(StatusCode.ERROR, spans[0].status.status_code)
263396
self.check_span(spans[0])
264397
self.assertEqual(
265398
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
266399
)
267400
self.assertEqual(
268401
spans[0].attributes["db.statement.parameters"], "(1, 2, 3)"
269402
)
403+
404+
def test_instrumented_executemany_method_empty_query(self, *_, **__):
405+
"""Should create a span for executemany() with captured parameters."""
406+
async_call(self._connection.executemany("", []))
407+
spans = self.memory_exporter.get_finished_spans()
408+
self.assertEqual(len(spans), 1)
409+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
410+
self.check_span(spans[0])
411+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
412+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
413+
self.assertEqual(
414+
spans[0].attributes["db.statement.parameters"], "([],)"
415+
)
416+
417+
def test_instrumented_fetch_method_broken_asyncpg(self, *_, **__):
418+
"""Should create a span for fetch() with "postgresql" as the span name."""
419+
with patch.object(
420+
self._connection, "_params", namedtuple("ConnectionParams", [])
421+
):
422+
async_call(self._connection.fetch(""))
423+
spans = self.memory_exporter.get_finished_spans()
424+
self.assertEqual(len(spans), 1)
425+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
426+
self.assertEqual(spans[0].name, "postgresql")
427+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")

0 commit comments

Comments
 (0)