Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 52 additions & 3 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
base_product_version/0,
motd_file/0,
motd/0,
pg_local_scope/1]).
pg_local_scope/1,
pg_scope_amqp091_channel/0,
pg_scope_amqp091_connection/0,
pg_scope_non_amqp_connection/0]).
%% For CLI, testing and mgmt-agent.
-export([set_log_level/1, log_locations/0, config_files/0]).
-export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]).
Expand All @@ -40,7 +43,11 @@
%% Boot steps.
-export([update_cluster_tags/0, maybe_insert_default_data/0, boot_delegate/0, recover/0,
pg_local_amqp_session/0,
pg_local_amqp_connection/0, prevent_startup_if_node_was_reset/0]).
pg_local_amqp_connection/0,
pg_local_amqp091_channel/0,
pg_local_amqp091_connection/0,
pg_local_non_amqp_connection/0,
prevent_startup_if_node_was_reset/0]).

-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).

Expand Down Expand Up @@ -292,11 +299,29 @@
{enables, core_initialized}]}).

-rabbit_boot_step({pg_local_amqp_connection,
[{description, "local-only pg scope for AMQP connections"},
[{description, "local-only pg scope for AMQP 1.0 connections"},
{mfa, {rabbit, pg_local_amqp_connection, []}},
{requires, kernel_ready},
{enables, core_initialized}]}).

-rabbit_boot_step({pg_local_amqp091_channel,
[{description, "local-only pg scope for AMQP 0-9-1 channels"},
{mfa, {rabbit, pg_local_amqp091_channel, []}},
{requires, kernel_ready},
{enables, core_initialized}]}).

-rabbit_boot_step({pg_local_amqp091_connection,
[{description, "local-only pg scope for AMQP 0-9-1 connections"},
{mfa, {rabbit, pg_local_amqp091_connection, []}},
{requires, kernel_ready},
{enables, core_initialized}]}).

-rabbit_boot_step({pg_local_non_amqp_connection,
[{description, "local-only pg scope for non-AMQP connections"},
{mfa, {rabbit, pg_local_non_amqp_connection, []}},
{requires, kernel_ready},
{enables, core_initialized}]}).

%%---------------------------------------------------------------------------

-include_lib("rabbit_common/include/rabbit.hrl").
Expand Down Expand Up @@ -1154,9 +1179,33 @@ pg_local_amqp_connection() ->
PgScope = pg_local_scope(amqp_connection),
rabbit_sup:start_child(pg_amqp_connection, pg, [PgScope]).

pg_local_amqp091_channel() ->
PgScope = pg_local_scope(amqp091_channel),
persistent_term:put(pg_scope_amqp091_channel, PgScope),
rabbit_sup:start_child(pg_amqp091_channel, pg, [PgScope]).

pg_local_amqp091_connection() ->
PgScope = pg_local_scope(amqp091_connection),
persistent_term:put(pg_scope_amqp091_connection, PgScope),
rabbit_sup:start_child(pg_amqp091_connection, pg, [PgScope]).

pg_local_non_amqp_connection() ->
PgScope = pg_local_scope(non_amqp_connection),
persistent_term:put(pg_scope_non_amqp_connection, PgScope),
rabbit_sup:start_child(pg_non_amqp_connection, pg, [PgScope]).

pg_local_scope(Prefix) ->
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).

pg_scope_amqp091_channel() ->
persistent_term:get(pg_scope_amqp091_channel).

pg_scope_amqp091_connection() ->
persistent_term:get(pg_scope_amqp091_connection).

pg_scope_non_amqp_connection() ->
persistent_term:get(pg_scope_non_amqp_connection).

-spec update_cluster_tags() -> 'ok'.

update_cluster_tags() ->
Expand Down
18 changes: 12 additions & 6 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,9 @@ send_command(Pid, Msg) ->
%% Delete this function when feature flag rabbitmq_4.2.0 becomes required.
-spec deliver_reply_local(pid(), binary(), mc:state()) -> ok.
deliver_reply_local(Pid, Key, Message) ->
case pg_local:in_group(rabbit_channels, Pid) of
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
false -> ok
case pg:get_local_members(pg_scope(), Pid) of
[] -> ok;
_ -> gen_server2:cast(Pid, {deliver_reply, Key, Message})
end.

-spec list() -> [pid()].
Expand All @@ -318,7 +318,9 @@ list() ->
-spec list_local() -> [pid()].

