Skip to content

Commit 21975a5

Browse files
committed
mirrored_supervisor: Restore child ID format
[Why] The format was changed to be compatible with Khepri paths. However, this ID is used in in-memory states here and there as well. So changing its format makes upgrades complicated because the code has to handle both the old and new formats possibly used by the mirrored supervisor already running on other nodes. [How] Instead, this patch converts the ID (in its old format) to something compatible with a Khepri path only when we need to build a Khepri path. This relies on the fact that the `Group` is a module and we can call it to let it convert the opaque ID to a Khepri path. While here, improve the type specs to document that a group is always a module name and to document what a child ID can be.
1 parent 9d0e2ae commit 21975a5

File tree

7 files changed

+41
-28
lines changed

7 files changed

+41
-28
lines changed

deps/rabbit/src/mirrored_supervisor.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,11 @@
137137
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
138138
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
139139

140-
-type group_name() :: any().
140+
-type group_name() :: module().
141+
-type child_id() :: term(). %% supervisor:child_id() is not exported.
142+
143+
-export_type([group_name/0,
144+
child_id/0]).
141145

142146
-spec start_link(GroupName, Module, Args) -> startlink_ret() when
143147
GroupName :: group_name(),

deps/rabbit/src/rabbit_db_msup.erl

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@ table_definitions() ->
7373
%% -------------------------------------------------------------------
7474

7575
-spec create_or_update(Group, Overall, Delegate, ChildSpec, Id) -> Ret when
76-
Group :: any(),
76+
Group :: mirrored_supervisor:group_name(),
7777
Overall :: pid(),
7878
Delegate :: pid() | undefined,
7979
ChildSpec :: supervisor2:child_spec(),
80-
Id :: {any(), any()},
80+
Id :: mirrored_supervisor:child_id(),
8181
Ret :: start | undefined | pid().
8282

8383
create_or_update(Group, Overall, Delegate, ChildSpec, Id) ->
@@ -129,8 +129,8 @@ write_in_mnesia(Group, Overall, ChildSpec, Id) ->
129129
ok = mnesia:write(?TABLE, S, write),
130130
ChildSpec.
131131

132-
create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} = Id) ->
133-
Path = khepri_mirrored_supervisor_path(Group, SimpleId),
132+
create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
133+
Path = khepri_mirrored_supervisor_path(Group, Id),
134134
S = #mirrored_sup_childspec{key = {Group, Id},
135135
mirroring_pid = Overall,
136136
childspec = ChildSpec},
@@ -169,8 +169,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, {SimpleId, _} =
169169
%% -------------------------------------------------------------------
170170

171171
-spec delete(Group, Id) -> ok when
172-
Group :: any(),
173-
Id :: any().
172+
Group :: mirrored_supervisor:group_name(),
173+
Id :: mirrored_supervisor:child_id().
174174

175175
delete(Group, Id) ->
176176
rabbit_khepri:handle_fallback(
@@ -184,16 +184,16 @@ delete_in_mnesia(Group, Id) ->
184184
ok = mnesia:delete({?TABLE, {Group, Id}})
185185
end).
186186

187-
delete_in_khepri(Group, {SimpleId, _}) ->
188-
ok = rabbit_khepri:delete(khepri_mirrored_supervisor_path(Group, SimpleId)).
187+
delete_in_khepri(Group, Id) ->
188+
ok = rabbit_khepri:delete(khepri_mirrored_supervisor_path(Group, Id)).
189189

190190
%% -------------------------------------------------------------------
191191
%% find_mirror().
192192
%% -------------------------------------------------------------------
193193

194194
-spec find_mirror(Group, Id) -> Ret when
195-
Group :: any(),
196-
Id :: any(),
195+
Group :: mirrored_supervisor:group_name(),
196+
Id :: mirrored_supervisor:child_id(),
197197
Ret :: {ok, pid()} | {error, not_found}.
198198

199199
find_mirror(Group, Id) ->
@@ -214,8 +214,8 @@ find_mirror_in_mnesia(Group, Id) ->
214214
_ -> {error, not_found}
215215
end.
216216

