Skip to content

Commit 2302eb9

Browse files
committed
Handle rabbit_amqqueue:internal_delete/3 failures in classic queues
The design of `rabbit_amqqueue_process` makes this change challenging. The old implementation of the handler of the `{delete,_,_,_}` command simply stopped the process and any cleanup was done in `gen_server2`'s `terminate` callback. This makes it impossible to pass any error back to the caller if the record can't be deleted from the metadata store before a timeout. The strategy taken here slightly mirrors an existing `{shutdown, missing_owner}` termination value which can be returned from `init_it2/3`. We pass the `ReplyTo` for the call with the state. We then optionally reply to this `ReplyTo` if it is set in `terminate_delete/4` with the result of `rabbit_amqqueue:internal_delete/3`. So deletion of a classic queue will terminate the process but may return an error to the caller if the record can't be removed from the metadata store before the timeout.
1 parent a7d099d commit 2302eb9

File tree

2 files changed

+30
-15
lines changed

2 files changed

+30
-15
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1662,6 +1662,11 @@ delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) whe
16621662
{error, {exit, _, _}} ->
16631663
%% delete()/delegate:invoke might return {error, {exit, _, _}}
16641664
{ok, 0};
1665+
{error, timeout} ->
1666+
rabbit_misc:protocol_error(
1667+
internal_error,
1668+
"The operation to delete the queue from the metadata store "
1669+
"timed out", []);
16651670
{ok, Count} ->
16661671
{ok, Count};
16671672
{protocol_error, Type, Reason, ReasonArgs} ->

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
297297
end, State);
298298
terminate({shutdown, missing_owner = Reason}, {{reply_to, From}, #q{q = Q} = State}) ->
299299
%% if the owner was missing then there will be no queue, so don't emit stats
300-
State1 = terminate_shutdown(terminate_delete(false, Reason, State), State),
300+
State1 = terminate_shutdown(terminate_delete(false, Reason, none, State), State),
301301
send_reply(From, {owner_died, Q}),
302302
State1;
303303
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
@@ -310,18 +310,22 @@ terminate(normal, State = #q{status = {terminated_by, auto_delete}}) ->
310310
%% thousands of queues. A optimisation introduced by server#1513
311311
%% needs to be reverted by this case, avoiding to guard the delete
312312
%% operation on `rabbit_durable_queue`
313-
terminate_shutdown(terminate_delete(true, auto_delete, State), State);
314-
terminate(normal, State) -> %% delete case
315-
terminate_shutdown(terminate_delete(true, normal, State), State);
313+
terminate_shutdown(terminate_delete(true, auto_delete, none, State), State);
314+
terminate(normal, {{reply_to, ReplyTo}, State}) -> %% delete case
315+
terminate_shutdown(terminate_delete(true, normal, ReplyTo, State), State);
316+
terminate(normal, State) ->
317+
terminate_shutdown(terminate_delete(true, normal, none, State), State);
316318
%% If we crashed don't try to clean up the BQS, probably best to leave it.
317319
terminate(_Reason, State = #q{q = Q}) ->
318320
terminate_shutdown(fun (BQS) ->
319321
Q2 = amqqueue:set_state(Q, crashed),
322+
%% When mnesia is removed this update can become
323+
%% an async Khepri command.
320324
_ = rabbit_amqqueue:store_queue(Q2),
321325
BQS
322326
end, State).
323327

324-
terminate_delete(EmitStats, Reason0,
328+
terminate_delete(EmitStats, Reason0, ReplyTo,
325329
State = #q{q = Q,
326330
backing_queue = BQ,
327331
status = Status}) ->
@@ -332,19 +336,24 @@ terminate_delete(EmitStats, Reason0,
332336
missing_owner -> normal;
333337
Any -> Any
334338
end,
339+
Len = BQ:len(BQS),
335340
BQS1 = BQ:delete_and_terminate(Reason, BQS),
336341
if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer,
337342
fun() -> emit_stats(State) end);
338343
true -> ok
339344
end,
340345
%% This try-catch block transforms throws to errors since throws are not
341-
%% logged.
342-
try
343-
%% don't care if the internal delete doesn't return 'ok'.
344-
rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0)
345-
catch
346-
{error, ReasonE} -> error(ReasonE)
347-
end,
346+
%% logged. When mnesia is removed this `try` can be removed: Khepri
347+
%% returns errors as error tuples instead.
348+
Reply = try rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0) of
349+
ok ->
350+
{ok, Len};
351+
{error, _} = Err ->
352+
Err
353+
catch
354+
{error, ReasonE} -> error(ReasonE)
355+
end,
356+
send_reply(ReplyTo, Reply),
348357
BQS1
349358
end.
350359

@@ -1396,15 +1405,16 @@ handle_call(stat, _From, State) ->
13961405
ensure_expiry_timer(State),
13971406
reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1);
13981407

1399-
handle_call({delete, IfUnused, IfEmpty, ActingUser}, _From,
1408+
handle_call({delete, IfUnused, IfEmpty, ActingUser}, From,
14001409
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
14011410
IsEmpty = BQ:is_empty(BQS),
14021411
IsUnused = is_unused(State),
14031412
if
14041413
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
14051414
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
1406-
true -> stop({ok, BQ:len(BQS)},
1407-
State#q{status = {terminated_by, ActingUser}})
1415+
true ->
1416+
State1 = State#q{status = {terminated_by, ActingUser}},
1417+
stop({{reply_to, From}, State1})
14081418
end;
14091419

14101420
handle_call(purge, _From, State = #q{backing_queue = BQ,

0 commit comments

Comments
 (0)