Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/instana/instrumentation/aio_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,15 @@ async def publish_with_instana(
"rabbitmq", span_context=parent_context
) as span:
connection = instance.channel._connection
message = kwargs["message"] if kwargs.get("message") else args[0]
routing_key = (
kwargs["routing_key"] if kwargs.get("routing_key") else args[1]
)

_extract_span_attributes(
span, connection, "publish", kwargs["routing_key"], instance.name
span, connection, "publish", routing_key, instance.name
)

message = args[0]
tracer.inject(
span.context,
Format.HTTP_HEADERS,
Expand Down
29 changes: 22 additions & 7 deletions tests/clients/test_aio_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _resource(self) -> Generator[None, None, None]:
# Ensure that allow_exit_as_root has the default value
agent.options.allow_exit_as_root = False

async def publish_message(self) -> None:
async def publish_message(self, params_combination: str = "both_args") -> None:
# Perform connection
connection = await connect()

Expand All @@ -46,11 +46,22 @@ async def publish_message(self) -> None:
exchange = await channel.declare_exchange("test.exchange")
await queue.bind(exchange, routing_key=queue_name)

message = Message(f"Hello {queue_name}".encode())

args = ()
kwargs = {}

if params_combination == "both_kwargs":
kwargs = {"message": message, "routing_key": queue_name}
elif params_combination == "arg_kwarg":
args = (message,)
kwargs = {"routing_key": queue_name}
else:
# params_combination == "both_args"
args = (message, queue_name)

# Sending the message
await exchange.publish(
Message(f"Hello {queue_name}".encode()),
routing_key=queue_name,
)
await exchange.publish(*args, **kwargs)

async def delete_queue(self) -> None:
connection = await connect()
Expand All @@ -75,9 +86,13 @@ async def consume_message(self, connect_method) -> None:
if queue.name in message.body.decode():
break

def test_basic_publish(self) -> None:
@pytest.mark.parametrize(
"params_combination",
["both_args", "both_kwargs", "arg_kwarg"],
)
def test_basic_publish(self, params_combination) -> None:
with tracer.start_as_current_span("test"):
self.loop.run_until_complete(self.publish_message())
self.loop.run_until_complete(self.publish_message(params_combination))

spans = self.recorder.queued_spans()
assert len(spans) == 2
Expand Down
Loading