list_local() ->
pg_local:get_members(rabbit_channels).
try pg:which_groups(pg_scope())
catch error:badarg -> []
end.

-spec info_keys() -> rabbit_types:info_keys().

Expand Down Expand Up @@ -436,6 +438,10 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->

%%---------------------------------------------------------------------------

-spec pg_scope() -> atom().
pg_scope() ->
rabbit:pg_scope_amqp091_channel().

init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
process_flag(trap_exit, true),
Expand All @@ -444,7 +450,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,

?LG_PROCESS_TYPE(channel),
?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
ok = pg:join(pg_scope(), self(), self()),
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
true -> flow;
false -> noflow
Expand Down Expand Up @@ -783,7 +789,7 @@ terminate(_Reason,
queue_states = QueueCtxs}) ->
rabbit_queue_type:close(QueueCtxs),
{_Res, _State1} = notify_queues(State),
pg_local:leave(rabbit_channels, self()),
pg:leave(pg_scope(), self(), self()),
rabbit_event:if_enabled(State, #ch.stats_timer,
fun() -> emit_stats(State) end),
[delete_stats(Tag) || {Tag, _} <- get()],
Expand Down
17 changes: 11 additions & 6 deletions deps/rabbit/src/rabbit_networking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,18 @@ node_client_listeners(Node) ->
end, Xs)
end.

pg_scope_amqp091_connection() ->
rabbit:pg_scope_amqp091_connection().

-spec register_connection(pid()) -> ok.

register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
register_connection(Pid) ->
pg:join(pg_scope_amqp091_connection(), Pid, Pid).

-spec unregister_connection(pid()) -> ok.

unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
unregister_connection(Pid) ->
pg:leave(pg_scope_amqp091_connection(), Pid, Pid).

-spec connections() -> [rabbit_types:connection()].
connections() ->
Expand All @@ -476,17 +481,17 @@ connections() ->

-spec local_connections() -> [rabbit_types:connection()].
local_connections() ->
Amqp091Pids = pg_local:get_members(rabbit_connections),
Amqp091Pids = pg:which_groups(pg_scope_amqp091_connection()),
Amqp10Pids = rabbit_amqp1_0:list_local(),
Amqp10Pids ++ Amqp091Pids.

-spec register_non_amqp_connection(pid()) -> ok.

register_non_amqp_connection(Pid) -> pg_local:join(rabbit_non_amqp_connections, Pid).
register_non_amqp_connection(Pid) -> pg:join(rabbit:pg_scope_non_amqp_connection(), Pid, Pid).

-spec unregister_non_amqp_connection(pid()) -> ok.

unregister_non_amqp_connection(Pid) -> pg_local:leave(rabbit_non_amqp_connections, Pid).
unregister_non_amqp_connection(Pid) -> pg:leave(rabbit:pg_scope_non_amqp_connection(), Pid, Pid).

-spec non_amqp_connections() -> [rabbit_types:connection()].

Expand All @@ -496,7 +501,7 @@ non_amqp_connections() ->

-spec local_non_amqp_connections() -> [rabbit_types:connection()].
local_non_amqp_connections() ->
pg_local:get_members(rabbit_non_amqp_connections).
pg:which_local_groups(rabbit:pg_scope_non_amqp_connection()).

-spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) ->
rabbit_types:infos().
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_volatile_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ local_call(Pid, Request) ->

is_local(Pid) ->
rabbit_amqp_session:is_local(Pid) orelse
pg_local:in_group(rabbit_channels, Pid).
pg:get_local_members(rabbit:pg_scope_amqp091_channel(), Pid) =/= [].

