Skip to content

Commit e796f87

Browse files
committed
Switch from pg_local to pg
pg_local is based on the pg2 module, which got removed from Erlang/OTP years ago. It was replaced by the more efficient pg module, so let's use it directly. We use node-local scopes and single-item groups so that (de)registration is fast, but we can list all connections/channels (by listing all groups in the scope).
1 parent 194615d commit e796f87

File tree

6 files changed

+103
-24
lines changed

6 files changed

+103
-24
lines changed

deps/rabbit/src/rabbit.erl

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@
3131
base_product_version/0,
3232
motd_file/0,
3333
motd/0,
34-
pg_local_scope/1]).
34+
pg_local_scope/1,
35+
pg_scope_amqp091_channel/0,
36+
pg_scope_amqp091_connection/0,
37+
pg_scope_non_amqp_connection/0]).
3538
%% For CLI, testing and mgmt-agent.
3639
-export([set_log_level/1, log_locations/0, config_files/0]).
3740
-export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]).
@@ -40,7 +43,11 @@
4043
%% Boot steps.
4144
-export([update_cluster_tags/0, maybe_insert_default_data/0, boot_delegate/0, recover/0,
4245
pg_local_amqp_session/0,
43-
pg_local_amqp_connection/0, prevent_startup_if_node_was_reset/0]).
46+
pg_local_amqp_connection/0,
47+
pg_local_amqp091_channel/0,
48+
pg_local_amqp091_connection/0,
49+
pg_local_non_amqp_connection/0,
50+
prevent_startup_if_node_was_reset/0]).
4451

4552
-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
4653

@@ -292,11 +299,29 @@
292299
{enables, core_initialized}]}).
293300

294301
-rabbit_boot_step({pg_local_amqp_connection,
295-
[{description, "local-only pg scope for AMQP connections"},
302+
[{description, "local-only pg scope for AMQP 1.0 connections"},
296303
{mfa, {rabbit, pg_local_amqp_connection, []}},
297304
{requires, kernel_ready},
298305
{enables, core_initialized}]}).
299306

307+
-rabbit_boot_step({pg_local_amqp091_channel,
308+
[{description, "local-only pg scope for AMQP 0-9-1 channels"},
309+
{mfa, {rabbit, pg_local_amqp091_channel, []}},
310+
{requires, kernel_ready},
311+
{enables, core_initialized}]}).
312+
313+
-rabbit_boot_step({pg_local_amqp091_connection,
314+
[{description, "local-only pg scope for AMQP 0-9-1 connections"},
315+
{mfa, {rabbit, pg_local_amqp091_connection, []}},
316+
{requires, kernel_ready},
317+
{enables, core_initialized}]}).
318+
319+
-rabbit_boot_step({pg_local_non_amqp_connection,
320+
[{description, "local-only pg scope for non-AMQP connections"},
321+
{mfa, {rabbit, pg_local_non_amqp_connection, []}},
322+
{requires, kernel_ready},
323+
{enables, core_initialized}]}).
324+
300325
%%---------------------------------------------------------------------------
301326

302327
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -1154,9 +1179,33 @@ pg_local_amqp_connection() ->
11541179
PgScope = pg_local_scope(amqp_connection),
11551180
rabbit_sup:start_child(pg_amqp_connection, pg, [PgScope]).
11561181

1182+
pg_local_amqp091_channel() ->
1183+
PgScope = pg_local_scope(amqp091_channel),
1184+
persistent_term:put(pg_scope_amqp091_channel, PgScope),
1185+
rabbit_sup:start_child(pg_amqp091_channel, pg, [PgScope]).
1186+
1187+
pg_local_amqp091_connection() ->
1188+
PgScope = pg_local_scope(amqp091_connection),
1189+
persistent_term:put(pg_scope_amqp091_connection, PgScope),
1190+
rabbit_sup:start_child(pg_amqp091_connection, pg, [PgScope]).
1191+
1192+
pg_local_non_amqp_connection() ->
1193+
PgScope = pg_local_scope(non_amqp_connection),
1194+
persistent_term:put(pg_scope_non_amqp_connection, PgScope),
1195+
rabbit_sup:start_child(pg_non_amqp_connection, pg, [PgScope]).
1196+
11571197
pg_local_scope(Prefix) ->
11581198
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).
11591199

