11import asyncio
22import os
3+ from collections import namedtuple
4+ from unittest .mock import patch
35
46import asyncpg
57
@@ -20,7 +22,26 @@ def async_call(coro):
2022 return loop .run_until_complete (coro )
2123
2224
23- class TestFunctionalAsyncPG (TestBase ):
25+ class CheckSpanMixin :
26+ def check_span (self , span , expected_db_name = POSTGRES_DB_NAME ):
27+ self .assertEqual (
28+ span .attributes [SpanAttributes .DB_SYSTEM ], "postgresql"
29+ )
30+ self .assertEqual (
31+ span .attributes [SpanAttributes .DB_NAME ], expected_db_name
32+ )
33+ self .assertEqual (
34+ span .attributes [SpanAttributes .DB_USER ], POSTGRES_USER
35+ )
36+ self .assertEqual (
37+ span .attributes [SpanAttributes .NET_PEER_NAME ], POSTGRES_HOST
38+ )
39+ self .assertEqual (
40+ span .attributes [SpanAttributes .NET_PEER_PORT ], POSTGRES_PORT
41+ )
42+
43+
44+ class TestFunctionalAsyncPG (TestBase , CheckSpanMixin ):
2445 def setUp (self ):
2546 super ().setUp ()
2647 self ._tracer = self .tracer_provider .get_tracer (__name__ )
@@ -39,25 +60,54 @@ def tearDown(self):
3960 AsyncPGInstrumentor ().uninstrument ()
4061 super ().tearDown ()
4162
42- def check_span (self , span ):
43- self .assertEqual (
44- span .attributes [SpanAttributes .DB_SYSTEM ], "postgresql"
45- )
46- self .assertEqual (
47- span .attributes [SpanAttributes .DB_NAME ], POSTGRES_DB_NAME
48- )
49- self .assertEqual (
50- span .attributes [SpanAttributes .DB_USER ], POSTGRES_USER
51- )
63+ def test_instrumented_execute_method_without_arguments (self , * _ , ** __ ):
64+ """Should create a span for execute()."""
65+ async_call (self ._connection .execute ("SELECT 42;" ))
66+ spans = self .memory_exporter .get_finished_spans ()
67+ self .assertEqual (len (spans ), 1 )
68+ self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
69+ self .check_span (spans [0 ])
70+ self .assertEqual (spans [0 ].name , "SELECT" )
5271 self .assertEqual (
53- span .attributes [SpanAttributes .NET_PEER_NAME ], POSTGRES_HOST
72+ spans [ 0 ] .attributes [SpanAttributes .DB_STATEMENT ], "SELECT 42;"
5473 )
74+
75+ def test_instrumented_execute_method_error (self , * _ , ** __ ):
76+ """Should create an error span for execute() with the database name as the span name."""
77+ with self .assertRaises (AttributeError ):
78+ async_call (self ._connection .execute ("" ))
79+ spans = self .memory_exporter .get_finished_spans ()
80+ self .assertEqual (len (spans ), 1 )
81+ self .assertIs (StatusCode .ERROR , spans [0 ].status .status_code )
82+ self .check_span (spans [0 ])
83+ self .assertEqual (spans [0 ].name , POSTGRES_DB_NAME )
84+ self .assertEqual (spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "" )
85+
86+ def test_instrumented_fetch_method_without_arguments (self , * _ , ** __ ):
87+ """Should create a span from fetch()."""
88+ async_call (self ._connection .fetch ("SELECT 42;" ))
89+ spans = self .memory_exporter .get_finished_spans ()
90+ self .assertEqual (len (spans ), 1 )
91+ self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
92+ self .check_span (spans [0 ])
93+ self .assertEqual (spans [0 ].name , "SELECT" )
5594 self .assertEqual (
56- span .attributes [SpanAttributes .NET_PEER_PORT ], POSTGRES_PORT
95+ spans [ 0 ] .attributes [SpanAttributes .DB_STATEMENT ], "SELECT 42;"
5796 )
5897
59- def test_instrumented_execute_method_without_arguments (self , * _ , ** __ ):
60- async_call (self ._connection .execute ("SELECT 42;" ))
98+ def test_instrumented_fetch_method_empty_query (self , * _ , ** __ ):
99+ """Should create an error span for fetch() with the database name as the span name."""
100+ async_call (self ._connection .fetch ("" ))
101+ spans = self .memory_exporter .get_finished_spans ()
102+ self .assertEqual (len (spans ), 1 )
103+ self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
104+ self .check_span (spans [0 ])
105+ self .assertEqual (spans [0 ].name , POSTGRES_DB_NAME )
106+ self .assertEqual (spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "" )
107+
108+ def test_instrumented_fetchval_method_without_arguments (self , * _ , ** __ ):
109+ """Should create a span for fetchval()."""
110+ async_call (self ._connection .fetchval ("SELECT 42;" ))
61111 spans = self .memory_exporter .get_finished_spans ()
62112 self .assertEqual (len (spans ), 1 )
63113 self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
@@ -67,17 +117,105 @@ def test_instrumented_execute_method_without_arguments(self, *_, **__):
67117 spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "SELECT 42;"
68118 )
69119
70- def test_instrumented_fetch_method_without_arguments (self , * _ , ** __ ):
71- async_call (self ._connection .fetch ("SELECT 42;" ))
120+ def test_instrumented_fetchval_method_empty_query (self , * _ , ** __ ):
121+ """Should create an error span for fetchval() with the database name as the span name."""
122+ async_call (self ._connection .fetchval ("" ))
72123 spans = self .memory_exporter .get_finished_spans ()
73124 self .assertEqual (len (spans ), 1 )
125+ self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
126+ self .check_span (spans [0 ])
127+ self .assertEqual (spans [0 ].name , POSTGRES_DB_NAME )
128+ self .assertEqual (spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "" )
129+
130+ def test_instrumented_fetchrow_method_without_arguments (self , * _ , ** __ ):
131+ """Should create a span for fetchrow()."""
132+ async_call (self ._connection .fetchrow ("SELECT 42;" ))
133+ spans = self .memory_exporter .get_finished_spans ()
134+ self .assertEqual (len (spans ), 1 )
135+ self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
74136 self .check_span (spans [0 ])
75137 self .assertEqual (spans [0 ].name , "SELECT" )
76138 self .assertEqual (
77139 spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "SELECT 42;"
78140 )
79141
142+ def test_instrumented_fetchrow_method_empty_query (self , * _ , ** __ ):
143+ """Should create an error span for fetchrow() with the database name as the span name."""
144+ async_call (self ._connection .fetchrow ("" ))
145+ spans = self .memory_exporter .get_finished_spans ()
146+ self .assertEqual (len (spans ), 1 )
147+ self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
148+ self .check_span (spans [0 ])
149+ self .assertEqual (spans [0 ].name , POSTGRES_DB_NAME )
150+ self .assertEqual (spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "" )
151+
152+ def test_instrumented_cursor_execute_method_without_arguments (
153+ self , * _ , ** __
154+ ):
155+ """Should create spans for the transaction as well as the cursor fetches."""
156+
157+ async def _cursor_execute ():
158+ async with self ._connection .transaction ():
159+ async for record in self ._connection .cursor (
160+ "SELECT generate_series(0, 5);"
161+ ):
162+ pass
163+
164+ async_call (_cursor_execute ())
165+ spans = self .memory_exporter .get_finished_spans ()
166+
167+ self .check_span (spans [0 ])
168+ self .assertEqual (spans [0 ].name , "BEGIN;" )
169+ self .assertEqual (
170+ spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "BEGIN;"
171+ )
172+ self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
173+
174+ for span in spans [1 :- 1 ]:
175+ self .check_span (span )
176+ self .assertEqual (span .name , "CURSOR: SELECT" )
177+ self .assertEqual (
178+ span .attributes [SpanAttributes .DB_STATEMENT ],
179+ "SELECT generate_series(0, 5);" ,
180+ )
181+ self .assertIs (StatusCode .UNSET , span .status .status_code )
182+
183+ self .check_span (spans [- 1 ])
184+ self .assertEqual (spans [- 1 ].name , "COMMIT;" )
185+ self .assertEqual (
186+ spans [- 1 ].attributes [SpanAttributes .DB_STATEMENT ], "COMMIT;"
187+ )
188+
189+ def test_instrumented_cursor_execute_method_empty_query (self , * _ , ** __ ):
190+ """Should create spans for the transaction and cursor fetches with the database name as the span name."""
191+
192+ async def _cursor_execute ():
193+ async with self ._connection .transaction ():
194+ async for record in self ._connection .cursor ("" ):
195+ pass
196+
197+ async_call (_cursor_execute ())
198+ spans = self .memory_exporter .get_finished_spans ()
199+ self .assertEqual (len (spans ), 3 )
200+
201+ self .check_span (spans [0 ])
202+ self .assertEqual (
203+ spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "BEGIN;"
204+ )
205+ self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
206+
207+ self .check_span (spans [1 ])
208+ self .assertEqual (spans [1 ].name , f"CURSOR: { POSTGRES_DB_NAME } " )
209+ self .assertEqual (spans [1 ].attributes [SpanAttributes .DB_STATEMENT ], "" )
210+ self .assertIs (StatusCode .UNSET , spans [1 ].status .status_code )
211+
212+ self .check_span (spans [2 ])
213+ self .assertEqual (
214+ spans [2 ].attributes [SpanAttributes .DB_STATEMENT ], "COMMIT;"
215+ )
216+
80217 def test_instrumented_remove_comments (self , * _ , ** __ ):
218+ """Should remove comments from the query and set the span name correctly."""
81219 async_call (self ._connection .fetch ("/* leading comment */ SELECT 42;" ))
82220 async_call (
83221 self ._connection .fetch (
@@ -88,25 +226,30 @@ def test_instrumented_remove_comments(self, *_, **__):
88226 spans = self .memory_exporter .get_finished_spans ()
89227 self .assertEqual (len (spans ), 3 )
90228 self .check_span (spans [0 ])
229+ self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
91230 self .assertEqual (spans [0 ].name , "SELECT" )
92231 self .assertEqual (
93232 spans [0 ].attributes [SpanAttributes .DB_STATEMENT ],
94233 "/* leading comment */ SELECT 42;" ,
95234 )
96235 self .check_span (spans [1 ])
236+ self .assertIs (StatusCode .UNSET , spans [1 ].status .status_code )
97237 self .assertEqual (spans [1 ].name , "SELECT" )
98238 self .assertEqual (
99239 spans [1 ].attributes [SpanAttributes .DB_STATEMENT ],
100240 "/* leading comment */ SELECT 42; /* trailing comment */" ,
101241 )
102242 self .check_span (spans [2 ])
243+ self .assertIs (StatusCode .UNSET , spans [2 ].status .status_code )
103244 self .assertEqual (spans [2 ].name , "SELECT" )
104245 self .assertEqual (
105246 spans [2 ].attributes [SpanAttributes .DB_STATEMENT ],
106247 "SELECT 42; /* trailing comment */" ,
107248 )
108249
109250 def test_instrumented_transaction_method (self , * _ , ** __ ):
251+ """Should create spans for the transaction and the inner execute()."""
252+
110253 async def _transaction_execute ():
111254 async with self ._connection .transaction ():
112255 await self ._connection .execute ("SELECT 42;" )
@@ -134,6 +277,8 @@ async def _transaction_execute():
134277 self .assertIs (StatusCode .UNSET , spans [2 ].status .status_code )
135278
136279 def test_instrumented_failed_transaction_method (self , * _ , ** __ ):
280+ """Should create spans for the transaction as well as an error span for execute()."""
281+
137282 async def _transaction_execute ():
138283 async with self ._connection .transaction ():
139284 await self ._connection .execute ("SELECT 42::uuid;" )
@@ -164,6 +309,7 @@ async def _transaction_execute():
164309 self .assertIs (StatusCode .UNSET , spans [2 ].status .status_code )
165310
166311 def test_instrumented_method_doesnt_capture_parameters (self , * _ , ** __ ):
312+ """Should not capture parameters when capture_parameters is False."""
167313 async_call (self ._connection .execute ("SELECT $1;" , "1" ))
168314 spans = self .memory_exporter .get_finished_spans ()
169315 self .assertEqual (len (spans ), 1 )
@@ -174,7 +320,7 @@ def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
174320 )
175321
176322
177- class TestFunctionalAsyncPG_CaptureParameters (TestBase ):
323+ class TestFunctionalAsyncPG_CaptureParameters (TestBase , CheckSpanMixin ):
178324 def setUp (self ):
179325 super ().setUp ()
180326 self ._tracer = self .tracer_provider .get_tracer (__name__ )
@@ -195,24 +341,8 @@ def tearDown(self):
195341 AsyncPGInstrumentor ().uninstrument ()
196342 super ().tearDown ()
197343
198- def check_span (self , span ):
199- self .assertEqual (
200- span .attributes [SpanAttributes .DB_SYSTEM ], "postgresql"
201- )
202- self .assertEqual (
203- span .attributes [SpanAttributes .DB_NAME ], POSTGRES_DB_NAME
204- )
205- self .assertEqual (
206- span .attributes [SpanAttributes .DB_USER ], POSTGRES_USER
207- )
208- self .assertEqual (
209- span .attributes [SpanAttributes .NET_PEER_NAME ], POSTGRES_HOST
210- )
211- self .assertEqual (
212- span .attributes [SpanAttributes .NET_PEER_PORT ], POSTGRES_PORT
213- )
214-
215344 def test_instrumented_execute_method_with_arguments (self , * _ , ** __ ):
345+ """Should create a span for execute() with captured parameters."""
216346 async_call (self ._connection .execute ("SELECT $1;" , "1" ))
217347 spans = self .memory_exporter .get_finished_spans ()
218348 self .assertEqual (len (spans ), 1 )
@@ -228,6 +358,7 @@ def test_instrumented_execute_method_with_arguments(self, *_, **__):
228358 )
229359
230360 def test_instrumented_fetch_method_with_arguments (self , * _ , ** __ ):
361+ """Should create a span for fetch() with captured parameters."""
231362 async_call (self ._connection .fetch ("SELECT $1;" , "1" ))
232363 spans = self .memory_exporter .get_finished_spans ()
233364 self .assertEqual (len (spans ), 1 )
@@ -242,10 +373,11 @@ def test_instrumented_fetch_method_with_arguments(self, *_, **__):
242373 )
243374
244375 def test_instrumented_executemany_method_with_arguments (self , * _ , ** __ ):
376+ """Should create a span for executemany with captured parameters."""
245377 async_call (self ._connection .executemany ("SELECT $1;" , [["1" ], ["2" ]]))
246378 spans = self .memory_exporter .get_finished_spans ()
247379 self .assertEqual (len (spans ), 1 )
248-
380+ self . assertIs ( StatusCode . UNSET , spans [ 0 ]. status . status_code )
249381 self .check_span (spans [0 ])
250382 self .assertEqual (
251383 spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "SELECT $1;"
@@ -255,15 +387,41 @@ def test_instrumented_executemany_method_with_arguments(self, *_, **__):
255387 )
256388
257389 def test_instrumented_execute_interface_error_method (self , * _ , ** __ ):
390+ """Should create an error span for execute() with captured parameters."""
258391 with self .assertRaises (asyncpg .InterfaceError ):
259392 async_call (self ._connection .execute ("SELECT 42;" , 1 , 2 , 3 ))
260393 spans = self .memory_exporter .get_finished_spans ()
261394 self .assertEqual (len (spans ), 1 )
262-
395+ self . assertIs ( StatusCode . ERROR , spans [ 0 ]. status . status_code )
263396 self .check_span (spans [0 ])
264397 self .assertEqual (
265398 spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "SELECT 42;"
266399 )
267400 self .assertEqual (
268401 spans [0 ].attributes ["db.statement.parameters" ], "(1, 2, 3)"
269402 )
403+
404+ def test_instrumented_executemany_method_empty_query (self , * _ , ** __ ):
405+ """Should create a span for executemany() with captured parameters."""
406+ async_call (self ._connection .executemany ("" , []))
407+ spans = self .memory_exporter .get_finished_spans ()
408+ self .assertEqual (len (spans ), 1 )
409+ self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
410+ self .check_span (spans [0 ])
411+ self .assertEqual (spans [0 ].name , POSTGRES_DB_NAME )
412+ self .assertEqual (spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "" )
413+ self .assertEqual (
414+ spans [0 ].attributes ["db.statement.parameters" ], "([],)"
415+ )
416+
417+ def test_instrumented_fetch_method_broken_asyncpg (self , * _ , ** __ ):
418+ """Should create a span for fetch() with "postgresql" as the span name."""
419+ with patch .object (
420+ self ._connection , "_params" , namedtuple ("ConnectionParams" , [])
421+ ):
422+ async_call (self ._connection .fetch ("" ))
423+ spans = self .memory_exporter .get_finished_spans ()
424+ self .assertEqual (len (spans ), 1 )
425+ self .assertIs (StatusCode .UNSET , spans [0 ].status .status_code )
426+ self .assertEqual (spans [0 ].name , "postgresql" )
427+ self .assertEqual (spans [0 ].attributes [SpanAttributes .DB_STATEMENT ], "" )
0 commit comments