@ansd
Member

@ansd ansd commented Sep 18, 2025

What?

This commit avoids copying the full amqqueue record from ETS per incoming message
and target queue.
The amqqueue record contains 21 elements and for some queue types,
especially streams, some elements are themselves nested terms.

How?

In Khepri, use a new rabbit_khepri_queue_target projection which
contains a subset of the full amqqueue record.

This way, all the information needed to deliver to a target queue can be
looked up in a single ets:lookup_element call.
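
As a rough illustration of why this helps, here is a minimal sketch contrasting `ets:lookup/2` with `ets:lookup_element/3`. The `#q{}` record, the table names, and the `{Name, {Type, Pid}}` row layout are assumptions made for this example only; they are not the actual `amqqueue` record or the layout of the `rabbit_khepri_queue_target` projection.

```erlang
-module(queue_target_sketch).
-export([demo/0]).

%% Hypothetical stand-in for the full queue record (the real amqqueue
%% record has more fields).
-record(q, {name, pid, type, type_state}).

demo() ->
    Full = ets:new(full_records, [set, {keypos, #q.name}]),
    Targets = ets:new(queue_targets, [set]),
    Name = <<"stream-1">>,
    %% The list stands in for a large nested type_state term.
    Rec = #q{name = Name, pid = self(), type = stream,
             type_state = lists:seq(1, 200)},
    true = ets:insert(Full, Rec),
    %% Assumed projection row layout: {Name, {Type, Pid}}.
    true = ets:insert(Targets, {Name, {Rec#q.type, Rec#q.pid}}),

    %% ets:lookup/2 copies the whole stored record to the caller's heap,
    %% including the large type_state field that delivery does not need.
    [#q{type = T1, pid = P1}] = ets:lookup(Full, Name),

    %% ets:lookup_element/3 copies only element 2 of the matching row.
    {T2, P2} = ets:lookup_element(Targets, Name, 2),

    true = ets:delete(Full),
    true = ets:delete(Targets),
    %% Both paths yield the same delivery information.
    {T1, P1} =:= {T2, P2}.
```

Calling `queue_target_sketch:demo()` returns `true`: both lookups produce the same type and pid, but the second one never copies the large field.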

Alternative approaches are described in erlang/otp#10211

Benchmark

Fanout to 3 streams

Start broker:

make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \
    FULL=1 \
    RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \
    RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \
    PLUGINS="rabbitmq_management"

high-credit.config contains:

[
 {rabbit, [
  %% Maximum incoming-window of AMQP 1.0 session.
  %% Default: 400
  {max_incoming_window, 5000},

  %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender.
  %% Default: 128
  {max_link_credit, 2000},

  %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue.
  %% Default: 256
  {max_queue_credit, 5000},

  {loopback_users, []}
 ]},

 {rabbitmq_management_agent, [
  {disable_metrics_collector, true}
 ]}
].

Create the 3 streams and bindings to the fanout exchange:

deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \
    deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3

Start the client:

quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4

main branch:

Count ............................................. 1,000,000 messages
Duration ............................................... 16.3 seconds
Message rate ......................................... 61,237 messages/s

with this PR:

Count ............................................. 1,000,000 messages
Duration ............................................... 14.2 seconds
Message rate ......................................... 70,309 messages/s

Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%.
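
For reference, the relative change can be recomputed from the two summaries above. This is a small helper written for this note only (the module and function names are made up):

```erlang
-module(bench_delta_sketch).
-export([pct_change/2]).

%% Relative change from Old to New, in percent.
pct_change(Old, New) ->
    (New - Old) / Old * 100.
```

`bench_delta_sketch:pct_change(61237, 70309)` gives roughly 14.8 (message rate), and `bench_delta_sketch:pct_change(16.3, 14.2)` gives roughly -12.9 (duration).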

@lukebakken
Collaborator

You've got some serious ETS caremad going on 💪 🥳

@ansd ansd added this to the 4.2.0 milestone Sep 18, 2025
@kjnilsson
Contributor

kjnilsson commented Sep 19, 2025

> You've got some serious ETS caremad going on 💪 🥳

This has actually been nagging us for a long time. For streams in particular, where we keep more data in the type_state field, a stream queue record can be larger than 200 words! Copying this per message and per queue is just unnecessary.

If it weren't for the extra bcc functionality, we probably could have gotten away without the extra projection and only looked up the pid for all messages after the first message routed to a given queue. In fact, for QQs and streams we only need to check that the queue record exists, as they maintain their leader state.

@michaelklishin michaelklishin modified the milestones: 4.2.0, 4.3.0 Sep 22, 2025
@ansd ansd force-pushed the queue-lookup branch 5 times, most recently from 2687034 to fbd8a57 Compare September 24, 2025 07:38
@ansd ansd marked this pull request as ready for review September 24, 2025 07:41
@ansd ansd requested a review from kjnilsson September 24, 2025 07:41
@ansd ansd marked this pull request as draft September 24, 2025 07:45
@ansd ansd self-assigned this Sep 24, 2025
deliver should only take targets and init should only take the full record
@ansd ansd marked this pull request as ready for review September 25, 2025 08:47
@ansd ansd requested a review from kjnilsson September 25, 2025 08:53
@ansd ansd merged commit 2e75bc6 into main Sep 25, 2025
287 checks passed
@ansd ansd deleted the queue-lookup branch September 25, 2025 09:25
mergify bot pushed a commit that referenced this pull request Sep 25, 2025
* Reduce ETS copy overhead when delivering to target queues


* Avoid creating 5 elems tuple

* Simplify rabbit_queue_type callbacks

deliver should only take targets and init should only take the full record

* Fix flaky test

* Fix specs

(cherry picked from commit 2e75bc6)
ansd added a commit that referenced this pull request Sep 25, 2025
…14608)

* Reduce ETS copy overhead when delivering to target queues


* Avoid creating 5 elems tuple

* Simplify rabbit_queue_type callbacks

deliver should only take targets and init should only take the full record

* Fix flaky test

* Fix specs

(cherry picked from commit 2e75bc6)

Co-authored-by: David Ansari <[email protected]>

5 participants