1200+
pg_scope_amqp091_channel() ->
1201+
persistent_term:get(pg_scope_amqp091_channel).
1202+
1203+
pg_scope_amqp091_connection() ->
1204+
persistent_term:get(pg_scope_amqp091_connection).
1205+
1206+
pg_scope_non_amqp_connection() ->
1207+
persistent_term:get(pg_scope_non_amqp_connection).
1208+
11601209
-spec update_cluster_tags() -> 'ok'.
11611210

11621211
update_cluster_tags() ->

deps/rabbit/src/rabbit_channel.erl

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,9 @@ send_command(Pid, Msg) ->
304304
%% Delete this function when feature flag rabbitmq_4.2.0 becomes required.
305305
-spec deliver_reply_local(pid(), binary(), mc:state()) -> ok.
306306
deliver_reply_local(Pid, Key, Message) ->
307-
case pg_local:in_group(rabbit_channels, Pid) of
308-
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
309-
false -> ok
307+
case pg:get_local_members(pg_scope(), Pid) of
308+
[] -> ok;
309+
_ -> gen_server2:cast(Pid, {deliver_reply, Key, Message})
310310
end.
311311

312312
-spec list() -> [pid()].
@@ -318,7 +318,9 @@ list() ->
318318
-spec list_local() -> [pid()].
319319

320320
list_local() ->
321-
pg_local:get_members(rabbit_channels).
321+
try pg:which_groups(pg_scope())
322+
catch error:badarg -> []
323+
end.
322324

323325
-spec info_keys() -> rabbit_types:info_keys().
324326

@@ -436,6 +438,10 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
436438

437439
%%---------------------------------------------------------------------------
438440

441+
-spec pg_scope() -> atom().
442+
pg_scope() ->
443+
rabbit:pg_scope_amqp091_channel().
444+
439445
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
440446
Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
441447
process_flag(trap_exit, true),
@@ -444,7 +450,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
444450

445451
?LG_PROCESS_TYPE(channel),
446452
?store_proc_name({ConnName, Channel}),
447-
ok = pg_local:join(rabbit_channels, self()),
453+
ok = pg:join(pg_scope(), self(), self()),
448454
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
449455
true -> flow;
450456
false -> noflow
@@ -783,7 +789,7 @@ terminate(_Reason,
783789
queue_states = QueueCtxs}) ->
784790
rabbit_queue_type:close(QueueCtxs),
785791
{_Res, _State1} = notify_queues(State),
786-
pg_local:leave(rabbit_channels, self()),
792+
pg:leave(pg_scope(), self(), self()),
787793
rabbit_event:if_enabled(State, #ch.stats_timer,
788794
fun() -> emit_stats(State) end),
789795
[delete_stats(Tag) || {Tag, _} <- get()],

deps/rabbit/src/rabbit_networking.erl

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -461,13 +461,18 @@ node_client_listeners(Node) ->
461461
end, Xs)
462462
end.
463463

464+
pg_scope_amqp091_connection() ->
465+
rabbit:pg_scope_amqp091_connection().
466+
464467
-spec register_connection(pid()) -> ok.
465468

466-
register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
469+
register_connection(Pid) ->
470+
pg:join(pg_scope_amqp091_connection(), Pid, Pid).
467471

468472
-spec unregister_connection(pid()) -> ok.
469473

470-
unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
474+
unregister_connection(Pid) ->
475+
pg:leave(pg_scope_amqp091_connection(), Pid, Pid).
471476

472477
-spec connections() -> [rabbit_types:connection()].
473478
connections() ->
@@ -476,17 +481,17 @@ connections() ->
476481

477482
-spec local_connections() -> [rabbit_types:connection()].
478483
local_connections() ->
479-
Amqp091Pids = pg_local:get_members(rabbit_connections),
484+
Amqp091Pids = pg:which_groups(pg_scope_amqp091_connection()),
480485
Amqp10Pids = rabbit_amqp1_0:list_local(),
481486
Amqp10Pids ++ Amqp091Pids.
482487

483488
-spec register_non_amqp_connection(pid()) -> ok.
484489

485-
register_non_amqp_connection(Pid) -> pg_local:join(rabbit_non_amqp_connections, Pid).
490+
register_non_amqp_connection(Pid) -> pg:join(rabbit:pg_scope_non_amqp_connection(), Pid, Pid).
486491