217-
find_mirror_in_khepri(Group, {SimpleId, _}) ->
218-
case rabbit_khepri:get(khepri_mirrored_supervisor_path(Group, SimpleId)) of
217+
find_mirror_in_khepri(Group, Id) ->
218+
case rabbit_khepri:get(khepri_mirrored_supervisor_path(Group, Id)) of
219219
{ok, #mirrored_sup_childspec{mirroring_pid = Pid}} ->
220220
{ok, Pid};
221221
_ ->
@@ -269,7 +269,7 @@ update_all_in_khepri(Overall, OldOverall) ->
269269
%% -------------------------------------------------------------------
270270

271271
-spec delete_all(Group) -> ok when
272-
Group :: any().
272+
Group :: mirrored_supervisor:group_name().
273273

274274
delete_all(Group) ->
275275
rabbit_khepri:handle_fallback(
@@ -324,5 +324,9 @@ clear_in_khepri() ->
324324
khepri_mirrored_supervisor_path() ->
325325
[?MODULE, mirrored_supervisor_childspec].
326326

327+
khepri_mirrored_supervisor_path(Group, Id)
328+
when is_atom(Id) orelse is_binary(Id) ->
329+
[?MODULE, mirrored_supervisor_childspec, Group, Id];
327330
khepri_mirrored_supervisor_path(Group, Id) ->
328-
[?MODULE, mirrored_supervisor_childspec, Group] ++ Id.
331+
IdPath = Group:id_to_khepri_path(Id),
332+
[?MODULE, mirrored_supervisor_childspec, Group] ++ IdPath.

deps/rabbit/src/rabbit_db_msup_m2k_converter.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ init_copy_to_khepri(_StoreId, _MigrationId, Tables) ->
4646
%% @private
4747

4848
copy_to_khepri(mirrored_sup_childspec = Table,
49-
#mirrored_sup_childspec{key = {Group, {SimpleId, _}} = Key} = Record,
49+
#mirrored_sup_childspec{key = {Group, Id} = Key} = Record,
5050
State) ->
5151
?LOG_DEBUG(
5252
"Mnesia->Khepri data copy: [~0p] key: ~0p",
5353
[Table, Key],
5454
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
55-
Path = rabbit_db_msup:khepri_mirrored_supervisor_path(Group, SimpleId),
55+
Path = rabbit_db_msup:khepri_mirrored_supervisor_path(Group, Id),
5656
rabbit_db_m2k_converter:with_correlation_id(
5757
fun(CorrId) ->
5858
Extra = #{async => CorrId},

deps/rabbit/test/mirrored_supervisor_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ childspec(Id) ->
331331
{id(Id), {?SERVER, start_link, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}.
332332

333333
id(Id) ->
334-
{[Id], Id}.
334+
Id.
335335

336336
pid_of(Id) ->
337337
{received, Pid, ping} = call(Id, ping),

deps/rabbit/test/rabbit_db_msup_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ create_or_update1(_Config) ->
8787
passed.
8888

8989
id(Id) ->
90-
{[Id], Id}.
90+
Id.
9191

9292
find_mirror(Config) ->
9393
passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, find_mirror1, [Config]).

deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
-export([start_link/0, start_child/1, adjust/1, stop_child/1]).
1919
-export([init/1]).
20+
-export([id_to_khepri_path/1]).
2021

2122
%%----------------------------------------------------------------------------
2223

@@ -49,12 +50,12 @@ start_child(X) ->
4950

5051
adjust({clear_upstream, VHost, UpstreamName}) ->
5152
_ = [rabbit_federation_link_sup:adjust(Pid, X, {clear_upstream, UpstreamName}) ||
52-
{{_, #exchange{name = Name} = X}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR),
53+
{#exchange{name = Name} = X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR),
5354
Name#resource.virtual_host == VHost],
5455
ok;
5556
adjust(Reason) ->
5657
_ = [rabbit_federation_link_sup:adjust(Pid, X, Reason) ||
57-
{{_, X}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)],
58+
{X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)],
5859
ok.
5960

6061
stop_child(X) ->
@@ -77,7 +78,9 @@ init([]) ->
7778
%% See comment in rabbit_federation_queue_link_sup_sup:id/1
7879
id(X = #exchange{policy = Policy}) ->
7980
X1 = rabbit_exchange:immutable(X),
80-
{simple_id(X), X1#exchange{policy = Policy}}.
81+
X2 = X1#exchange{policy = Policy},
82+
X2.
8183

82-
simple_id(#exchange{name = #resource{virtual_host = VHost, name = Name}}) ->
84+
id_to_khepri_path(
85+
#exchange{name = #resource{virtual_host = VHost, name = Name}}) ->
8386
[exchange, VHost, Name].

deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
-export([start_link/0, start_child/1, adjust/1, stop_child/1]).
2020
-export([init/1]).
21+
-export([id_to_khepri_path/1]).
2122

2223
%%----------------------------------------------------------------------------
2324

@@ -51,12 +52,12 @@ start_child(Q) ->
5152

5253
adjust({clear_upstream, VHost, UpstreamName}) ->
5354
_ = [rabbit_federation_link_sup:adjust(Pid, Q, {clear_upstream, UpstreamName}) ||
54-
{{_, Q}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR),
55+
{Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR),
5556
?amqqueue_vhost_equals(Q, VHost)],
5657
ok;
5758
adjust(Reason) ->
5859
_ = [rabbit_federation_link_sup:adjust(Pid, Q, Reason) ||
59-
{{_, Q}, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)],
60+
{Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)],
6061
ok.
6162

6263
stop_child(Q) ->
@@ -88,8 +89,9 @@ init([]) ->
8889
id(Q) when ?is_amqqueue(Q) ->
8990
Policy = amqqueue:get_policy(Q),
9091
Q1 = amqqueue:set_immutable(Q),
91-
{simple_id(Q), amqqueue:set_policy(Q1, Policy)}.
92+
Q2 = amqqueue:set_policy(Q1, Policy),
93+
Q2.
9294

93-
simple_id(Q) when ?is_amqqueue(Q) ->
94-
#resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q),
95+
id_to_khepri_path(Id) when ?is_amqqueue(Id) ->
96+
#resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Id),
9597
[queue, VHost, Name].

0 commit comments

Comments
 (0)