Skip to content

Conversation

@mergify
Copy link

@mergify mergify bot commented Sep 25, 2025

What?

This commit avoids copying the full amqqueue record from ETS per incoming message
and target queue.
The amqqueue record contains 21 elements and for some queue types,
especially streams, some elements are themselves nested terms.

How?

In Khepri, use a new rabbit_khepri_queue_target projection which
contains a subset of the full amqqueue record.

This way all relevant information to deliver to a target queue can be
looked up in a single ets:lookup_element call.

Alternative approaches are described in erlang/otp#10211

Benchmark

Fanout to 3 streams

Start broker:

make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \
    FULL=1 \
    RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \
    RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \
    PLUGINS="rabbitmq_management"

high-credit.config contains:

[
 {rabbit, [
  %% Maximum incoming-window of AMQP 1.0 session.
  %% Default: 400
  {max_incoming_window, 5000},

  %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender.
  %% Default: 128
  {max_link_credit, 2000},

  %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue.
  %% Default: 256
  {max_queue_credit, 5000},

  {loopback_users, []}
 ]},

 {rabbitmq_management_agent, [
  {disable_metrics_collector, true}
 ]}
].

Create the 3 streams and bindings to the fanout exchange:

deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3

Start the client:

quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4

main branch:

Count ............................................. 1,000,000 messages
Duration ............................................... 16.3 seconds
Message rate ......................................... 61,237 messages/s

with this PR:

Count ............................................. 1,000,000 messages
Duration ............................................... 14.2 seconds
Message rate ......................................... 70,309 messages/s

Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%.


This is an automatic backport of pull request #14570 done by Mergify.

* Reduce ETS copy overhead when delivering to target queues

 ## What?
This commit avoids copying the full amqqueue record from ETS per incoming message
and target queue.
The amqqueue record contains 21 elements and for some queue types,
especially streams, some elements are themselves nested terms.

 ## How?

In Khepri, use a new `rabbit_khepri_queue_target` projection which
contains a subset of the full amqqueue record.

This way all relevant information to deliver to a target queue can be
looked up in a single ets:lookup_element call.

Alternative approaches are described in erlang/otp#10211

 ## Benchmark

Fanout to 3 streams

Start broker:
```
make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \
    FULL=1 \
    RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \
    RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \
    PLUGINS="rabbitmq_management"
```

`high-credit.config` contains:
```
[
 {rabbit, [
  %% Maximum incoming-window of AMQP 1.0 session.
  %% Default: 400
  {max_incoming_window, 5000},

  %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender.
  %% Default: 128
  {max_link_credit, 2000},

  %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue.
  %% Default: 256
  {max_queue_credit, 5000},

  {loopback_users, []}
 ]},

 {rabbitmq_management_agent, [
  {disable_metrics_collector, true}
 ]}
].
```

Create the 3 streams and bindings to the fanout exchange:
```
deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3

```

Start the client:
```
quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4
```

`main` branch:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 16.3 seconds
Message rate ......................................... 61,237 messages/s
```

with this PR:
```
Count ............................................. 1,000,000 messages
Duration ............................................... 14.2 seconds
Message rate ......................................... 70,309 messages/s
```

Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%.

* Avoid creating 5 elems tuple

* Simplify rabbit_queue_type callbacks

deliver should only take targets and init should only take the full record

* Fix flaky test

* Fix specs

(cherry picked from commit 2e75bc6)
@mergify mergify bot assigned ansd Sep 25, 2025
@ansd ansd merged commit bc8d9aa into v4.2.x Sep 25, 2025
566 of 569 checks passed
@ansd ansd deleted the mergify/bp/v4.2.x/pr-14570 branch September 25, 2025 10:18
@ansd ansd added this to the 4.2.0 milestone Sep 25, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants