diff --git a/deps/rabbit/src/rabbit_db_topic_exchange.erl b/deps/rabbit/src/rabbit_db_topic_exchange.erl index 35e6cf59d56c..76c149371ab1 100644 --- a/deps/rabbit/src/rabbit_db_topic_exchange.erl +++ b/deps/rabbit/src/rabbit_db_topic_exchange.erl @@ -26,7 +26,7 @@ -define(MNESIA_NODE_TABLE, rabbit_topic_trie_node). -define(MNESIA_EDGE_TABLE, rabbit_topic_trie_edge). -define(MNESIA_BINDING_TABLE, rabbit_topic_trie_binding). --define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie). +-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v2). -type match_result() :: [rabbit_types:binding_destination() | {rabbit_amqqueue:name(), rabbit_types:binding_key()}]. diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 6b7f0a6a898e..d79ee6bcbecc 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -175,6 +175,9 @@ get_feature_state/0, get_feature_state/1, handle_fallback/1]). +%% Called remotely to handle unregistration of old projections. +-export([supports_rabbit_khepri_topic_trie_v2/0]). + -ifdef(TEST). -export([register_projections/0, force_metadata_store/1, @@ -1541,7 +1544,7 @@ projection_fun_for_sets(MapFun) -> end. register_rabbit_topic_graph_projection() -> - Name = rabbit_khepri_topic_trie, + Name = rabbit_khepri_topic_trie_v2, %% This projection calls some external functions which are disallowed by %% Horus because they interact with global or random state. We explicitly %% allow them here for performance reasons. @@ -1612,8 +1615,38 @@ register_rabbit_topic_graph_projection() -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), + _ = unregister_rabbit_topic_trie_v1_projection(), khepri:register_projection(?STORE_ID, PathPattern, Projection). +supports_rabbit_khepri_topic_trie_v2() -> + true. + +unregister_rabbit_topic_trie_v1_projection() -> + Nodes = rabbit_nodes:list_members(), + Rets = erpc:multicall( + Nodes, + ?MODULE, supports_rabbit_khepri_topic_trie_v2, []), + SupportedEverywhere = lists:all( + fun(Ret) -> + Ret =:= {ok, true} + end, Rets), + case SupportedEverywhere of + true -> + ?LOG_DEBUG( + "DB: unregister old `rabbit_khepri_topic_trie` Khepri " + "projection", + #{domain => ?RMQLOG_DOMAIN_DB}), + khepri:unregister_projections( + ?STORE_ID, [rabbit_khepri_topic_trie]); + false -> + ?LOG_DEBUG( + "DB: skipping unregistration of old " + "`rabbit_khepri_topic_trie` Khepri because some RabbitMQ " + "nodes still use it", + #{domain => ?RMQLOG_DOMAIN_DB}), + ok + end. + -spec follow_down_update(Table, Exchange, Words, UpdateFn) -> Ret when Table :: ets:tid(), Exchange :: rabbit_types:exchange_name(), @@ -1660,7 +1693,9 @@ follow_down_update(Table, Exchange, FromNodeId, [To | Rest], UpdateFn) -> case follow_down_update(Table, Exchange, ToNodeId, Rest, UpdateFn) of delete -> OutEdgePattern = #topic_trie_edge{trie_edge = - TrieEdge#trie_edge{word = '_'}, + TrieEdge#trie_edge{ + node_id = ToNodeId, + word = '_'}, node_id = '_'}, case ets:match(Table, OutEdgePattern, 1) of '$end_of_table' -> diff --git a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl index b59a1696fc75..f526924155ed 100644 --- a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl @@ -11,18 +11,22 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + -compile([nowarn_export_all, export_all]). -define(VHOST, <<"/">>). all() -> [ - {group, mnesia_store} + {group, mnesia_store}, + {group, khepri_store} ]. groups() -> [ {mnesia_store, [], mnesia_tests()}, + {khepri_store, [], khepri_tests()}, {benchmarks, [], benchmarks()} ]. @@ -40,6 +44,11 @@ mnesia_tests() -> build_multiple_key_from_deletion_events ]. +khepri_tests() -> + [ + topic_trie_cleanup + ]. + benchmarks() -> [ match_benchmark @@ -53,15 +62,26 @@ end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). init_per_group(mnesia_store = Group, Config0) -> - Config = rabbit_ct_helpers:set_config(Config0, [{metadata_store, mnesia}]), + Config = rabbit_ct_helpers:set_config( + Config0, + [{metadata_store, mnesia}, + {rmq_nodes_count, 1}]), + init_per_group_common(Group, Config); +init_per_group(khepri_store = Group, Config0) -> + Config = rabbit_ct_helpers:set_config( + Config0, + [{metadata_store, khepri}, + {rmq_nodes_count, 3}]), init_per_group_common(Group, Config); -init_per_group(Group, Config) -> +init_per_group(Group, Config0) -> + Config = rabbit_ct_helpers:set_config( + Config0, + [{rmq_nodes_count, 1}]), init_per_group_common(Group, Config). init_per_group_common(Group, Config) -> Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Group}, - {rmq_nodes_count, 1} + {rmq_nodename_suffix, Group} ]), rabbit_ct_helpers:run_steps(Config1, rabbit_ct_broker_helpers:setup_steps() ++ @@ -375,6 +395,192 @@ build_multiple_key_from_deletion_events1(Config) -> lists:sort([RK || {_, RK} <- rabbit_db_topic_exchange:trie_records_to_key(Records)])), passed. +%% --------------------------------------------------------------------------- +%% Khepri-specific Tests +%% --------------------------------------------------------------------------- + +% https://github.com/rabbitmq/rabbitmq-server/issues/15024 +topic_trie_cleanup(Config) -> + [_, OldNode, NewNode] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% this test has to be isolated to avoid flakes + VHost = <<"test-vhost-topic-trie">>, + ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_vhost, add, [VHost, <<"test-user">>]), + + %% Create an exchange in the vhost + ExchangeName = rabbit_misc:r(VHost, exchange, <<"test-topic-exchange">>), + {ok, _Exchange} = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_exchange, declare, + [ExchangeName, topic, _Durable = true, _AutoDelete = false, + _Internal = false, _Args = [], <<"test-user">>]), + + %% List of routing keys that exercise topic exchange functionality + RoutingKeys = [ + %% Exact patterns with common prefixes + <<"a.b.c">>, + <<"a.b.d">>, + <<"a.b.e">>, + <<"a.c.d">>, + <<"a.c.e">>, + <<"b.c.d">>, + %% Patterns with a single wildcard + <<"a.*.c">>, + <<"a.*.d">>, + <<"*.b.c">>, + <<"*.b.d">>, + <<"a.b.*">>, + <<"a.c.*">>, + <<"*.*">>, + <<"a.*">>, + <<"*.b">>, + <<"*">>, + %% Patterns with multiple wildcards + <<"a.#">>, + <<"a.b.#">>, + <<"a.c.#">>, + <<"#.c">>, + <<"#.b.c">>, + <<"#.b.d">>, + <<"#">>, + <<"#.#">>, + %% Mixed patterns + <<"a.*.#">>, + <<"*.b.#">>, + <<"*.#">>, + <<"#.*">>, + <<"#.*.#">>, + %% More complex patterns with common prefixes + <<"orders.created.#">>, + <<"orders.updated.#">>, + <<"orders.*.confirmed">>, + <<"orders.#">>, + <<"events.user.#">>, + <<"events.system.#">>, + <<"events.#">> + ], + + %% Shuffle the routing keys to test in random order + ShuffledRoutingKeys = [RK || {_, RK} <- lists:sort([{rand:uniform(), RK} || RK <- RoutingKeys])], + + %% Create bindings for all routing keys + Bindings = [begin + QueueName = rabbit_misc:r(VHost, queue, + list_to_binary("queue-" ++ integer_to_list(Idx))), + Ret = rabbit_ct_broker_helpers:rpc( + Config, OldNode, + rabbit_amqqueue, declare, [QueueName, true, false, [], self(), <<"test-user">>]), + case Ret of + {new, _Q} -> ok; + {existing, _Q} -> ok + end, + #binding{source = ExchangeName, + key = RoutingKey, + destination = QueueName, + args = []} + end || {Idx, RoutingKey} <- lists:enumerate(ShuffledRoutingKeys)], + + %% Add all bindings + [ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, add, [B, <<"test-user">>]) + || B <- Bindings], + + %% Log entries that were added to the ETS table + lists:foreach( + fun(Node) -> + VHostEntriesAfterAdd = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2), + ct:pal("Bindings added on node ~s: ~p, ETS entries after add: ~p~n", + [Node, length(Bindings), length(VHostEntriesAfterAdd)]) + end, Nodes), + + %% Shuffle bindings again for deletion in random order + ShuffledBindings = [B || {_, B} <- lists:sort([{rand:uniform(), B} || B <- Bindings])], + + %% Delete all bindings in random order + [ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, remove, [B, <<"test-user">>]) + || B <- ShuffledBindings], + + %% Verify that the projection ETS table doesn't contain any entries related + %% to this vhost + try + lists:foreach( + fun(Node) -> + %% We read and check the new projection table only. It is + %% declared by the new node and is available everywhere. The + %% old projection table might be there in case of + %% mixed-version testing. This part will be tested in the + %% second part of the testcase. + VHostEntriesAfterDelete = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2), + ct:pal("ETS entries after delete on node ~s: ~p~n", [Node, length(VHostEntriesAfterDelete)]), + + %% Assert that no entries were found for this vhost after deletion + ?assertEqual([], VHostEntriesAfterDelete) + end, Nodes), + + %% If we reach this point, we know the new projection works as expected + %% and the leaked ETS entries are no more. + %% + %% Now, we want to test that after an upgrade, the old projection is + %% unregistered. + HasOldProjection = try + VHostEntriesInOldTable = read_topic_trie_table( + Config, OldNode, VHost, rabbit_khepri_topic_trie), + ct:pal("Old ETS table entries after delete: ~p~n", [length(VHostEntriesInOldTable)]), + ?assertNotEqual([], VHostEntriesInOldTable), + true + catch + error:{exception, badarg, _} -> + %% The old projection doesn't exist. The old + %% node, if we are in a mixed-version test, + %% also supports the new projection. There + %% is nothing more to test. + ct:pal("The old projection was not registered, nothing to test"), + false + end, + + case HasOldProjection of + true -> + %% The old projection is registered. Simulate an update by removing + %% node 1 (which is the old one in our mixed-version testing) from + %% the cluster, then restart node 2. On restart, it should + %% unregister the old projection. + %% + %% FIXME: The cluster is configured at the test group level. + %% Therefore, if we add more testcases to this group, following + %% testcases won't have the expected cluster. + ?assertEqual(ok, rabbit_ct_broker_helpers:stop_broker(Config, OldNode)), + ?assertEqual(ok, rabbit_ct_broker_helpers:forget_cluster_node(Config, NewNode, OldNode)), + + ct:pal("Restart new node (node 2)"), + ?assertEqual(ok, rabbit_ct_broker_helpers:restart_broker(Config, NewNode)), + + ct:pal("Wait for projections to be restored"), + ?awaitMatch( + Entries when is_list(Entries), + catch read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie_v2), + 60000), + + ct:pal("Check that the old projection is gone"), + ?assertError( + {exception, badarg, _}, + read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie)); + false -> + ok + end + after + %% Clean up the vhost + ok = rabbit_ct_broker_helpers:rpc(Config, NewNode, rabbit_vhost, delete, [VHost, <<"test-user">>]) + end, + + passed. + +read_topic_trie_table(Config, Node, VHost, Table) -> + Entries = rabbit_ct_broker_helpers:rpc(Config, Node, ets, tab2list, [Table]), + [Entry || #topic_trie_edge{trie_edge = TrieEdge} = Entry <- Entries, + case TrieEdge of + #trie_edge{exchange_name = #resource{virtual_host = V}} -> + V =:= VHost; + _ -> + false + end]. + %% --------------------------------------------------------------------------- %% Benchmarks %% ---------------------------------------------------------------------------