Skip to content

Commit aa778aa

Browse files
the-mikedavismergify[bot]
authored andcommitted
Handle rabbit_amqqueue:internal_delete/3 failures in classic queues
The design of `rabbit_amqqueue_process` makes this change challenging. The old implementation of the handler of the `{delete,_,_,_}` command simply stopped the process and any cleanup was done in `gen_server2`'s `terminate` callback. This makes it impossible to pass any error back to the caller if the record can't be deleted from the metadata store before a timeout. The strategy taken here slightly mirrors an existing `{shutdown, missing_owner}` termination value which can be returned from `init_it2/3`. We pass the `ReplyTo` for the call with the state. We then optionally reply to this `ReplyTo` if it is set in `terminate_delete/4` with the result of `rabbit_amqqueue:internal_delete/3`. So deletion of a classic queue will terminate the process but may return an error to the caller if the record can't be removed from the metadata store before the timeout. (cherry picked from commit 2302eb9)
1 parent 5531394 commit aa778aa

File tree

2 files changed

+30
-15
lines changed

2 files changed

+30
-15
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1669,6 +1669,11 @@ delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) whe
16691669
{error, {exit, _, _}} ->
16701670
%% delete()/delegate:invoke might return {error, {exit, _, _}}
16711671
{ok, 0};
1672+
{error, timeout} ->
1673+
rabbit_misc:protocol_error(
1674+
internal_error,
1675+
"The operation to delete the queue from the metadata store "
1676+
"timed out", []);
16721677
{ok, Count} ->
16731678
{ok, Count};
16741679
{protocol_error, Type, Reason, ReasonArgs} ->

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
297297
end, State);
298298
terminate({shutdown, missing_owner = Reason}, {{reply_to, From}, #q{q = Q} = State}) ->
299299
%% if the owner was missing then there will be no queue, so don't emit stats
300-
State1 = terminate_shutdown(terminate_delete(false, Reason, State), State),
300+
State1 = terminate_shutdown(terminate_delete(false, Reason, none, State), State),
301301
send_reply(From, {owner_died, Q}),
302302
State1;
303303
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
@@ -310,18 +310,22 @@ terminate(normal, State = #q{status = {terminated_by, auto_delete}}) ->
310310
%% thousands of queues. A optimisation introduced by server#1513
311311
%% needs to be reverted by this case, avoiding to guard the delete
312312
%% operation on `rabbit_durable_queue`
313-
terminate_shutdown(terminate_delete(true, auto_delete, State), State);
314-
terminate(normal, State) -> %% delete case
315-
terminate_shutdown(terminate_delete(true, normal, State), State);
313+
terminate_shutdown(terminate_delete(true, auto_delete, none, State), State);
314+
terminate(normal, {{reply_to, ReplyTo}, State}) -> %% delete case
315+
terminate_shutdown(terminate_delete(true, normal, ReplyTo, State), State);
316+
terminate(normal, State) ->
317+
terminate_shutdown(terminate_delete(true, normal, none, State), State);
316318
%% If we crashed don't try to clean up the BQS, probably best to leave it.
317319
terminate(_Reason, State = #q{q = Q}) ->
318320
terminate_shutdown(fun (BQS) ->
319321
Q2 = amqqueue:set_state(Q, crashed),
322+
%% When mnesia is removed this update can become
323+
%% an async Khepri command.
320324
_ = rabbit_amqqueue:store_queue(Q2),
321325
BQS
322326
end, State).
323327

324-
terminate_delete(EmitStats, Reason0,
328+
terminate_delete(EmitStats, Reason0, ReplyTo,
325329
State = #q{q = Q,
326330
backing_queue = BQ,
327331
status = Status}) ->
@@ -332,19 +336,24 @@ terminate_delete(EmitStats, Reason0,
332336
missing_owner -> normal;
333337
Any -> Any
334338
end,
339+
Len = BQ:len(BQS),
335340
BQS1 = BQ:delete_and_terminate(Reason, BQS),
336341
if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer,
337342
fun() -> emit_stats(State) end);
338343
true -> ok
339344
end,
340345
%% This try-catch block transforms throws to errors since throws are not
341-
%% logged.
342-
try
343-
%% don't care if the internal delete doesn't return 'ok'.
344-
rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0)
345-
catch
346-
{error, ReasonE} -> error(ReasonE)
347-
end,
346+
%% logged. When mnesia is removed this `try` can be removed: Khepri
347+
%% returns errors as error tuples instead.
348+
Reply = try rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0) of
349+
ok ->
350+
{ok, Len};
351+
{error, _} = Err ->
352+
Err
353+
catch
354+
{error, ReasonE} -> error(ReasonE)
355+
end,
356+
send_reply(ReplyTo, Reply),
348357
BQS1
349358
end.
350359

@@ -1396,15 +1405,16 @@ handle_call(stat, _From, State) ->
13961405
ensure_expiry_timer(State),
13971406
reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1);
13981407

1399-
handle_call({delete, IfUnused, IfEmpty, ActingUser}, _From,
1408+
handle_call({delete, IfUnused, IfEmpty, ActingUser}, From,
14001409
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
14011410
IsEmpty = BQ:is_empty(BQS),
14021411
IsUnused = is_unused(State),
14031412
if
14041413
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
14051414
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
1406-
true -> stop({ok, BQ:len(BQS)},
1407-
State#q{status = {terminated_by, ActingUser}})
1415+
true ->
1416+
State1 = State#q{status = {terminated_by, ActingUser}},
1417+
stop({{reply_to, From}, State1})
14081418
end;
14091419

14101420
handle_call(purge, _From, State = #q{backing_queue = BQ,

0 commit comments

Comments
 (0)