-
Notifications
You must be signed in to change notification settings - Fork 4k
Reduce ETS copy overhead when delivering to target queues #14570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
You've got some serious ETS caremad going on 💪 🥳 |
It is something that's been nagging us for a long time actually and for streams in particular where we keep more data in the type_state field a stream queue record could have a size larger than 200 words! Copying this per message/queue is just unnecessary. If it wasn't for the extra bcc functionality we probably could have got away without the extra projection and only look up the pid for all messages after the first message routed to a given queue. In fact for QQs and streams we only need to check that the queue record exists as they maintain their leader state. |
2687034 to
fbd8a57
Compare
## What? This commit avoids copying the full amqqueue record from ETS per incoming message and target queue. The amqqueue record contains 21 elements and for some queue types, especially streams, some elements are themselves nested terms. ## How? In Khepri, use a new `rabbit_khepri_queue_target` projection which contains a subset of the full amqqueue record. This way all relevant information to deliver to a target queue can be looked up in a single ets:lookup_element call. Alternative approaches are described in erlang/otp#10211 ## Benchmark Fanout to 3 streams Start broker: ``` make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \ FULL=1 \ RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \ RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \ PLUGINS="rabbitmq_management" ``` `high-credit.config` contains: ``` [ {rabbit, [ %% Maximum incoming-window of AMQP 1.0 session. %% Default: 400 {max_incoming_window, 5000}, %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender. %% Default: 128 {max_link_credit, 2000}, %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue. %% Default: 256 {max_queue_credit, 5000}, {loopback_users, []} ]}, {rabbitmq_management_agent, [ {disable_metrics_collector, true} ]} ]. ``` Create the 3 streams and bindings to the fanout exchange: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 ``` Start the client: ``` quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4 ``` `main` branch: ``` Count ............................................. 1,000,000 messages Duration ............................................... 16.3 seconds Message rate ......................................... 61,237 messages/s ``` with this PR: ``` Count ............................................. 1,000,000 messages Duration ............................................... 14.2 seconds Message rate ......................................... 70,309 messages/s ``` Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%.
deliver should only take targets and init should only take the full record
* Reduce ETS copy overhead when delivering to target queues ## What? This commit avoids copying the full amqqueue record from ETS per incoming message and target queue. The amqqueue record contains 21 elements and for some queue types, especially streams, some elements are themselves nested terms. ## How? In Khepri, use a new `rabbit_khepri_queue_target` projection which contains a subset of the full amqqueue record. This way all relevant information to deliver to a target queue can be looked up in a single ets:lookup_element call. Alternative approaches are described in erlang/otp#10211 ## Benchmark Fanout to 3 streams Start broker: ``` make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \ FULL=1 \ RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \ RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \ PLUGINS="rabbitmq_management" ``` `high-credit.config` contains: ``` [ {rabbit, [ %% Maximum incoming-window of AMQP 1.0 session. %% Default: 400 {max_incoming_window, 5000}, %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender. %% Default: 128 {max_link_credit, 2000}, %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue. %% Default: 256 {max_queue_credit, 5000}, {loopback_users, []} ]}, {rabbitmq_management_agent, [ {disable_metrics_collector, true} ]} ]. ``` Create the 3 streams and bindings to the fanout exchange: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 ``` Start the client: ``` quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4 ``` `main` branch: ``` Count ............................................. 1,000,000 messages Duration ............................................... 16.3 seconds Message rate ......................................... 61,237 messages/s ``` with this PR: ``` Count ............................................. 1,000,000 messages Duration ............................................... 14.2 seconds Message rate ......................................... 70,309 messages/s ``` Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%. * Avoid creating 5 elems tuple * Simplify rabbit_queue_type callbacks deliver should only take targets and init should only take the full record * Fix flaky test * Fix specs (cherry picked from commit 2e75bc6)
…14608) * Reduce ETS copy overhead when delivering to target queues ## What? This commit avoids copying the full amqqueue record from ETS per incoming message and target queue. The amqqueue record contains 21 elements and for some queue types, especially streams, some elements are themselves nested terms. ## How? In Khepri, use a new `rabbit_khepri_queue_target` projection which contains a subset of the full amqqueue record. This way all relevant information to deliver to a target queue can be looked up in a single ets:lookup_element call. Alternative approaches are described in erlang/otp#10211 ## Benchmark Fanout to 3 streams Start broker: ``` make run-broker TEST_TMPDIR="$HOME/scratch/rabbit/test" \ FULL=1 \ RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \ RABBITMQ_CONFIG_FILE="$HOME/scratch/rabbit/high-credit.config" \ PLUGINS="rabbitmq_management" ``` `high-credit.config` contains: ``` [ {rabbit, [ %% Maximum incoming-window of AMQP 1.0 session. %% Default: 400 {max_incoming_window, 5000}, %% Maximum link-credit RabbitMQ grants to AMQP 1.0 sender. %% Default: 128 {max_link_credit, 2000}, %% Maximum link-credit RabbitMQ AMQP 1.0 session grants to sending queue. %% Default: 256 {max_queue_credit, 5000}, {loopback_users, []} ]}, {rabbitmq_management_agent, [ {disable_metrics_collector, true} ]} ]. ``` Create the 3 streams and bindings to the fanout exchange: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=stream durable=true name=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss2 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss3 ``` Start the client: ``` quiver-arrow send //host.docker.internal//exchanges/amq.fanout --summary --count 1m --body-size 4 ``` `main` branch: ``` Count ............................................. 1,000,000 messages Duration ............................................... 16.3 seconds Message rate ......................................... 61,237 messages/s ``` with this PR: ``` Count ............................................. 1,000,000 messages Duration ............................................... 14.2 seconds Message rate ......................................... 70,309 messages/s ``` Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%. * Avoid creating 5 elems tuple * Simplify rabbit_queue_type callbacks deliver should only take targets and init should only take the full record * Fix flaky test * Fix specs (cherry picked from commit 2e75bc6) Co-authored-by: David Ansari <[email protected]>
What?
This commit avoids copying the full amqqueue record from ETS per incoming message
and target queue.
The amqqueue record contains 21 elements and for some queue types,
especially streams, some elements are themselves nested terms.
How?
In Khepri, use a new
rabbit_khepri_queue_targetprojection whichcontains a subset of the full amqqueue record.
This way all relevant information to deliver to a target queue can be
looked up in a single ets:lookup_element call.
Alternative approaches are described in erlang/otp#10211
Benchmark
Fanout to 3 streams
Start broker:
high-credit.configcontains:Create the 3 streams and bindings to the fanout exchange:
Start the client:
mainbranch:with this PR:
Hence, this PR increases the throughput when sending to 3 streams via AMQP by ~14%.