@@ -86,6 +86,22 @@ async def consume_message(self, connect_method) -> None:
8686 if queue .name in message .body .decode ():
8787 break
8888
89+ async def consume_with_exception (self , connect_method ) -> None :
90+ connection = await connect_method ()
91+
92+ async def on_message (msg ):
93+ raise RuntimeError ("Simulated Exception" )
94+
95+ async with connection :
96+ # Creating channel
97+ channel = await connection .channel ()
98+
99+ # Declaring queue
100+ queue = await channel .declare_queue (self .queue_name )
101+
102+ await queue .consume (on_message )
103+ await asyncio .sleep (1 ) # Wait to ensure the message is processed
104+
89105 @pytest .mark .parametrize (
90106 "params_combination" ,
91107 ["both_args" , "both_kwargs" , "arg_kwarg" ],
@@ -184,3 +200,45 @@ def assert_span_info(rabbitmq_span: "ReadableSpan", sort: str) -> None:
184200
185201 assert_span_info (rabbitmq_publisher_span , "publish" )
186202 assert_span_info (rabbitmq_consumer_span , "consume" )
203+
204+ @pytest .mark .parametrize (
205+ "connect_method" ,
206+ [connect , connect_robust ],
207+ )
208+ def test_consume_with_exception (self , connect_method ) -> None :
209+ with tracer .start_as_current_span ("test" ):
210+ self .loop .run_until_complete (self .publish_message ())
211+ self .loop .run_until_complete (self .consume_with_exception (connect_method ))
212+
213+ spans = self .recorder .queued_spans ()
214+ assert len (spans ) == 3
215+
216+ rabbitmq_publisher_span = spans [0 ]
217+ rabbitmq_consumer_span = spans [1 ]
218+ test_span = spans [2 ]
219+
220+ # Same traceId
221+ assert test_span .t == rabbitmq_publisher_span .t
222+ assert rabbitmq_publisher_span .t == rabbitmq_consumer_span .t
223+
224+ # Parent relationships
225+ assert rabbitmq_publisher_span .p == test_span .s
226+ assert rabbitmq_consumer_span .p == rabbitmq_publisher_span .s
227+
228+ # Error logging
229+ assert not rabbitmq_publisher_span .ec
230+ assert rabbitmq_consumer_span .ec == 1
231+ assert not test_span .ec
232+
233+ # Span attributes
234+ def assert_span_info (rabbitmq_span : "ReadableSpan" , sort : str ) -> None :
235+ assert rabbitmq_span .data ["rabbitmq" ]["exchange" ] == "test.exchange"
236+ assert rabbitmq_span .data ["rabbitmq" ]["sort" ] == sort
237+ assert rabbitmq_span .data ["rabbitmq" ]["address" ]
238+ assert rabbitmq_span .data ["rabbitmq" ]["key" ] == "test.queue"
239+ assert rabbitmq_span .stack
240+ assert isinstance (rabbitmq_span .stack , list )
241+ assert len (rabbitmq_span .stack ) > 0
242+
243+ assert_span_info (rabbitmq_publisher_span , "publish" )
244+ assert_span_info (rabbitmq_consumer_span , "consume" )
0 commit comments