Skip to content

Commit afc7f59

Browse files
committed
Speed up fanout exchange
Resolves #14531 ## What? Increase end-to-end message throughput for messages routed via the fanout exchange by ~42% (see benchmark below). In addition to the fanout exchange, a similar speed up is achieved for the following exchange types: * modulus hash * random * recent history This applies only if Khepri is enabled. ## How? Use an additional routing table (projection) whose table key is the source exchange. Looking up the destinations happens then by an ETS table key. Prior to this commit, CPUs were busy compiling the same match spec for every incoming message. ## Benchmark 1. Start RabbitMQ: ``` make run-broker RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \ RABBITMQ_CONFIG_FILE="advanced.config" PLUGINS="rabbitmq_management" ``` where `advanced.config` contains: ``` [ {rabbitmq_management_agent, [ {disable_metrics_collector, true} ]} ]. ``` 2. Create a queue and binding: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=classic durable=true name=q1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=q1 ``` 3. Create the load ``` java -jar target/perf-test.jar -p -e amq.fanout -u q1 -s 5 --autoack -z 60 ``` Before this commit: ``` sending rate avg: 97394 msg/s receiving rate avg: 97394 msg/s ``` After this commit: ``` sending rate avg: 138677 msg/s receiving rate avg: 138677 msg/s ``` The CPU flamegraph shows that `rabbit_exchange:route/3` consumes the following CPU amounts: * 13.5% before this commit * 3.4% after this commit ## Downsides Additional ETS memory usage for the new projection table. However, the new table does not store any binding entries for the following source exchange types: * direct * headers * topic * x-local-random
1 parent fe818df commit afc7f59

File tree

3 files changed

+48
-8
lines changed

3 files changed

+48
-8
lines changed

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
5858
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
5959
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
60+
-define(KHEPRI_SOURCE_ROUTE_PROJECTION, rabbit_khepri_source_route).
6061

6162
%% -------------------------------------------------------------------
6263
%% exists().
@@ -707,14 +708,19 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) ->
707708
end.
708709

709710
match_routing_key_in_khepri(Src, ['_']) ->
710-
try
711-
MatchHead = #index_route{source_key = {Src, '_'},
712-
destination = '$1',
713-
_ = '_'},
714-
ets:select(?KHEPRI_INDEX_ROUTE_PROJECTION, [{MatchHead, [], ['$1']}])
715-
catch
716-
error:badarg ->
717-
[]
711+
try ets:lookup_element(?KHEPRI_SOURCE_ROUTE_PROJECTION,
712+
Src, #source_route.destination, [])
713+
catch error:badarg ->
714+
%% The new projection doesn't exist.
715+
%% Fall back to old & slow approach.
716+
MatchHead = #index_route{source_key = {Src, '_'},
717+
destination = '$1',
718+
_ = '_'},
719+
try ets:select(?KHEPRI_INDEX_ROUTE_PROJECTION,
720+
[{MatchHead, [], ['$1']}])
721+
catch error:badarg ->
722+
[]
723+
end
718724
end;
719725
match_routing_key_in_khepri(Src, RoutingKeys) ->
720726
lists:foldl(

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1319,6 +1319,7 @@ register_projections() ->
13191319
fun register_rabbit_user_permissions_projection/0,
13201320
fun register_rabbit_bindings_projection/0,
13211321
fun register_rabbit_index_route_projection/0,
1322+
fun register_rabbit_source_route_projection/0,
13221323
fun register_rabbit_topic_graph_projection/0],
13231324
rabbit_misc:for_each_while_ok(
13241325
fun(RegisterFun) ->
@@ -1442,6 +1443,38 @@ register_rabbit_index_route_projection() ->
14421443
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
14431444
khepri:register_projection(?STORE_ID, PathPattern, Projection).
14441445

1446+
register_rabbit_source_route_projection() ->
1447+
MapFun = fun(_Path, #binding{source = Source,
1448+
key = Key,
1449+
destination = Destination,
1450+
args = Args}) ->
1451+
#source_route{source = Source,
1452+
key = Key,
1453+
destination = Destination,
1454+
args = Args}
1455+
end,
1456+
ProjectionFun = projection_fun_for_sets(MapFun),
1457+
Options = #{type => bag,
1458+
keypos => #source_route.source,
1459+
read_concurrency => true},
1460+
Projection = khepri_projection:new(
1461+
rabbit_khepri_source_route, ProjectionFun, Options),
1462+
Exchange = #if_data_matches{
1463+
pattern = #exchange{type = '$1', _ = '_'},
1464+
conditions = [{'andalso',
1465+
{'=/=', '$1', direct},
1466+
{'=/=', '$1', headers},
1467+
{'=/=', '$1', topic},
1468+
{'=/=', '$1', 'x-local-random'}
1469+
}]},
1470+
PathPattern = rabbit_db_binding:khepri_route_path(
1471+
_VHost = ?KHEPRI_WILDCARD_STAR,
1472+
Exchange,
1473+
_Kind = ?KHEPRI_WILDCARD_STAR,
1474+
_DstName = ?KHEPRI_WILDCARD_STAR,
1475+
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
1476+
khepri:register_projection(?STORE_ID, PathPattern, Projection).
1477+
14451478
%% Routing information is stored in the Khepri store as a `set'.
14461479
%% In order to turn these bindings into records in an ETS `bag', we use a
14471480
%% `khepri_projection:extended_projection_fun()' to determine the changes

deps/rabbit_common/include/rabbit.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
-record(route, {binding, value = const}).
9595
-record(reverse_route, {reverse_binding, value = const}).
9696
-record(index_route, {source_key, destination, args = []}).
97+
-record(source_route, {source, key, destination, args = []}).
9798

9899
-record(binding, {source, key, destination, args = []}).
99100
-record(reverse_binding, {destination, key, source, args = []}).

0 commit comments

Comments
 (0)