Skip to content

Commit 3783388

Browse files
authored
Merge pull request #9284 from rabbitmq/collection-of-changes-from-khepri-wip
Collection of changes from the Khepri integration project
2 parents 497ed9b + f11ff52 commit 3783388

25 files changed

+279
-521
lines changed

deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_conf.erl

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,31 @@ set_default_config() ->
136136
]}
137137
| OsirisConfig
138138
],
139-
apply_erlang_term_based_config(Config).
139+
%% Don't apply any defaults for values already set in the init flags.
140+
Config1 = filter_init_args(Config),
141+
apply_erlang_term_based_config(Config1).
142+
143+
filter_init_args(Config) ->
144+
lists:filtermap(
145+
fun({App, Vars}) ->
146+
case init:get_argument(App) of
147+
{ok, Args} ->
148+
Keys = [rabbit_data_coercion:to_atom(KeyName) ||
149+
[KeyName, _ValueExpr] <- Args],
150+
Vars1 = lists:filter(
151+
fun({Key, _Value}) ->
152+
not lists:member(Key, Keys)
153+
end, Vars),
154+
case Vars1 of
155+
[] ->
156+
false;
157+
_ ->
158+
{true, {App, Vars1}}
159+
end;
160+
error ->
161+
true
162+
end
163+
end, Config).
140164

141165
osiris_log(debug, Fmt, Args) ->
142166
?LOG_DEBUG(Fmt, Args,

deps/rabbit/src/amqqueue.erl

Lines changed: 58 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -88,33 +88,41 @@
8888
(T =:= classic orelse T =:= ?amqqueue_v1_type)).
8989

9090
-record(amqqueue, {
91-
name :: rabbit_amqqueue:name() | '_', %% immutable
92-
durable :: boolean() | '_', %% immutable
93-
auto_delete :: boolean() | '_', %% immutable
94-
exclusive_owner = none :: pid() | none | '_', %% immutable
95-
arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable
96-
pid :: pid() | ra_server_id() | none | '_', %% durable (just so we
97-
%% know home node)
98-
slave_pids = [] :: [pid()] | none | '_', %% transient
99-
sync_slave_pids = [] :: [pid()] | none| '_',%% transient
100-
recoverable_slaves = [] :: [atom()] | none | '_', %% durable
101-
policy :: proplists:proplist() |
102-
none | undefined | '_', %% durable, implicit update as
103-
%% above
104-
operator_policy :: proplists:proplist() |
105-
none | undefined | '_', %% durable, implicit
106-
%% update as above
107-
gm_pids = [] :: [{pid(), pid()}] | none | '_', %% transient
108-
decorators :: [atom()] | none | undefined | '_', %% transient,
109-
%% recalculated
110-
%% as above
111-
state = live :: atom() | none | '_', %% durable (have we crashed?)
112-
policy_version = 0 :: non_neg_integer() | '_',
113-
slave_pids_pending_shutdown = [] :: [pid()] | '_',
114-
vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index
115-
options = #{} :: map() | '_',
116-
type = ?amqqueue_v1_type :: module() | '_',
117-
type_state = #{} :: map() | '_'
91+
%% immutable
92+
name :: rabbit_amqqueue:name() | ets:match_pattern(),
93+
%% immutable
94+
durable :: boolean() | ets:match_pattern(),
95+
%% immutable
96+
auto_delete :: boolean() | ets:match_pattern(),
97+
%% immutable
98+
exclusive_owner = none :: pid() | none | ets:match_pattern(),
99+
%% immutable
100+
arguments = [] :: rabbit_framing:amqp_table() | ets:match_pattern(),
101+
%% durable (just so we know home node)
102+
pid :: pid() | ra_server_id() | none | ets:match_pattern(),
103+
%% transient
104+
slave_pids = [] :: [pid()] | none | ets:match_pattern(),
105+
%% transient
106+
sync_slave_pids = [] :: [pid()] | none| ets:match_pattern(),
107+
%% durable
108+
recoverable_slaves = [] :: [atom()] | none | ets:match_pattern(),
109+
%% durable, implicit update as above
110+
policy :: proplists:proplist() | none | undefined | ets:match_pattern(),
111+
%% durable, implicit update as above
112+
operator_policy :: proplists:proplist() | none | undefined | ets:match_pattern(),
113+
%% transient
114+
gm_pids = [] :: [{pid(), pid()}] | none | ets:match_pattern(),
115+
%% transient, recalculated as above
116+
decorators :: [atom()] | none | undefined | ets:match_pattern(),
117+
%% durable (have we crashed?)
118+
state = live :: atom() | none | ets:match_pattern(),
119+
policy_version = 0 :: non_neg_integer() | ets:match_pattern(),
120+
slave_pids_pending_shutdown = [] :: [pid()] | ets:match_pattern(),
121+
%% secondary index
122+
vhost :: rabbit_types:vhost() | undefined | ets:match_pattern(),
123+
options = #{} :: map() | ets:match_pattern(),
124+
type = ?amqqueue_v1_type :: module() | ets:match_pattern(),
125+
type_state = #{} :: map() | ets:match_pattern()
118126
}).
119127

