From 7d12e42f66f7cd50bdb6db7aa058254d2a900784 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 23 Oct 2025 18:18:43 +0200 Subject: [PATCH 1/2] rabbit_logger_exchange_h: Do not re-enter itself [Why] Publishing a log message to an exchange might trigger other messages to be logged. This caused two issues: 1. the exchange logger re-entering itself in an infinite loop 2. if the message is logged from a gen_server-like process, like a Ra server, that is involved in the publishing code path, the process might call itself, leading to a blocked situation [How] The first issue is fixed with a variable stored in the process dictionary by the `log/2` function. This way, the function can check if it is called from itself because the first incantation stored a variable there. The second issue is fixed by publishing the message asynchronously from a separate process. This is ok because we don't care if the publish was successful or not. We re-use the process that was started initially to declare the exchange. Fixes #14069. (cherry picked from commit 7cc0aa982cfdf5acc6943ed13b45db9d7c68c4a6) --- deps/rabbit/src/rabbit_logger_exchange_h.erl | 40 ++++++++++++++++---- deps/rabbit/test/logging_SUITE.erl | 34 +++++++++++++---- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/deps/rabbit/src/rabbit_logger_exchange_h.erl b/deps/rabbit/src/rabbit_logger_exchange_h.erl index 067b1c6d3ff0..d9cca5ef4c5f 100644 --- a/deps/rabbit/src/rabbit_logger_exchange_h.erl +++ b/deps/rabbit/src/rabbit_logger_exchange_h.erl @@ -43,8 +43,16 @@ filter_config(Config) -> log(#{meta := #{mfa := {?MODULE, _, _}}}, _) -> ok; log(LogEvent, Config) -> + %% Publishing the log message to an exchange might trigger more logging, + %% triggering an infinite logging loop. To prevent that, we make use the + %% process dictionary to record the fact that this logger was already + %% entered. If that's the case when this function is called, we just drop + %% the log event. + Key = ?MODULE, + ReEntered = erlang:get(Key) =/= undefined, case rabbit_boot_state:get() of - ready -> + ready when not ReEntered -> + erlang:put(Key, ?FUNCTION_NAME), try do_log(LogEvent, Config) catch @@ -53,22 +61,30 @@ log(LogEvent, Config) -> %% removes the logger_exchange handler, which in %% turn deletes the log exchange and its bindings erlang:display({?MODULE, crashed, {C, R, S}}) + after + erlang:erase(Key) end, ok; _ -> ok end. -do_log(LogEvent, #{config := #{exchange := Exchange}} = Config) -> +do_log( + LogEvent, + #{config := #{exchange := Exchange, + setup_proc := Pid}} = Config) -> RoutingKey = make_routing_key(LogEvent, Config), PBasic = log_event_to_amqp_msg(LogEvent, Config), Body = try_format_body(LogEvent, Config), Content = rabbit_basic:build_content(PBasic, Body), case mc_amqpl:message(Exchange, RoutingKey, Content) of {ok, Msg} -> - case rabbit_queue_type:publish_at_most_once(Exchange, Msg) of - ok -> ok; - {error, not_found} -> ok - end; + %% Publishing a message might involve a Erlang process, like a Ra + %% server process, to log something and call itself. We need to + %% publish the message asynchronously from a separate process and + %% ignore the fate of that publish, to not block an Erlang + %% process. + Pid ! {publish, Msg}, + ok; {error, _Reason} -> %% it would be good to log this error but can we? ok @@ -169,7 +185,8 @@ setup_proc( ok -> ?LOG_INFO( "Logging to ~ts ready", [rabbit_misc:rs(Exchange)], - #{domain => ?RMQLOG_DOMAIN_GLOBAL}); + #{domain => ?RMQLOG_DOMAIN_GLOBAL}), + loop(Config); error -> ?LOG_DEBUG( "Logging to ~ts not ready, trying again in ~b second(s)", @@ -182,6 +199,15 @@ setup_proc( end end. +loop(#{config := #{exchange := Exchange}} = Config) -> + receive + {publish, Msg} -> + _ = rabbit_queue_type:publish_at_most_once(Exchange, Msg), + loop(Config); + stop -> + ok + end. + declare_exchange(#{config := #{exchange := Exchange}}) -> try rabbit_exchange:declare( Exchange, topic, true, false, true, [], ?INTERNAL_USER) of diff --git a/deps/rabbit/test/logging_SUITE.erl b/deps/rabbit/test/logging_SUITE.erl index abd374ec01ee..c5aace9a7d48 100644 --- a/deps/rabbit/test/logging_SUITE.erl +++ b/deps/rabbit/test/logging_SUITE.erl @@ -53,6 +53,7 @@ logging_to_exchange_works/1, update_log_exchange_config/1, + use_exchange_logger_when_enabling_khepri_db/1, logging_to_syslog_works/1]). @@ -99,7 +100,8 @@ groups() -> {exchange_output, [], [logging_to_exchange_works, - update_log_exchange_config]}, + update_log_exchange_config, + use_exchange_logger_when_enabling_khepri_db]}, {syslog_output, [], [logging_to_syslog_works]} @@ -150,17 +152,27 @@ init_per_testcase(Testcase, Config) -> %% group will run in the context of that RabbitMQ node. exchange_output -> ExchProps = [{enabled, true}, - {level, debug}] , + {level, debug}], Config1 = rabbit_ct_helpers:set_config( Config, - [{rmq_nodes_count, 1}, - {rmq_nodename_suffix, Testcase}]), - Config2 = rabbit_ct_helpers:merge_app_env( - Config1, + [{rmq_nodename_suffix, Testcase}]), + Config2 = case Testcase of + use_exchange_logger_when_enabling_khepri_db -> + rabbit_ct_helpers:set_config( + Config1, + [{rmq_nodes_count, 3}, + {metadata_store, mnesia}]); + _ -> + rabbit_ct_helpers:set_config( + Config1, + [{rmq_nodes_count, 1}]) + end, + Config3 = rabbit_ct_helpers:merge_app_env( + Config2, {rabbit, [{log, [{exchange, ExchProps}, {file, [{level, debug}]}]}]}), rabbit_ct_helpers:run_steps( - Config2, + Config3, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()); @@ -1102,6 +1114,14 @@ update_log_exchange_config(Config) -> ?assertEqual(HandlerConfig1, HandlerConfig2), ok. +use_exchange_logger_when_enabling_khepri_db(Config) -> + ?assertNot(rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_feature_flags, is_enabled, [khepri_db])), + ?assertEqual( + ok, + rabbit_ct_broker_helpers:enable_feature_flag(Config, khepri_db)). + logging_to_syslog_works(Config) -> Context = default_context(Config), ok = application:set_env( From ad5f47d6e330f5e614f4c722db0b0c50f619daa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 23 Oct 2025 19:27:33 +0200 Subject: [PATCH 2/2] rabbit_logger_exchange_h: Register setup process [Why] It makes debugging easier, especially now that this process does more than the initial setup: it acts as the actual publisher of the log messages. (cherry picked from commit 73a41cc990344977c0f27b4581ab8695a5228dee) --- deps/rabbit/src/rabbit_logger_exchange_h.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_logger_exchange_h.erl b/deps/rabbit/src/rabbit_logger_exchange_h.erl index d9cca5ef4c5f..6f6fbed40447 100644 --- a/deps/rabbit/src/rabbit_logger_exchange_h.erl +++ b/deps/rabbit/src/rabbit_logger_exchange_h.erl @@ -180,7 +180,13 @@ wait_for_initial_pass(N) -> end. setup_proc( - #{config := #{exchange := Exchange}} = Config) -> + #{id := Id, + config := #{exchange := Exchange}} = Config) -> + %% We register this process using the logger handler ID. It makes + %% debugging convenient but it's not critical. That's why we catch any + %% exceptions and ignore the return value. + _ = catch erlang:register(Id, self()), + case declare_exchange(Config) of ok -> ?LOG_INFO(