From e4fb334d5e48afc8a74fa3185be02eeb0b99e81d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 20 Aug 2025 11:49:52 +0200 Subject: [PATCH 1/2] rabbit_definitions: Import topic permissions after exchanges [Why] Topic permissions depend on an exchange, in addition to a user and a vhost like other permissions. This fixes a bug where an exchange imported after a topic permission that depends on it caused the following crash when Khepri is used: {case_clause,{error,{khepri,mismatching_node, #{node_name => <<"exchange_name">>, node_props => #{payload_version => 1}, node_path => [rabbitmq,vhosts,<<"/">>,exchanges, <<"exchange_name">>], condition => {if_node_exists,false}, node_is_target => true}}}} The crash comes from the fact that the exchange code expect to either create the tree node in Khepri for that exchange, or there is an existing tree node holding an exchange tree node. Here, there was a tree node created implicitly when the topic permission was stored, but that tree node didn't have an exchange record (because the exchange was not imported yet). [How] We simply swap the import of topic permissions and exchanges. (cherry picked from commit 92c572e8879716dfdc3a756f6599dd889295c7bc) --- deps/rabbit/src/rabbit_definitions.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_definitions.erl b/deps/rabbit/src/rabbit_definitions.erl index 257f76232e10..573e4f577c88 100644 --- a/deps/rabbit/src/rabbit_definitions.erl +++ b/deps/rabbit/src/rabbit_definitions.erl @@ -507,9 +507,9 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) -> concurrent_for_all(vhosts, ActingUser, Map, fun add_vhost/2), validate_limits(Map), concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2), - concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2), + concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), %% importing policies concurrently can be unsafe as queues will be getting From a15b4a23aa5f45bb6f9316c76b8676c45450970f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 20 Aug 2025 12:05:59 +0200 Subject: [PATCH 2/2] rabbit_db_exchange: Relax conditions when creating an exchange with Khepri MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit [Why] When the module wanted to create an exchange in Khepri, it used `rabbit_khepri:create/2` which ensures that the tree node doesn't exist before. If the tree node exists, it expects it to contain an exchange record which is returned to the caller, indicating an exchange with that name already exists. However, there are several other resources stored under an exchange tree node, like bindings and topic permissions. In particular, during a definitions import, topic permissions used to be imported before exchanges. This caused a crash because the write of the topic permission automatically created a parent tree node for the exchange it dpends on, but without an exchange record obviously (see previous commit). [How] As an addition improvement to the previous commit, we change the conditions: instead of matching on the fact the tree node doesn't exist, the module also accepts that the tree node exists but has no payload. Under any of these conditions, the exchange is considered to be new and written to Khepri. (cherry picked from commit 5bcbfadcd6ebcb11484da1d60bd871b43e79d67c) --- deps/rabbit/src/rabbit_db_exchange.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index 56077ca8a850..ae704e7af034 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -409,8 +409,12 @@ create_or_get_in_mnesia(#exchange{name = XName} = X) -> end). create_or_get_in_khepri(#exchange{name = XName} = X) -> - Path = khepri_exchange_path(XName), - case rabbit_khepri:create(Path, X) of + Path0 = khepri_exchange_path(XName), + Path1 = khepri_path:combine_with_conditions( + Path0, [#if_any{conditions = + [#if_node_exists{exists = false}, + #if_has_payload{has_payload = false}]}]), + case rabbit_khepri:put(Path1, X) of ok -> {new, X}; {error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->