120128
-type amqqueue() :: amqqueue_v2().
@@ -146,26 +154,26 @@
146154

147155
-type amqqueue_pattern() :: amqqueue_v2_pattern().
148156
-type amqqueue_v2_pattern() :: #amqqueue{
149-
name :: rabbit_amqqueue:name() | '_',
150-
durable :: '_',
151-
auto_delete :: '_',
152-
exclusive_owner :: '_',
153-
arguments :: '_',
154-
pid :: '_',
155-
slave_pids :: '_',
156-
sync_slave_pids :: '_',
157-
recoverable_slaves :: '_',
158-
policy :: '_',
159-
operator_policy :: '_',
160-
gm_pids :: '_',
161-
decorators :: '_',
162-
state :: '_',
163-
policy_version :: '_',
164-
slave_pids_pending_shutdown :: '_',
165-
vhost :: '_',
166-
options :: '_',
167-
type :: atom() | '_',
168-
type_state :: '_'
157+
name :: rabbit_amqqueue:name() | ets:match_pattern(),
158+
durable :: ets:match_pattern(),
159+
auto_delete :: ets:match_pattern(),
160+
exclusive_owner :: ets:match_pattern(),
161+
arguments :: ets:match_pattern(),
162+
pid :: ets:match_pattern(),
163+
slave_pids :: ets:match_pattern(),
164+
sync_slave_pids :: ets:match_pattern(),
165+
recoverable_slaves :: ets:match_pattern(),
166+
policy :: ets:match_pattern(),
167+
operator_policy :: ets:match_pattern(),
168+
gm_pids :: ets:match_pattern(),
169+
decorators :: ets:match_pattern(),
170+
state :: ets:match_pattern(),
171+
policy_version :: ets:match_pattern(),
172+
slave_pids_pending_shutdown :: ets:match_pattern(),
173+
vhost :: ets:match_pattern(),
174+
options :: ets:match_pattern(),
175+
type :: atom() | ets:match_pattern(),
176+
type_state :: ets:match_pattern()
169177
}.
170178

171179
-export_type([amqqueue/0,
@@ -570,7 +578,9 @@ field_vhost() ->
570578
pattern_match_all() ->
571579
#amqqueue{_ = '_'}.
572580

573-
-spec pattern_match_on_name(rabbit_amqqueue:name()) -> amqqueue_pattern().
581+
-spec pattern_match_on_name(Name) -> Pattern when
582+
Name :: rabbit_amqqueue:name() | ets:match_pattern(),
583+
Pattern :: amqqueue_pattern().
574584

575585
pattern_match_on_name(Name) ->
576586
#amqqueue{name = Name, _ = '_'}.

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1757,6 +1757,9 @@ forget_all_durable(Node) ->
17571757
forget_node_for_queue(_DeadNode, Q)
17581758
when ?amqqueue_is_quorum(Q) ->
17591759
ok;
1760+
forget_node_for_queue(_DeadNode, Q)
1761+
when ?amqqueue_is_stream(Q) ->
1762+
ok;
17601763
forget_node_for_queue(DeadNode, Q) ->
17611764
RS = amqqueue:get_recoverable_slaves(Q),
17621765
forget_node_for_queue(DeadNode, RS, Q).

deps/rabbit/src/rabbit_db.erl

Lines changed: 17 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
is_virgin_node/0, is_virgin_node/1,
2020
dir/0,
2121
ensure_dir_exists/0]).
22-
-export([run/1]).
2322

