Skip to content

Commit ab66dd0

Browse files
committed
worker_pool
1 parent ad3fdfa commit ab66dd0

File tree

1 file changed

+22
-18
lines changed

1 file changed

+22
-18
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -354,17 +354,11 @@ terminate_delete(EmitStats, Reason0, ReplyTo,
354354
end,
355355
case ReplyTo of
356356
_ when ReplyTo =/= none ->
357-
%% This try-catch block transforms throws to errors since
358-
%% throws are not logged. When mnesia is removed this `try`
359-
%% can be removed: Khepri returns errors as error tuples
360-
%% instead.
361-
Reply = try rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0) of
357+
Reply = case delete_queue_record(Q, ActingUser, Reason0) of
362358
ok ->
363359
{ok, Len};
364360
{error, _} = Err ->
365361
Err
366-
catch
367-
{error, ReasonE} -> error(ReasonE)
368362
end,
369363
send_reply(ReplyTo, Reply);
370364
none ->
@@ -385,28 +379,38 @@ terminate_delete(EmitStats, Reason0, ReplyTo,
385379
%% with the `node_up' message from `rabbit_node_monitor'. This
386380
%% calls `rabbit_amqqueue:on_node_up/1' which will delete any
387381
%% transient queues.
388-
_ = infinite_internal_delete(Q, ActingUser, Reason0),
382+
%%
383+
%% This infinite delete attempts loop is executed in a
384+
%% separate process to let this queue process exits. This way,
385+
%% connections will be notified that the queue process is
386+
%% gone.
387+
worker_pool:submit_async(
388+
fun() ->
389+
_ = infinite_internal_delete(Q, ActingUser, Reason0)
390+
end),
389391
ok
390392
end,
391393
BQS1
392394
end.
393395

394396
infinite_internal_delete(Q, ActingUser, Reason) ->
395-
%% This try-catch block transforms throws to errors since throws are not
396-
%% logged. When mnesia is removed this `try` can be removed: Khepri returns
397-
%% errors as error tuples instead.
398-
Ret = try
399-
rabbit_amqqueue:internal_delete(Q, ActingUser, Reason)
400-
catch
401-
{error, ReasonE} -> error(ReasonE)
402-
end,
403-
case Ret of
397+
case delete_queue_record(Q, ActingUser, Reason) of
404398
{error, timeout} ->
405399
infinite_internal_delete(Q, ActingUser, Reason);
406-
_ ->
400+
Ret ->
407401
Ret
408402
end.
409403

404+
delete_queue_record(Q, ActingUser, Reason) ->
405+
%% This try-catch block transforms throws to errors since throws are not
406+
%% logged. When mnesia is removed this `try` can be removed: Khepri returns
407+
%% errors as error tuples instead.
408+
try
409+
rabbit_amqqueue:internal_delete(Q, ActingUser, Reason)
410+
catch
411+
{error, ReasonE} -> error(ReasonE)
412+
end.
413+
410414
terminated_by({terminated_by, auto_delete}) ->
411415
?INTERNAL_USER;
412416
terminated_by({terminated_by, ActingUser}) ->

0 commit comments

Comments
 (0)