487492
-spec unregister_non_amqp_connection(pid()) -> ok.
488493

489-
unregister_non_amqp_connection(Pid) -> pg_local:leave(rabbit_non_amqp_connections, Pid).
494+
unregister_non_amqp_connection(Pid) -> pg:leave(rabbit:pg_scope_non_amqp_connection(), Pid, Pid).
490495

491496
-spec non_amqp_connections() -> [rabbit_types:connection()].
492497

@@ -496,7 +501,7 @@ non_amqp_connections() ->
496501

497502
-spec local_non_amqp_connections() -> [rabbit_types:connection()].
498503
local_non_amqp_connections() ->
499-
pg_local:get_members(rabbit_non_amqp_connections).
504+
pg:which_local_groups(rabbit:pg_scope_non_amqp_connection()).
500505

501506
-spec connection_info(rabbit_types:connection(), rabbit_types:info_keys()) ->
502507
rabbit_types:infos().

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ local_call(Pid, Request) ->
235235

236236
is_local(Pid) ->
237237
rabbit_amqp_session:is_local(Pid) orelse
238-
pg_local:in_group(rabbit_channels, Pid).
238+
pg:get_local_members(rabbit:pg_scope_amqp091_channel(), Pid) =/= [].
239239

240240
handle_event(QName, {deliver, Msg}, #?STATE{name = QName,
241241
ctag = Ctag,

deps/rabbit/test/proxy_protocol_SUITE.erl

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,25 @@ proxy_protocol_v2_local(Config) ->
110110
ok.
111111

112112
connection_name() ->
113-
?awaitMatch([_], pg_local:get_members(rabbit_connections), 30000),
114-
[Pid] = pg_local:get_members(rabbit_connections),
113+
Scope = rabbit:pg_scope_amqp091_connection(),
114+
GetGroups = fun() ->
115+
try pg:which_groups(Scope)
116+
catch error:badarg -> []
117+
end
118+
end,
119+
?awaitMatch([_], GetGroups(), 30000),
120+
[Pid] = GetGroups(),
115121
{dictionary, Dict} = process_info(Pid, dictionary),
116122
{process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict),
117123
ConnectionName.
118124

119125
wait_for_connection_close(Config) ->
120126
?awaitMatch(
121127
[],
122-
rabbit_ct_broker_helpers:rpc(
123-
Config, 0, pg_local, get_members, [rabbit_connnections]),
128+
begin
129+
Scope = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit, pg_scope_amqp091_connection, []),
130+
try rabbit_ct_broker_helpers:rpc(Config, 0, pg, which_groups, [Scope])
131+
catch error:badarg -> []
132+
end
133+
end,
124134
30000).

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2214,8 +2214,17 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
22142214
[NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []),
22152215
%% Check the channel state contains the state for the quorum queue on
22162216
%% channel 1 and 2
2217-
wait_for_cleanup(Server, NCh1, 0),
2218-
wait_for_cleanup(Server, NCh2, 1),
2217+
%% Note: pg:get_local_members doesn't guarantee order, so we need to identify
2218+
%% which channel has queue state
2219+
{ChWithoutState, ChWithState} = case length(rpc:call(Server,
2220+
rabbit_channel,
2221+
list_queue_states,
2222+
[NCh1])) of
2223+
0 -> {NCh1, NCh2};
2224+
1 -> {NCh2, NCh1}
2225+
end,
2226+
wait_for_cleanup(Server, ChWithoutState, 0),
2227+
wait_for_cleanup(Server, ChWithState, 1),
22192228
%% then delete the queue and wait for the process to terminate
22202229
?assertMatch(#'queue.delete_ok'{},
22212230
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
@@ -2225,8 +2234,8 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
22252234
[?SUPNAME]))
22262235
end, 30000),
22272236
%% Check that all queue states have been cleaned
2228-
wait_for_cleanup(Server, NCh2, 0),
2229-
wait_for_cleanup(Server, NCh1, 0).
2237+
wait_for_cleanup(Server, ChWithState, 0),
2238+
wait_for_cleanup(Server, ChWithoutState, 0).
22302239

22312240
cleanup_queue_state_on_channel_after_subscribe(Config) ->
22322241
%% Declare/delete the queue and publish in one channel, while consuming on a

0 commit comments

Comments
 (0)