|
13 | 13 | %% https://www.erlang.org/documentation/doc-7.0-rc1/erts-7.0/doc/html/time_correction.html |
14 | 14 |
|
15 | 15 | -module(rabbit_delayed_message). |
16 | | - |
| 16 | +-include_lib("rabbit_common/include/rabbit.hrl"). |
17 | 17 | -rabbit_boot_step({?MODULE, |
18 | 18 | [{description, "exchange delayed message mnesia setup"}, |
19 | 19 | {mfa, {?MODULE, setup_mnesia, []}}, |
@@ -106,6 +106,7 @@ messages_delayed(Exchange) -> |
106 | 106 | %%-------------------------------------------------------------------- |
107 | 107 |
|
108 | 108 | init([]) -> |
| 109 | + recover(), |
109 | 110 | {ok, #state{timer = not_set}}. |
110 | 111 |
|
111 | 112 | handle_call({delay_message, Exchange, Delivery, Delay}, |
@@ -216,3 +217,45 @@ append_to_atom(Atom, Append) when is_atom(Append) -> |
216 | 217 | append_to_atom(Atom, atom_to_list(Append)); |
217 | 218 | append_to_atom(Atom, Append) when is_list(Append) -> |
218 | 219 | list_to_atom(atom_to_list(Atom) ++ Append). |
| 220 | + |
| 221 | +recover() -> |
| 222 | + %% topology recovery has already happened, we have to recover state for any durable |
| 223 | + %% consistent hash exchanges since plugin activation was moved later in boot process |
| 224 | + %% starting with RabbitMQ 3.8.4 |
| 225 | + case list_exchanges() of |
| 226 | + {ok, Xs} -> |
| 227 | + rabbit_log:debug("Delayed message exchange: " |
| 228 | + "have ~b durable exchanges to recover", |
| 229 | + [length(Xs)]), |
| 230 | + [recover_exchange_and_bindings(X) || X <- lists:usort(Xs)]; |
| 231 | + {aborted, Reason} -> |
| 232 | + rabbit_log:error( |
| 233 | + "Delayed message exchange: " |
| 234 | + "failed to recover durable bindings of one of the exchanges, reason: ~p", |
| 235 | + [Reason]) |
| 236 | + end. |
| 237 | + |
| 238 | +list_exchanges() -> |
| 239 | + case mnesia:transaction( |
| 240 | + fun () -> |
| 241 | + mnesia:match_object( |
| 242 | + rabbit_exchange, #exchange{durable = true, |
| 243 | + type = 'x-delayed-message', |
| 244 | + _ = '_'}, write) |
| 245 | + end) of |
| 246 | + {atomic, Xs} -> |
| 247 | + {ok, Xs}; |
| 248 | + {aborted, Reason} -> |
| 249 | + {aborted, Reason} |
| 250 | + end. |
| 251 | + |
| 252 | +recover_exchange_and_bindings(#exchange{name = XName} = X) -> |
| 253 | + mnesia:transaction( |
| 254 | + fun () -> |
| 255 | + Bindings = rabbit_binding:list_for_source(XName), |
| 256 | + [rabbit_exchange_type_delayed_message:add_binding(transaction, X, B) |
| 257 | + || B <- lists:usort(Bindings)], |
| 258 | + rabbit_log:debug("Delayed message exchange: " |
| 259 | + "recovered bindings for ~s", |
| 260 | + [rabbit_misc:rs(XName)]) |
| 261 | + end). |
0 commit comments