Skip to content

Commit 9cbda0e

Browse files
Merge pull request #12320 from rabbitmq/mergify/bp/v4.0.x/pr-12317
2 parents 83d0ea2 + a782a77 commit 9cbda0e

8 files changed

+56
-31
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1519,7 +1519,14 @@ notify_policy_changed(Q) when ?is_amqqueue(Q) ->
15191519

15201520
consumers(Q) when ?amqqueue_is_classic(Q) ->
15211521
QPid = amqqueue:get_pid(Q),
1522-
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]});
1522+
try
1523+
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]})
1524+
catch
1525+
exit:_ ->
1526+
%% The queue process exited during the call.
1527+
%% Note that `delegate:invoke/2' catches errors but not exits.
1528+
[]
1529+
end;
15231530
consumers(Q) when ?amqqueue_is_quorum(Q) ->
15241531
QPid = amqqueue:get_pid(Q),
15251532
case ra:local_query(QPid, fun rabbit_fifo:query_consumers/1) of
@@ -1615,17 +1622,23 @@ delete_immediately_by_resource(Resources) ->
16151622
-spec delete
16161623
(amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) ->
16171624
qlen() |
1625+
rabbit_types:error(timeout) |
16181626
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
16191627
(amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) ->
1620-
qlen() | rabbit_types:error('in_use') |
1628+
qlen() |
1629+
rabbit_types:error('in_use') |
1630+
rabbit_types:error(timeout) |
16211631
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
16221632
(amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) ->
1623-
qlen() | rabbit_types:error('not_empty') |
1633+
qlen() |
1634+
rabbit_types:error('not_empty') |
1635+
rabbit_types:error(timeout) |
16241636
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
16251637
(amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) ->
16261638
qlen() |
16271639
rabbit_types:error('in_use') |
16281640
rabbit_types:error('not_empty') |
1641+
rabbit_types:error(timeout) |
16291642
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
16301643
delete(Q, IfUnused, IfEmpty, ActingUser) ->
16311644
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser).

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,8 @@ delete(Q0, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_classic(Q0) ->
171171
case delete_crashed_internal(Q, ActingUser) of
172172
ok ->
173173
{ok, 0};
174-
{error, timeout} ->
175-
{error, protocol_error,
176-
"The operation to delete ~ts from the "
177-
"metadata store timed out",
178-
[rabbit_misc:rs(QName)]}
174+
{error, timeout} = Err ->
175+
Err
179176
end
180177
end
181178
end;

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ declare(Q0, Node) ->
383383
boolean(), rabbit_types:username()) ->
384384
rabbit_types:ok(non_neg_integer()) |
385385
rabbit_types:error(in_use | not_empty) |
386+
rabbit_types:error(timeout) |
386387
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
387388
delete(Q, IfUnused, IfEmpty, ActingUser) ->
388389
Mod = amqqueue:get_type(Q),

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -823,10 +823,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
823823
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
824824
?RPC_TIMEOUT),
825825
{ok, ReadyMsgs};
826-
{error, timeout} ->
827-
{protocol_error, internal_error,
828-
"The operation to delete ~ts from the metadata store "
829-
"timed out", [rabbit_misc:rs(QName)]}
826+
{error, timeout} = Err ->
827+
Err
830828
end;
831829
{error, {no_more_servers_to_try, Errs}} ->
832830
case lists:all(fun({{error, noproc}, _}) -> true;
@@ -849,10 +847,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
849847
case delete_queue_data(Q, ActingUser) of
850848
ok ->
851849
{ok, ReadyMsgs};
852-
{error, timeout} ->
853-
{protocol_error, internal_error,
854-
"The operation to delete queue ~ts from the metadata "
855-
"store timed out", [rabbit_misc:rs(QName)]}
850+
{error, timeout} = Err ->
851+
Err
856852
end
857853
end.
858854

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,12 @@ delete_stream(Q, ActingUser)
189189
#{name := StreamId} = amqqueue:get_type_state(Q),
190190
case process_command({delete_stream, StreamId, #{}}) of
191191
{ok, ok, _} ->
192-
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
193-
{ok, {ok, 0}};
192+
case rabbit_amqqueue:internal_delete(Q, ActingUser) of
193+
ok ->
194+
{ok, {ok, 0}};
195+
{error, timeout} = Err ->
196+
Err
197+
end;
194198
Err ->
195199
Err
196200
end.

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,14 @@ create_stream(Q0) ->
214214
-spec delete(amqqueue:amqqueue(), boolean(),
215215
boolean(), rabbit_types:username()) ->
216216
rabbit_types:ok(non_neg_integer()) |
217-
rabbit_types:error(in_use | not_empty).
217+
rabbit_types:error(timeout) |
218+
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
218219
delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
219220
case rabbit_stream_coordinator:delete_stream(Q, ActingUser) of
220221
{ok, Reply} ->
221222
Reply;
223+
{error, timeout} = Err ->
224+
Err;
222225
Error ->
223226
{protocol_error, internal_error, "Cannot delete ~ts on node '~ts': ~255p ",
224227
[rabbit_misc:rs(amqqueue:get_name(Q)), node(), Error]}

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -769,7 +769,9 @@ handle_clean_start(_, QoS, State = #state{cfg = #cfg{clean_start = true}}) ->
769769
ok ->
770770
{ok, SessPresent, State};
771771
{error, access_refused} ->
772-
{error, ?RC_NOT_AUTHORIZED}
772+
{error, ?RC_NOT_AUTHORIZED};
773+
{error, _Reason} ->
774+
{error, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}
773775
end
774776
end;
775777
handle_clean_start(SessPresent, QoS,
@@ -991,7 +993,8 @@ clear_will_msg(#state{cfg = #cfg{vhost = Vhost,
991993
QName = #resource{virtual_host = Vhost, kind = queue, name = QNameBin},
992994
case delete_queue(QName, State) of
993995
ok -> ok;
994-
{error, access_refused} -> {error, ?RC_NOT_AUTHORIZED}
996+
{error, access_refused} -> {error, ?RC_NOT_AUTHORIZED};
997+
{error, _Reason} -> {error, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}
995998
end.
996999

9971000
make_will_msg(#mqtt_packet_connect{will_flag = false}) ->
@@ -1323,8 +1326,10 @@ ensure_queue(QoS, State) ->
13231326
case delete_queue(QName, State) of
13241327
ok ->
13251328
create_queue(QoS, State);
1326-
{error, access_refused} = E ->
1327-
E
1329+
{error, _} = Err ->
1330+
Err;
1331+
{protocol_error, _, _, _} = Err ->
1332+
{error, Err}
13281333
end;
13291334
{error, not_found} ->
13301335
create_queue(QoS, State)
@@ -1829,7 +1834,10 @@ maybe_delete_mqtt_qos0_queue(_) ->
18291834
ok.
18301835

18311836
-spec delete_queue(rabbit_amqqueue:name(), state()) ->
1832-
ok | {error, access_refused}.
1837+
ok |
1838+
{error, access_refused} |
1839+
{error, timeout} |
1840+
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
18331841
delete_queue(QName,
18341842
#state{auth_state = #auth_state{
18351843
user = User = #user{username = Username},
@@ -1841,8 +1849,12 @@ delete_queue(QName,
18411849
fun (Q) ->
18421850
case check_resource_access(User, QName, configure, AuthzCtx) of
18431851
ok ->
1844-
{ok, _N} = rabbit_queue_type:delete(Q, false, false, Username),
1845-
ok;
1852+
case rabbit_queue_type:delete(Q, false, false, Username) of
1853+
{ok, _} ->
1854+
ok;
1855+
Err ->
1856+
Err
1857+
end;
18461858
Err ->
18471859
Err
18481860
end

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,17 +109,16 @@ declare(Q0, _Node) ->
109109
boolean(),
110110
boolean(),
111111
rabbit_types:username()) ->
112-
rabbit_types:ok(non_neg_integer()).
112+
rabbit_types:ok(non_neg_integer()) |
113+
rabbit_types:error(timeout).
113114
delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
114115
QName = amqqueue:get_name(Q),
115116
log_delete(QName, amqqueue:get_exclusive_owner(Q)),
116117
case rabbit_amqqueue:internal_delete(Q, ActingUser) of
117118
ok ->
118119
{ok, 0};
119-
{error, timeout} ->
120-
{protocol_error, internal_error,
121-
"The operation to delete ~ts from the metadata store timed "
122-
"out", [rabbit_misc:rs(QName)]}
120+
{error, timeout} = Err ->
121+
Err
123122
end.
124123

125124
-spec deliver([{amqqueue:amqqueue(), stateless}],

0 commit comments

Comments
 (0)