Skip to content

Commit 73e5d93

Browse files
Merge pull request #15063 from rabbitmq/mergify/bp/v4.2.x/pr-15025
rabbit_khepri: Fix topic binding deletion leak (backport #15025)
2 parents dcd25e2 + 2f165f1 commit 73e5d93

File tree

3 files changed

+249
-8
lines changed

3 files changed

+249
-8
lines changed

deps/rabbit/src/rabbit_db_topic_exchange.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
-define(MNESIA_NODE_TABLE, rabbit_topic_trie_node).
2727
-define(MNESIA_EDGE_TABLE, rabbit_topic_trie_edge).
2828
-define(MNESIA_BINDING_TABLE, rabbit_topic_trie_binding).
29-
-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie).
29+
-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v2).
3030

3131
-type match_result() :: [rabbit_types:binding_destination() |
3232
{rabbit_amqqueue:name(), rabbit_types:binding_key()}].

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,9 @@
175175
get_feature_state/0, get_feature_state/1,
176176
handle_fallback/1]).
177177

178+
%% Called remotely to handle unregistration of old projections.
179+
-export([supports_rabbit_khepri_topic_trie_v2/0]).
180+
178181
-ifdef(TEST).
179182
-export([register_projections/0,
180183
force_metadata_store/1,
@@ -1541,7 +1544,7 @@ projection_fun_for_sets(MapFun) ->
15411544
end.
15421545

15431546
register_rabbit_topic_graph_projection() ->
1544-
Name = rabbit_khepri_topic_trie,
1547+
Name = rabbit_khepri_topic_trie_v2,
15451548
%% This projection calls some external functions which are disallowed by
15461549
%% Horus because they interact with global or random state. We explicitly
15471550
%% allow them here for performance reasons.
@@ -1612,8 +1615,38 @@ register_rabbit_topic_graph_projection() ->
16121615
_Kind = ?KHEPRI_WILDCARD_STAR,
16131616
_DstName = ?KHEPRI_WILDCARD_STAR,
16141617
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
1618+
_ = unregister_rabbit_topic_trie_v1_projection(),
16151619
khepri:register_projection(?STORE_ID, PathPattern, Projection).
16161620

1621+
supports_rabbit_khepri_topic_trie_v2() ->
1622+
true.
1623+
1624+
unregister_rabbit_topic_trie_v1_projection() ->
1625+
Nodes = rabbit_nodes:list_members(),
1626+
Rets = erpc:multicall(
1627+
Nodes,
1628+
?MODULE, supports_rabbit_khepri_topic_trie_v2, []),
1629+
SupportedEverywhere = lists:all(
1630+
fun(Ret) ->
1631+
Ret =:= {ok, true}
1632+
end, Rets),
1633+
case SupportedEverywhere of
1634+
true ->
1635+
?LOG_DEBUG(
1636+
"DB: unregister old `rabbit_khepri_topic_trie` Khepri "
1637+
"projection",
1638+
#{domain => ?RMQLOG_DOMAIN_DB}),
1639+
khepri:unregister_projections(
1640+
?STORE_ID, [rabbit_khepri_topic_trie]);
1641+
false ->
1642+
?LOG_DEBUG(
1643+
"DB: skipping unregistration of old "
1644+
"`rabbit_khepri_topic_trie` Khepri because some RabbitMQ "
1645+
"nodes still use it",
1646+
#{domain => ?RMQLOG_DOMAIN_DB}),
1647+
ok
1648+
end.
1649+
16171650
-spec follow_down_update(Table, Exchange, Words, UpdateFn) -> Ret when
16181651
Table :: ets:tid(),
16191652
Exchange :: rabbit_types:exchange_name(),
@@ -1660,7 +1693,9 @@ follow_down_update(Table, Exchange, FromNodeId, [To | Rest], UpdateFn) ->
16601693
case follow_down_update(Table, Exchange, ToNodeId, Rest, UpdateFn) of
16611694
delete ->
16621695
OutEdgePattern = #topic_trie_edge{trie_edge =
1663-
TrieEdge#trie_edge{word = '_'},
1696+
TrieEdge#trie_edge{
1697+
node_id = ToNodeId,
1698+
word = '_'},
16641699
node_id = '_'},
16651700
case ets:match(Table, OutEdgePattern, 1) of
16661701
'$end_of_table' ->

deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl

Lines changed: 211 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,22 @@
1111
-include_lib("common_test/include/ct.hrl").
1212
-include_lib("eunit/include/eunit.hrl").
1313

14+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
15+
1416
-compile([nowarn_export_all, export_all]).
1517

1618
-define(VHOST, <<"/">>).
1719

1820
all() ->
1921
[
20-
{group, mnesia_store}
22+
{group, mnesia_store},
23+
{group, khepri_store}
2124
].
2225

2326
groups() ->
2427
[
2528
{mnesia_store, [], mnesia_tests()},
29+
{khepri_store, [], khepri_tests()},
2630
{benchmarks, [], benchmarks()}
2731
].
2832

@@ -40,6 +44,11 @@ mnesia_tests() ->
4044
build_multiple_key_from_deletion_events
4145
].
4246

47+
khepri_tests() ->
48+
[
49+
topic_trie_cleanup
50+
].
51+
4352
benchmarks() ->
4453
[
4554
match_benchmark
@@ -53,15 +62,26 @@ end_per_suite(Config) ->
5362
rabbit_ct_helpers:run_teardown_steps(Config).
5463

5564
init_per_group(mnesia_store = Group, Config0) ->
56-
Config = rabbit_ct_helpers:set_config(Config0, [{metadata_store, mnesia}]),
65+
Config = rabbit_ct_helpers:set_config(
66+
Config0,
67+
[{metadata_store, mnesia},
68+
{rmq_nodes_count, 1}]),
69+
init_per_group_common(Group, Config);
70+
init_per_group(khepri_store = Group, Config0) ->
71+
Config = rabbit_ct_helpers:set_config(
72+
Config0,
73+
[{metadata_store, khepri},
74+
{rmq_nodes_count, 3}]),
5775
init_per_group_common(Group, Config);
58-
init_per_group(Group, Config) ->
76+
init_per_group(Group, Config0) ->
77+
Config = rabbit_ct_helpers:set_config(
78+
Config0,
79+
[{rmq_nodes_count, 1}]),
5980
init_per_group_common(Group, Config).
6081

6182
init_per_group_common(Group, Config) ->
6283
Config1 = rabbit_ct_helpers:set_config(Config, [
63-
{rmq_nodename_suffix, Group},
64-
{rmq_nodes_count, 1}
84+
{rmq_nodename_suffix, Group}
6585
]),
6686
rabbit_ct_helpers:run_steps(Config1,
6787
rabbit_ct_broker_helpers:setup_steps() ++
@@ -375,6 +395,192 @@ build_multiple_key_from_deletion_events1(Config) ->
375395
lists:sort([RK || {_, RK} <- rabbit_db_topic_exchange:trie_records_to_key(Records)])),
376396
passed.
377397

398+
%% ---------------------------------------------------------------------------
399+
%% Khepri-specific Tests
400+
%% ---------------------------------------------------------------------------
401+
402+
% https://github.com/rabbitmq/rabbitmq-server/issues/15024
403+
topic_trie_cleanup(Config) ->
404+
[_, OldNode, NewNode] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
405+
406+
%% this test has to be isolated to avoid flakes
407+
VHost = <<"test-vhost-topic-trie">>,
408+
ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_vhost, add, [VHost, <<"test-user">>]),
409+
410+
%% Create an exchange in the vhost
411+
ExchangeName = rabbit_misc:r(VHost, exchange, <<"test-topic-exchange">>),
412+
{ok, _Exchange} = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_exchange, declare,
413+
[ExchangeName, topic, _Durable = true, _AutoDelete = false,
414+
_Internal = false, _Args = [], <<"test-user">>]),
415+
416+
%% List of routing keys that exercise topic exchange functionality
417+
RoutingKeys = [
418+
%% Exact patterns with common prefixes
419+
<<"a.b.c">>,
420+
<<"a.b.d">>,
421+
<<"a.b.e">>,
422+
<<"a.c.d">>,
423+
<<"a.c.e">>,
424+
<<"b.c.d">>,
425+
%% Patterns with a single wildcard
426+
<<"a.*.c">>,
427+
<<"a.*.d">>,
428+
<<"*.b.c">>,
429+
<<"*.b.d">>,
430+
<<"a.b.*">>,
431+
<<"a.c.*">>,
432+
<<"*.*">>,
433+
<<"a.*">>,
434+
<<"*.b">>,
435+
<<"*">>,
436+
%% Patterns with multiple wildcards
437+
<<"a.#">>,
438+
<<"a.b.#">>,
439+
<<"a.c.#">>,
440+
<<"#.c">>,
441+
<<"#.b.c">>,
442+
<<"#.b.d">>,
443+
<<"#">>,
444+
<<"#.#">>,
445+
%% Mixed patterns
446+
<<"a.*.#">>,
447+
<<"*.b.#">>,
448+
<<"*.#">>,
449+
<<"#.*">>,
450+
<<"#.*.#">>,
451+
%% More complex patterns with common prefixes
452+
<<"orders.created.#">>,
453+
<<"orders.updated.#">>,
454+
<<"orders.*.confirmed">>,
455+
<<"orders.#">>,
456+
<<"events.user.#">>,
457+
<<"events.system.#">>,
458+
<<"events.#">>
459+
],
460+
461+
%% Shuffle the routing keys to test in random order
462+
ShuffledRoutingKeys = [RK || {_, RK} <- lists:sort([{rand:uniform(), RK} || RK <- RoutingKeys])],
463+
464+
%% Create bindings for all routing keys
465+
Bindings = [begin
466+
QueueName = rabbit_misc:r(VHost, queue,
467+
list_to_binary("queue-" ++ integer_to_list(Idx))),
468+
Ret = rabbit_ct_broker_helpers:rpc(
469+
Config, OldNode,
470+
rabbit_amqqueue, declare, [QueueName, true, false, [], self(), <<"test-user">>]),
471+
case Ret of
472+
{new, _Q} -> ok;
473+
{existing, _Q} -> ok
474+
end,
475+
#binding{source = ExchangeName,
476+
key = RoutingKey,
477+
destination = QueueName,
478+
args = []}
479+
end || {Idx, RoutingKey} <- lists:enumerate(ShuffledRoutingKeys)],
480+
481+
%% Add all bindings
482+
[ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, add, [B, <<"test-user">>])
483+
|| B <- Bindings],
484+
485+
%% Log entries that were added to the ETS table
486+
lists:foreach(
487+
fun(Node) ->
488+
VHostEntriesAfterAdd = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2),
489+
ct:pal("Bindings added on node ~s: ~p, ETS entries after add: ~p~n",
490+
[Node, length(Bindings), length(VHostEntriesAfterAdd)])
491+
end, Nodes),
492+
493+
%% Shuffle bindings again for deletion in random order
494+
ShuffledBindings = [B || {_, B} <- lists:sort([{rand:uniform(), B} || B <- Bindings])],
495+
496+
%% Delete all bindings in random order
497+
[ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, remove, [B, <<"test-user">>])
498+
|| B <- ShuffledBindings],
499+
500+
%% Verify that the projection ETS table doesn't contain any entries related
501+
%% to this vhost
502+
try
503+
lists:foreach(
504+
fun(Node) ->
505+
%% We read and check the new projection table only. It is
506+
%% declared by the new node and is available everywhere. The
507+
%% old projection table might be there in case of
508+
%% mixed-version testing. This part will be tested in the
509+
%% second part of the testcase.
510+
VHostEntriesAfterDelete = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2),
511+
ct:pal("ETS entries after delete on node ~s: ~p~n", [Node, length(VHostEntriesAfterDelete)]),
512+
513+
%% Assert that no entries were found for this vhost after deletion
514+
?assertEqual([], VHostEntriesAfterDelete)
515+
end, Nodes),
516+
517+
%% If we reach this point, we know the new projection works as expected
518+
%% and the leaked ETS entries are no more.
519+
%%
520+
%% Now, we want to test that after an upgrade, the old projection is
521+
%% unregistered.
522+
HasOldProjection = try
523+
VHostEntriesInOldTable = read_topic_trie_table(
524+
Config, OldNode, VHost, rabbit_khepri_topic_trie),
525+
ct:pal("Old ETS table entries after delete: ~p~n", [length(VHostEntriesInOldTable)]),
526+
?assertNotEqual([], VHostEntriesInOldTable),
527+
true
528+
catch
529+
error:{exception, badarg, _} ->
530+
%% The old projection doesn't exist. The old
531+
%% node, if we are in a mixed-version test,
532+
%% also supports the new projection. There
533+
%% is nothing more to test.
534+
ct:pal("The old projection was not registered, nothing to test"),
535+
false
536+
end,
537+
538+
case HasOldProjection of
539+
true ->
540+
%% The old projection is registered. Simulate an update by removing
541+
%% node 1 (which is the old one in our mixed-version testing) from
542+
%% the cluster, then restart node 2. On restart, it should
543+
%% unregister the old projection.
544+
%%
545+
%% FIXME: The cluster is configured at the test group level.
546+
%% Therefore, if we add more testcases to this group, following
547+
%% testcases won't have the expected cluster.
548+
?assertEqual(ok, rabbit_ct_broker_helpers:stop_broker(Config, OldNode)),
549+
?assertEqual(ok, rabbit_ct_broker_helpers:forget_cluster_node(Config, NewNode, OldNode)),
550+
551+
ct:pal("Restart new node (node 2)"),
552+
?assertEqual(ok, rabbit_ct_broker_helpers:restart_broker(Config, NewNode)),
553+
554+
ct:pal("Wait for projections to be restored"),
555+
?awaitMatch(
556+
Entries when is_list(Entries),
557+
catch read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie_v2),
558+
60000),
559+
560+
ct:pal("Check that the old projection is gone"),
561+
?assertError(
562+
{exception, badarg, _},
563+
read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie));
564+
false ->
565+
ok
566+
end
567+
after
568+
%% Clean up the vhost
569+
ok = rabbit_ct_broker_helpers:rpc(Config, NewNode, rabbit_vhost, delete, [VHost, <<"test-user">>])
570+
end,
571+
572+
passed.
573+
574+
read_topic_trie_table(Config, Node, VHost, Table) ->
575+
Entries = rabbit_ct_broker_helpers:rpc(Config, Node, ets, tab2list, [Table]),
576+
[Entry || #topic_trie_edge{trie_edge = TrieEdge} = Entry <- Entries,
577+
case TrieEdge of
578+
#trie_edge{exchange_name = #resource{virtual_host = V}} ->
579+
V =:= VHost;
580+
_ ->
581+
false
582+
end].
583+
378584
%% ---------------------------------------------------------------------------
379585
%% Benchmarks
380586
%% ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)