handle_event(QName, {deliver, Msg}, #?STATE{name = QName,
ctag = Ctag,
Expand Down
18 changes: 14 additions & 4 deletions deps/rabbit/test/proxy_protocol_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,25 @@ proxy_protocol_v2_local(Config) ->
ok.

connection_name() ->
?awaitMatch([_], pg_local:get_members(rabbit_connections), 30000),
[Pid] = pg_local:get_members(rabbit_connections),
Scope = rabbit:pg_scope_amqp091_connection(),
GetGroups = fun() ->
try pg:which_groups(Scope)
catch error:badarg -> []
end
end,
?awaitMatch([_], GetGroups(), 30000),
[Pid] = GetGroups(),
{dictionary, Dict} = process_info(Pid, dictionary),
{process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict),
ConnectionName.

wait_for_connection_close(Config) ->
?awaitMatch(
[],
rabbit_ct_broker_helpers:rpc(
Config, 0, pg_local, get_members, [rabbit_connnections]),
begin
Scope = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit, pg_scope_amqp091_connection, []),
try rabbit_ct_broker_helpers:rpc(Config, 0, pg, which_groups, [Scope])
catch error:badarg -> []
end
end,
30000).
17 changes: 13 additions & 4 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2214,8 +2214,17 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
[NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []),
%% Check the channel state contains the state for the quorum queue on
%% channel 1 and 2
wait_for_cleanup(Server, NCh1, 0),
wait_for_cleanup(Server, NCh2, 1),
%% Note: pg:get_local_members doesn't guarantee order, so we need to identify
%% which channel has queue state
{ChWithoutState, ChWithState} = case length(rpc:call(Server,
rabbit_channel,
list_queue_states,
[NCh1])) of
0 -> {NCh1, NCh2};
1 -> {NCh2, NCh1}
end,
wait_for_cleanup(Server, ChWithoutState, 0),
wait_for_cleanup(Server, ChWithState, 1),
%% then delete the queue and wait for the process to terminate
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
Expand All @@ -2225,8 +2234,8 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
[?SUPNAME]))
end, 30000),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh2, 0),
wait_for_cleanup(Server, NCh1, 0).
wait_for_cleanup(Server, ChWithState, 0),
wait_for_cleanup(Server, ChWithoutState, 0).

cleanup_queue_state_on_channel_after_subscribe(Config) ->
%% Declare/delete the queue and publish in one channel, while consuming on a
Expand Down
9 changes: 8 additions & 1 deletion deps/rabbit_common/src/rabbit_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,14 @@ otp_release() ->
end.

platform_and_version() ->
string:join(["Erlang/OTP", otp_release()], " ").
case persistent_term:get(platform_and_version, undefined) of
undefined ->
PV = string:join(["Erlang/OTP", otp_release()], " "),
persistent_term:put(platform_and_version, PV),
PV;
PV ->
PV
end.

otp_system_version() ->
string:strip(erlang:system_info(system_version), both, $\n).
Expand Down
15 changes: 11 additions & 4 deletions deps/rabbit_common/src/rabbit_net.erl
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,19 @@ tcp_host(IPAddress) ->
end.

hostname() ->
{ok, Hostname} = inet:gethostname(),
case inet:gethostbyname(Hostname) of
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> Hostname
case persistent_term:get(platform_and_version, undefined) of
undefined ->
{ok, Hostname} = inet:gethostname(),
H = case inet:gethostbyname(Hostname) of
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> Hostname
end,
persistent_term:put(platform_and_version, H);
Hostname ->
Hostname
end.


format_nic_attribute({Key, undefined}) ->
{Key, undefined};
format_nic_attribute({Key = flags, List}) when is_list(List) ->
Expand Down
17 changes: 1 addition & 16 deletions deps/rabbitmq_stream/src/rabbit_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
host/0,
tls_host/0,
port/0,
tls_port/0,
kill_connection/1]).
tls_port/0]).
-export([stop/1]).
-export([emit_connection_info_local/3,
emit_connection_info_all/4,
Expand Down Expand Up @@ -132,20 +131,6 @@ tls_port_from_listener() ->
stop(_State) ->
ok.

kill_connection(ConnectionName) ->
ConnectionNameBin = rabbit_data_coercion:to_binary(ConnectionName),
lists:foreach(fun(ConnectionPid) ->
ConnectionPid ! {infos, self()},
receive
{ConnectionPid,
#{<<"connection_name">> := ConnectionNameBin}} ->
exit(ConnectionPid, kill);
{ConnectionPid, _ClientProperties} -> ok
after 1000 -> ok
end
end,
pg_local:get_members(rabbit_stream_connections)).

emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
Pids =
[spawn_link(Node,
Expand Down
4 changes: 0 additions & 4 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,6 @@ transition_to_opened(Transport,
Configuration,
NewConnection,
NewConnectionState) ->
% TODO remove registration to rabbit_stream_connections
% just meant to be able to close the connection remotely
% should be possible once the connections are available in ctl list_connections
pg_local:join(rabbit_stream_connections, self()),
Connection1 =
rabbit_event:init_stats_timer(NewConnection,
#stream_connection.stats_timer),
Expand Down
Loading
Loading