From be450462fba4bd69adc75476bf866a1aaec795c1 Mon Sep 17 00:00:00 2001 From: Varsha GS Date: Thu, 10 Jul 2025 12:25:49 +0530 Subject: [PATCH 1/2] fix(aio-pika): Accept `message` & `routing_key` as either `positional` or `kw` args Signed-off-by: Varsha GS --- src/instana/instrumentation/aio_pika.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/instana/instrumentation/aio_pika.py b/src/instana/instrumentation/aio_pika.py index 52dfb054..5e3f58d0 100644 --- a/src/instana/instrumentation/aio_pika.py +++ b/src/instana/instrumentation/aio_pika.py @@ -51,11 +51,15 @@ async def publish_with_instana( "rabbitmq", span_context=parent_context ) as span: connection = instance.channel._connection + message = kwargs["message"] if kwargs.get("message") else args[0] + routing_key = ( + kwargs["routing_key"] if kwargs.get("routing_key") else args[1] + ) + _extract_span_attributes( - span, connection, "publish", kwargs["routing_key"], instance.name + span, connection, "publish", routing_key, instance.name ) - message = args[0] tracer.inject( span.context, Format.HTTP_HEADERS, From 53a3da9d6091e5fa1c80598ab5ad57fc6cd7785c Mon Sep 17 00:00:00 2001 From: Varsha GS Date: Mon, 14 Jul 2025 12:33:50 +0530 Subject: [PATCH 2/2] tests(aio-pika): Add tests to verify params combination Signed-off-by: Varsha GS --- tests/clients/test_aio_pika.py | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/tests/clients/test_aio_pika.py b/tests/clients/test_aio_pika.py index ee49b545..75c1afff 100644 --- a/tests/clients/test_aio_pika.py +++ b/tests/clients/test_aio_pika.py @@ -30,7 +30,7 @@ def _resource(self) -> Generator[None, None, None]: # Ensure that allow_exit_as_root has the default value agent.options.allow_exit_as_root = False - async def publish_message(self) -> None: + async def publish_message(self, params_combination: str = "both_args") -> None: # Perform connection connection = await connect() @@ -46,11 +46,22 @@ async def publish_message(self) -> None: exchange = await channel.declare_exchange("test.exchange") await queue.bind(exchange, routing_key=queue_name) + message = Message(f"Hello {queue_name}".encode()) + + args = () + kwargs = {} + + if params_combination == "both_kwargs": + kwargs = {"message": message, "routing_key": queue_name} + elif params_combination == "arg_kwarg": + args = (message,) + kwargs = {"routing_key": queue_name} + else: + # params_combination == "both_args" + args = (message, queue_name) + # Sending the message - await exchange.publish( - Message(f"Hello {queue_name}".encode()), - routing_key=queue_name, - ) + await exchange.publish(*args, **kwargs) async def delete_queue(self) -> None: connection = await connect() @@ -75,9 +86,13 @@ async def consume_message(self, connect_method) -> None: if queue.name in message.body.decode(): break - def test_basic_publish(self) -> None: + @pytest.mark.parametrize( + "params_combination", + ["both_args", "both_kwargs", "arg_kwarg"], + ) + def test_basic_publish(self, params_combination) -> None: with tracer.start_as_current_span("test"): - self.loop.run_until_complete(self.publish_message()) + self.loop.run_until_complete(self.publish_message(params_combination)) spans = self.recorder.queued_spans() assert len(spans) == 2