Skip to content

Commit f952433

Browse files
author
Matthias Radestock
committed
merge bug22221 into v1_7
2 parents 81dfdbf + 8eb6286 commit f952433

File tree

2 files changed

+29
-22
lines changed

2 files changed

+29
-22
lines changed

src/rabbit_channel.erl

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -526,24 +526,24 @@ handle_method(#'basic.recover'{requeue = false},
526526
_, State = #ch{ transaction_id = none,
527527
writer_pid = WriterPid,
528528
unacked_message_q = UAMQ }) ->
529-
lists:foreach(
530-
fun ({_DeliveryTag, none, _Msg}) ->
531-
%% Was sent as a basic.get_ok. Don't redeliver
532-
%% it. FIXME: appropriate?
533-
ok;
534-
({DeliveryTag, ConsumerTag,
535-
{QName, QPid, MsgId, _Redelivered, Message}}) ->
536-
%% Was sent as a proper consumer delivery. Resend it as
537-
%% before.
538-
%%
539-
%% FIXME: What should happen if the consumer's been
540-
%% cancelled since?
541-
%%
542-
%% FIXME: should we allocate a fresh DeliveryTag?
543-
ok = internal_deliver(
529+
ok = rabbit_misc:queue_fold(
530+
fun ({_DeliveryTag, none, _Msg}, ok) ->
531+
%% Was sent as a basic.get_ok. Don't redeliver
532+
%% it. FIXME: appropriate?
533+
ok;
534+
({DeliveryTag, ConsumerTag,
535+
{QName, QPid, MsgId, _Redelivered, Message}}, ok) ->
536+
%% Was sent as a proper consumer delivery. Resend
537+
%% it as before.
538+
%%
539+
%% FIXME: What should happen if the consumer's been
540+
%% cancelled since?
541+
%%
542+
%% FIXME: should we allocate a fresh DeliveryTag?
543+
internal_deliver(
544544
WriterPid, false, ConsumerTag, DeliveryTag,
545545
{QName, QPid, MsgId, true, Message})
546-
end, queue:to_list(UAMQ)),
546+
end, ok, UAMQ),
547547
%% No answer required, apparently!
548548
{noreply, State};
549549

@@ -872,7 +872,7 @@ rollback_and_notify(State) ->
872872
notify_queues(internal_rollback(State)).
873873

874874
fold_per_queue(F, Acc0, UAQ) ->
875-
D = lists:foldl(
875+
D = rabbit_misc:queue_fold(
876876
fun ({_DTag, _CTag,
877877
{_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
878878
%% dict:append would be simpler and avoid the
@@ -883,7 +883,7 @@ fold_per_queue(F, Acc0, UAQ) ->
883883
fun (MsgIds) -> [MsgId | MsgIds] end,
884884
[MsgId],
885885
D)
886-
end, dict:new(), queue:to_list(UAQ)),
886+
end, dict:new(), UAQ),
887887
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
888888
Acc0, D).
889889

@@ -912,9 +912,9 @@ consumer_queues(Consumers) ->
912912
notify_limiter(undefined, _Acked) ->
913913
ok;
914914
notify_limiter(LimiterPid, Acked) ->
915-
case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
916-
({_, _, _}, Acc) -> Acc + 1
917-
end, 0, queue:to_list(Acked)) of
915+
case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
916+
({_, _, _}, Acc) -> Acc + 1
917+
end, 0, Acked) of
918918
0 -> ok;
919919
Count -> rabbit_limiter:ack(LimiterPid, Count)
920920
end.

src/rabbit_misc.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
-export([append_file/2, ensure_parent_dirs_exist/1]).
5656
-export([format_stderr/2]).
5757
-export([start_applications/1, stop_applications/1]).
58-
-export([unfold/2, ceil/1]).
58+
-export([unfold/2, ceil/1, queue_fold/3]).
5959

6060
-import(mnesia).
6161
-import(lists).
@@ -126,6 +126,7 @@
126126
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
127127
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
128128
-spec(ceil/1 :: (number()) -> number()).
129+
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
129130

130131
-endif.
131132

@@ -489,3 +490,9 @@ ceil(N) ->
489490
0 -> N;
490491
_ -> 1 + T
491492
end.
493+
494+
queue_fold(Fun, Init, Q) ->
495+
case queue:out(Q) of
496+
{empty, _Q} -> Init;
497+
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
498+
end.

0 commit comments

Comments
 (0)