Skip to content

Commit 2f165f1

Browse files
mkuratczykmergify[bot]
authored andcommitted
rabbit_khepri: Fix topic binding deletion leak (#15025)
[Why] We use a Khepri projection to compute a graph for bindings that have a topic exchange as their source. This allows more efficient queries during routing. This graph is not stored in Khepri, only in the projection ETS table. When a binding is deleted, we need to clean up the graph. However, the pattern used to match the trie edges to delete was incorrect, leading to "orphaned" trie edges. The accumulation of these leftovers caused a memory leak. [How] The pattern was fixed to correctly match the appropriate trie edges. However, this fix alone is effective for new deployments of RabbitMQ only, when the projection function is registered for the first time. We also need to handle the update of already registered projections in existing clusters. To achieve that, first, we renamed the projection from `rabbit_khepri_topic_trie` to `rabbit_khepri_topic_trie_v2` to distinguish the bad one and the good one. Any updated RabbitMQ nodes in an existing cluster will use this new projection. Other existing out-of-date nodes will continue to use the old projection. Because both projections continue to exist, the cluster will still be affected by the memory leak. Then, each node will verify on startup if all other cluster members support the new projection. If that is the case, they will unregister the old projection. Therefore, once all nodes in a cluster are up-to-date and use the new projection, the old one will go away and the leaked memory will be reclaimed. This startup check could have been made simpler with a feature flag. We decided to go with a custom check in case a user would try to upgrade from a 4.1.x release that has the fix to a 4.2.x release that does not for instance. A feature flag would have prevented that upgrade path. Fixes #15024. (cherry picked from commit 76dcd92)
1 parent dcd25e2 commit 2f165f1

File tree

3 files changed

+249
-8
lines changed

3 files changed

+249
-8
lines changed

deps/rabbit/src/rabbit_db_topic_exchange.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
-define(MNESIA_NODE_TABLE, rabbit_topic_trie_node).
2727
-define(MNESIA_EDGE_TABLE, rabbit_topic_trie_edge).
2828
-define(MNESIA_BINDING_TABLE, rabbit_topic_trie_binding).
29-
-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie).
29+
-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v2).
3030

3131
-type match_result() :: [rabbit_types:binding_destination() |
3232
{rabbit_amqqueue:name(), rabbit_types:binding_key()}].

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,9 @@
175175
get_feature_state/0, get_feature_state/1,
176176
handle_fallback/1]).
177177