2423
%% Exported to be used by various rabbit_db_* modules
2524
-export([
@@ -49,8 +48,7 @@ init() ->
4948

5049
pre_init(IsVirgin),
5150

52-
Ret = run(
53-
#{mnesia => fun init_using_mnesia/0}),
51+
Ret = init_using_mnesia(),
5452
case Ret of
5553
ok ->
5654
?LOG_DEBUG(
@@ -91,9 +89,8 @@ init_using_mnesia() ->
9189
%% @doc Resets the database and the node.
9290

9391
reset() ->
94-
Ret = run(
95-
#{mnesia => fun reset_using_mnesia/0}),
96-
post_reset(Ret).
92+
ok = reset_using_mnesia(),
93+
post_reset().
9794

9895
reset_using_mnesia() ->
9996
?LOG_INFO(
@@ -106,9 +103,8 @@ reset_using_mnesia() ->
106103
%% @doc Resets the database and the node.
107104

108105
force_reset() ->
109-
Ret = run(
110-
#{mnesia => fun force_reset_using_mnesia/0}),
111-
post_reset(Ret).
106+
ok = force_reset_using_mnesia(),
107+
post_reset().
112108

113109
force_reset_using_mnesia() ->
114110
?LOG_DEBUG(
@@ -124,20 +120,17 @@ force_reset_using_mnesia() ->
124120
%% state, like if critical members are MIA.
125121

126122
force_load_on_next_boot() ->
127-
run(
128-
#{mnesia => fun force_load_on_next_boot_using_mnesia/0}).
123+
force_load_on_next_boot_using_mnesia().
129124

130125
force_load_on_next_boot_using_mnesia() ->
131126
?LOG_DEBUG(
132127
"DB: force load on next boot (using Mnesia)",
133128
#{domain => ?RMQLOG_DOMAIN_DB}),
134129
rabbit_mnesia:force_load_next_boot().
135130

136-
post_reset(ok) ->
131+
post_reset() ->
137132
rabbit_feature_flags:reset_registry(),
138-
ok;
139-
post_reset({error, _} = Error) ->
140-
Error.
133+
ok.
141134

142135
%% -------------------------------------------------------------------
143136
%% is_virgin_node().
@@ -152,8 +145,7 @@ post_reset({error, _} = Error) ->
152145
%% @see is_virgin_node/1.
153146

154147
is_virgin_node() ->
155-
run(
156-
#{mnesia => fun is_virgin_node_using_mnesia/0}).
148+
is_virgin_node_using_mnesia().
157149

158150
is_virgin_node_using_mnesia() ->
159151
rabbit_mnesia:is_virgin_node().
@@ -186,8 +178,7 @@ is_virgin_node(Node) when is_atom(Node) ->
186178
%% @returns the directory path.
187179

188180
dir() ->
189-
run(
190-
#{mnesia => fun mnesia_dir/0}).
181+
mnesia_dir().
191182

192183
mnesia_dir() ->
193184
rabbit_mnesia:dir().
@@ -207,26 +198,15 @@ ensure_dir_exists() ->
207198
end.
208199

209200
%% -------------------------------------------------------------------
210-
%% run().
201+
%% list_in_mnesia().
211202
%% -------------------------------------------------------------------
212203

213-
-spec run(Funs) -> Ret when
214-
Funs :: #{mnesia := Fun},
215-
Fun :: fun(() -> Ret),
216-
Ret :: any().
217-
%% @doc Runs the function corresponding to the used database engine.
218-
%%
219-
%% @returns the return value of `Fun'.
220-
221-
run(Funs)
222-
when is_map(Funs) andalso is_map_key(mnesia, Funs) ->
223-
#{mnesia := MnesiaFun} = Funs,
224-
run_using_mnesia(MnesiaFun).
225-
226-
run_using_mnesia(Fun) ->
227-
Fun().
204+
-spec list_in_mnesia(Table, Match) -> Objects when
205+
Table :: atom(),
206+
Match :: any(),
207+
Objects :: [any()].
228208

229209
list_in_mnesia(Table, Match) ->
230-
%% Not dirty_match_object since that would not be transactional when used in a
231-
%% tx context
210+
%% Not dirty_match_object since that would not be transactional when used
211+
%% in a tx context
232212
mnesia:async_dirty(fun () -> mnesia:match_object(Table, Match, read) end).

0 commit comments

Comments
 (0)