Skip to content

Commit 72e4e72

Browse files
committed
Switch from pg_local to pg
pg_local was based on the pg2 module, which got removed from Erlang/OTP years ago. It was replaced by the more efficient pg module, so let's use it directly. We use the node() as the local scope, so we maintain isolation between nodes and avoid local groups from being synchronized between nodes.
1 parent a1f9571 commit 72e4e72

File tree

12 files changed

+45
-378
lines changed

12 files changed

+45
-378
lines changed

deps/rabbit/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_on
277277
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_size_limit metadata_store_migration
278278
PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor
279279

280-
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
280+
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
281281
PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit rabbit_fifo_dlx_integration rabbit_fifo_int
282282
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
283283
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue

deps/rabbit/src/pg_local.erl

Lines changed: 0 additions & 251 deletions
This file was deleted.

deps/rabbit/src/rabbit.erl

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040
%% Boot steps.
4141
-export([update_cluster_tags/0, maybe_insert_default_data/0, boot_delegate/0, recover/0,
4242
pg_local_amqp_session/0,
43-
pg_local_amqp_connection/0, prevent_startup_if_node_was_reset/0]).
43+
pg_local_amqp_connection/0,
44+
start_pg_local/0,
45+
prevent_startup_if_node_was_reset/0]).
4446

4547
-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
4648

@@ -157,6 +159,12 @@
157159
[{description, "kernel ready"},
158160
{requires, external_infrastructure}]}).
159161

162+
-rabbit_boot_step({pg_local,
163+
[{description, "local-only pg scope"},
164+
{mfa, {rabbit, start_pg_local, []}},
165+
{requires, kernel_ready},
166+
{enables, core_initialized}]}).
167+
160168
-rabbit_boot_step({guid_generator,
161169
[{description, "guid generator"},
162170
{mfa, {rabbit_sup, start_restartable_child,
@@ -1146,6 +1154,9 @@ boot_delegate() ->
11461154
recover() ->
11471155
ok = rabbit_vhost:recover().
11481156

1157+
start_pg_local() ->
1158+
rabbit_sup:start_child(pg_local_scope, pg, [node()]).
1159+
11491160
pg_local_amqp_session() ->
11501161
PgScope = pg_local_scope(amqp_session),
11511162
rabbit_sup:start_child(pg_amqp_session, pg, [PgScope]).

deps/rabbit/src/rabbit_channel.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ send_command(Pid, Msg) ->
304304
%% Delete this function when feature flag rabbitmq_4.2.0 becomes required.
305305
-spec deliver_reply_local(pid(), binary(), mc:state()) -> ok.
306306
deliver_reply_local(Pid, Key, Message) ->
307-
case pg_local:in_group(rabbit_channels, Pid) of
307+
case lists:member(Pid, pg:get_local_members(node(), rabbit_channels)) of
308308
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
309309
false -> ok
310310
end.
@@ -318,7 +318,7 @@ list() ->
318318
-spec list_local() -> [pid()].
319319

320320
list_local() ->
321-
pg_local:get_members(rabbit_channels).
321+
pg:get_local_members(node(), rabbit_channels).
322322

323323
-spec info_keys() -> rabbit_types:info_keys().
324324

@@ -444,7 +444,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
444444

445445
?LG_PROCESS_TYPE(channel),
446446
?store_proc_name({ConnName, Channel}),
447-
ok = pg_local:join(rabbit_channels, self()),
447+
ok = pg:join(node(), rabbit_channels, self()),
448448
Flow = case rabbit_misc:get_env(rabbit, classic_queue_flow_control, true) of
449449
true -> flow;
450450
false -> noflow
@@ -783,7 +783,7 @@ terminate(_Reason,
783783
queue_states = QueueCtxs}) ->
784784
rabbit_queue_type:close(QueueCtxs),
785785
{_Res, _State1} = notify_queues(State),
786-
pg_local:leave(rabbit_channels, self()),
786+
pg:leave(node(), rabbit_channels, self()),
787787
rabbit_event:if_enabled(State, #ch.stats_timer,
788788
fun() -> emit_stats(State) end),
789789
[delete_stats(Tag) || {Tag, _} <- get()],

deps/rabbit/src/rabbit_direct.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ force_event_refresh(Ref) ->
4141
-spec list_local() -> [pid()].
4242

4343
list_local() ->
44-
pg_local:get_members(rabbit_direct).
44+
pg:get_local_members(node(), rabbit_direct).
4545

4646
-spec list() -> [pid()].
4747

@@ -204,7 +204,7 @@ connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) ->
204204
AuthzContext = proplists:get_value(variable_map, Infos, #{}),
205205
try rabbit_access_control:check_vhost_access(User, VHost,
206206
{ip, PeerHost}, AuthzContext) of
207-
ok -> ok = pg_local:join(rabbit_direct, Pid),
207+
ok -> ok = pg:join(node(), rabbit_direct, Pid),
208208
rabbit_core_metrics:connection_created(Pid, Infos),
209209
rabbit_event:notify(connection_created, Infos),
210210
_ = rabbit_alarm:register(
@@ -252,7 +252,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol,
252252
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
253253

254254
disconnect(Pid, Infos) ->
255-
pg_local:leave(rabbit_direct, Pid),
255+
pg:leave(node(), rabbit_direct, Pid),
256256
rabbit_core_metrics:connection_closed(Pid),
257257
rabbit_event:notify(connection_closed, Infos).
258258

0 commit comments

Comments
 (0)