178+
%% Called remotely to handle unregistration of old projections.
179+
-export([supports_rabbit_khepri_topic_trie_v2/0]).
180+
178181
-ifdef(TEST).
179182
-export([register_projections/0,
180183
force_metadata_store/1,
@@ -1541,7 +1544,7 @@ projection_fun_for_sets(MapFun) ->
15411544
end.
15421545

15431546
register_rabbit_topic_graph_projection() ->
1544-
Name = rabbit_khepri_topic_trie,
1547+
Name = rabbit_khepri_topic_trie_v2,
15451548
%% This projection calls some external functions which are disallowed by
15461549
%% Horus because they interact with global or random state. We explicitly
15471550
%% allow them here for performance reasons.
@@ -1612,8 +1615,38 @@ register_rabbit_topic_graph_projection() ->
16121615
_Kind = ?KHEPRI_WILDCARD_STAR,
16131616
_DstName = ?KHEPRI_WILDCARD_STAR,
16141617
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
1618+
_ = unregister_rabbit_topic_trie_v1_projection(),
16151619
khepri:register_projection(?STORE_ID, PathPattern, Projection).
16161620

1621+
supports_rabbit_khepri_topic_trie_v2() ->
1622+
true.
1623+
1624+
unregister_rabbit_topic_trie_v1_projection() ->
1625+
Nodes = rabbit_nodes:list_members(),
1626+
Rets = erpc:multicall(
1627+
Nodes,
1628+
?MODULE, supports_rabbit_khepri_topic_trie_v2, []),
1629+
SupportedEverywhere = lists:all(
1630+
fun(Ret) ->
1631+
Ret =:= {ok, true}
1632+
end, Rets),
1633+
case SupportedEverywhere of
1634+
true ->
1635+
?LOG_DEBUG(
1636+
"DB: unregister old `rabbit_khepri_topic_trie` Khepri "
1637+
"projection",
1638+
#{domain => ?RMQLOG_DOMAIN_DB}),
1639+
khepri:unregister_projections(
1640+
?STORE_ID, [rabbit_khepri_topic_trie]);
1641+
false ->
1642+
?LOG_DEBUG(
1643+
"DB: skipping unregistration of old "
1644+
"`rabbit_khepri_topic_trie` Khepri because some RabbitMQ "
1645+
"nodes still use it",
1646+
#{domain => ?RMQLOG_DOMAIN_DB}),
1647+
ok
1648+
end.
1649+
16171650
-spec follow_down_update(Table, Exchange, Words, UpdateFn) -> Ret when
16181651
Table :: ets:tid(),
16191652
Exchange :: rabbit_types:exchange_name(),
@@ -1660,7 +1693,9 @@ follow_down_update(Table, Exchange, FromNodeId, [To | Rest], UpdateFn) ->
16601693
case follow_down_update(Table, Exchange, ToNodeId, Rest, UpdateFn) of
16611694
delete ->
16621695
OutEdgePattern = #topic_trie_edge{trie_edge =
1663-
TrieEdge#trie_edge{word = '_'},
1696+
TrieEdge#trie_edge{
1697+
node_id = ToNodeId,
1698+
word = '_'},
16641699
node_id = '_'},
16651700
case ets:match(Table, OutEdgePattern, 1) of
16661701
'$end_of_table' ->

deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl

Lines changed: 211 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,22 @@
1111
-include_lib("common_test/include/ct.hrl").
1212
-include_lib("eunit/include/eunit.hrl").
1313

14+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
15+
1416
-compile([nowarn_export_all, export_all]).
1517

1618
-define(VHOST, <<"/">>).
1719

1820
all() ->
1921
[
20-
{group, mnesia_store}
22+
{group, mnesia_store},
23+
{group, khepri_store}
2124
].
2225

2326
groups() ->
2427
[
2528
{mnesia_store, [], mnesia_tests()},
29+
{khepri_store, [], khepri_tests()},
2630
{benchmarks, [], benchmarks()}
2731
].
2832

@@ -40,6 +44,11 @@ mnesia_tests() ->
4044
build_multiple_key_from_deletion_events
4145
].
4246

47+
khepri_tests() ->
48+
[
49+
topic_trie_cleanup
50+
].
51+
4352
benchmarks() ->
4453
[
4554
match_benchmark
@@ -53,15 +62,26 @@ end_per_suite(Config) ->
5362
rabbit_ct_helpers:run_teardown_steps(Config).
5463

5564
init_per_group(mnesia_store = Group, Config0) ->
56-
Config = rabbit_ct_helpers:set_config(Config0, [{metadata_store, mnesia}]),
65+
Config = rabbit_ct_helpers:set_config(
66+
Config0,
67+
[{metadata_store, mnesia},
68+
{rmq_nodes_count, 1}]),
69+
init_per_group_common(Group, Config);
70+
init_per_group(khepri_store = Group, Config0) ->
71+
Config = rabbit_ct_helpers:set_config(
72+
Config0,
73+
[{metadata_store, khepri},
74+
{rmq_nodes_count, 3}]),
5775
init_per_group_common(Group, Config);
58-
init_per_group(Group, Config) ->
76+
init_per_group(Group, Config0) ->
77+
Config = rabbit_ct_helpers:set_config(
78+
Config0,
79+
[{rmq_nodes_count, 1}]),
5980
init_per_group_common(Group, Config).
6081

6182
init_per_group_common(Group, Config) ->
6283
Config1 = rabbit_ct_helpers:set_config(Config, [
63-
{rmq_nodename_suffix, Group},
64-
{rmq_nodes_count, 1}
84+
{rmq_nodename_suffix, Group}
6585
]),
6686
rabbit_ct_helpers:run_steps(Config1,
6787
rabbit_ct_broker_helpers:setup_steps() ++
@@ -375,6 +395,192 @@ build_multiple_key_from_deletion_events1(Config) ->
375395
lists:sort([RK || {_, RK} <- rabbit_db_topic_exchange:trie_records_to_key(Records)])),
376396
passed.
377397

398+
%% ---------------------------------------------------------------------------
399+
%% Khepri-specific Tests
400+
%% ---------------------------------------------------------------------------
401+
402+
% https://github.com/rabbitmq/rabbitmq-server/issues/15024
403+
topic_trie_cleanup(Config) ->
404+
[_, OldNode, NewNode] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
405+
406+
%% this test has to be isolated to avoid flakes
407+
VHost = <<"test-vhost-topic-trie">>,
408+
ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_vhost, add, [VHost, <<"test-user">>]),
409+
410+
%% Create an exchange in the vhost
411+
ExchangeName = rabbit_misc:r(VHost, exchange, <<"test-topic-exchange">>),
412+
{ok, _Exchange} = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_exchange, declare,
413+
[ExchangeName, topic, _Durable = true, _AutoDelete = false,
414+
_Internal = false, _Args = [], <<"test-user">>]),
415+
416+
%% List of routing keys that exercise topic exchange functionality
417+
RoutingKeys = [
418+
%% Exact patterns with common prefixes
419+
<<"a.b.c">>,
420+
<<"a.b.d">>,
421+
<<"a.b.e">>,
422+
<<"a.c.d">>,
423+
<<"a.c.e">>,
424+
<<"b.c.d">>,
425+
%% Patterns with a single wildcard
426+
<<"a.*.c">>,
427+
<<"a.*.d">>,
428+
<<"*.b.c">>,
429+
<<"*.b.d">>,
430+
<<"a.b.*">>,
431+
<<"a.c.*">>,
432+
<<"*.*">>,
433+
<<"a.*">>,
434+
<<"*.b">>,
435+
<<"*">>,
436+
%% Patterns with multiple wildcards
437+
<<"a.#">>,
438+
<<"a.b.#">>,
439+
<<"a.c.#">>,
440+
<<"#.c">>,
441+
<<"#.b.c">>,
442+
<<"#.b.d">>,
443+
<<"#">>,
444+
<<"#.#">>,
445+
%% Mixed patterns
446+
<<"a.*.#">>,
447+
<<"*.b.#">>,
448+
<<"*.#">>,
449+
<<"#.*">>,
450+
<<"#.*.#">>,
451+
%% More complex patterns with common prefixes
452+
<<"orders.created.#">>,
453+
<<"orders.updated.#">>,
454+
<<"orders.*.confirmed">>,
455+
<<"orders.#">>,
456+
<<"events.user.#">>,
457+
<<"events.system.#">>,
458+
<<"events.#">>
459+
],
460+
461+
%% Shuffle the routing keys to test in random order
462+
ShuffledRoutingKeys = [RK || {_, RK} <- lists:sort([{rand:uniform(), RK} || RK <- RoutingKeys])],
463+
464+
%% Create bindings for all routing keys
465+
Bindings = [begin
466+
QueueName = rabbit_misc:r(VHost, queue,
467+
list_to_binary("queue-" ++ integer_to_list(Idx))),
468+
Ret = rabbit_ct_broker_helpers:rpc(
469+
Config, OldNode,
470+
rabbit_amqqueue, declare, [QueueName, true, false, [], self(), <<"test-user">>]),
471+
case Ret of
472+
{new, _Q} -> ok;
473+
{existing, _Q} -> ok
474+
end,
475+
#binding{source = ExchangeName,
476+
key = RoutingKey,
477+
destination = QueueName,
478+
args = []}
479+
end || {Idx, RoutingKey} <- lists:enumerate(ShuffledRoutingKeys)],
480+
481+
%% Add all bindings
482+
[ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, add, [B, <<"test-user">>])
483+
|| B <- Bindings],
484+
485+
%% Log entries that were added to the ETS table
486+
lists:foreach(
487+
fun(Node) ->
488+
VHostEntriesAfterAdd = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2),
489+
ct:pal("Bindings added on node ~s: ~p, ETS entries after add: ~p~n",
490+
[Node, length(Bindings), length(VHostEntriesAfterAdd)])
491+
end, Nodes),
492+
493+
%% Shuffle bindings again for deletion in random order
494+
ShuffledBindings = [B || {_, B} <- lists:sort([{rand:uniform(), B} || B <- Bindings])],
495+
496+
%% Delete all bindings in random order
497+
[ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, remove, [B, <<"test-user">>])
498+
|| B <- ShuffledBindings],
499+
500+
%% Verify that the projection ETS table doesn't contain any entries related
501+
%% to this vhost
502+
try
503+
lists:foreach(
504+
fun(Node) ->
505+
%% We read and check the new projection table only. It is
506+
%% declared by the new node and is available everywhere. The
507+
%% old projection table might be there in case of
508+
%% mixed-version testing. This part will be tested in the
509+
%% second part of the testcase.
510+
VHostEntriesAfterDelete = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2),
511+
ct:pal("ETS entries after delete on node ~s: ~p~n", [Node, length(VHostEntriesAfterDelete)]),
512+
513+
%% Assert that no entries were found for this vhost after deletion
514+
?assertEqual([], VHostEntriesAfterDelete)
515+
end, Nodes),
516+
517+
%% If we reach this point, we know the new projection works as expected
518+
%% and the leaked ETS entries are no more.
519+
%%
520+
%% Now, we want to test that after an upgrade, the old projection is
521+
%% unregistered.
522+
HasOldProjection = try
523+
VHostEntriesInOldTable = read_topic_trie_table(
524+
Config, OldNode, VHost, rabbit_khepri_topic_trie),
525+
ct:pal("Old ETS table entries after delete: ~p~n", [length(VHostEntriesInOldTable)]),
526+
?assertNotEqual([], VHostEntriesInOldTable),
527+
true
528+
catch
529+
error:{exception, badarg, _} ->
530+
%% The old projection doesn't exist. The old
531+
%% node, if we are in a mixed-version test,
532+
%% also supports the new projection. There
533+
%% is nothing more to test.
534+
ct:pal("The old projection was not registered, nothing to test"),
535+
false
536+
end,
537+
538+
case HasOldProjection of
539+
true ->
540+
%% The old projection is registered. Simulate an update by removing
541+
%% node 1 (which is the old one in our mixed-version testing) from
542+
%% the cluster, then restart node 2. On restart, it should
543+
%% unregister the old projection.
544+
%%
545+
%% FIXME: The cluster is configured at the test group level.
546+
%% Therefore, if we add more testcases to this group, following
547+
%% testcases won't have the expected cluster.
548+
?assertEqual(ok, rabbit_ct_broker_helpers:stop_broker(Config, OldNode)),
549+
?assertEqual(ok, rabbit_ct_broker_helpers:forget_cluster_node(Config, NewNode, OldNode)),
550+
551+
ct:pal("Restart new node (node 2)"),
552+
?assertEqual(ok, rabbit_ct_broker_helpers:restart_broker(Config, NewNode)),
553+
554+
ct:pal("Wait for projections to be restored"),
555+
?awaitMatch(
556+
Entries when is_list(Entries),
557+
catch read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie_v2),
558+
60000),
559+
560+
ct:pal("Check that the old projection is gone"),
561+
?assertError(
562+
{exception, badarg, _},
563+
read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie));
564+
false ->
565+
ok
566+
end
567+
after
568+
%% Clean up the vhost
569+
ok = rabbit_ct_broker_helpers:rpc(Config, NewNode, rabbit_vhost, delete, [VHost, <<"test-user">>])
570+
end,
571+
572+
passed.
573+
574+
read_topic_trie_table(Config, Node, VHost, Table) ->
575+
Entries = rabbit_ct_broker_helpers:rpc(Config, Node, ets, tab2list, [Table]),
576+
[Entry || #topic_trie_edge{trie_edge = TrieEdge} = Entry <- Entries,
577+
case TrieEdge of
578+
#trie_edge{exchange_name = #resource{virtual_host = V}} ->
579+
V =:= VHost;
580+
_ ->
581+
false
582+
end].
583+
378584
%% ---------------------------------------------------------------------------
379585
%% Benchmarks
380586
%% ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)