Skip to content

Commit 9b3d19f

Browse files
author
Francesco Mazzoli
committed
merge emile qc changes
2 parents ec82e9b + 72c55d7 commit 9b3d19f

File tree

4 files changed

+21
-19
lines changed

4 files changed

+21
-19
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -696,14 +696,18 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
696696
drop_expired_messages(State = #q{ttl = undefined}) ->
697697
State;
698698
drop_expired_messages(State = #q{backing_queue_state = BQS,
699-
backing_queue = BQ}) ->
699+
backing_queue = BQ }) ->
700700
Now = now_micros(),
701-
{Msgs, BQS1} =
702-
BQ:dropwhile(
703-
fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
704-
true, BQS),
705701
DLXFun = dead_letter_fun(expired, State),
706-
lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
702+
ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
703+
case DLXFun of
704+
undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS),
705+
BQS1;
706+
_ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS),
707+
lists:foreach(
708+
fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
709+
BQS1
710+
end,
707711
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
708712

709713
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -720,9 +724,7 @@ ensure_ttl_timer(State) ->
720724
State.
721725

722726
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
723-
fun(_Msg, _AckTag) ->
724-
ok
725-
end;
727+
undefined;
726728
dead_letter_fun(Reason, _State) ->
727729
fun(Msg, AckTag) ->
728730
gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})

src/rabbit_backing_queue.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@
120120

121121
%% Drop messages from the head of the queue while the supplied predicate returns
122122
%% true. Also accepts a boolean parameter that determines whether the messages
123-
%% are to be acked or not. If they are, the messages and the acktags are
124-
%% returned.
123+
%% necessitate an ack or not. If they do, the function returns a list of
124+
%% messages with the respective acktags.
125125
-callback dropwhile(msg_pred(), true, state())
126126
-> {[{rabbit_types:basic_message(), ack()}], state()};
127127
(msg_pred(), false, state())

src/rabbit_mirror_queue_master.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,13 +168,13 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
168168
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
169169
ack_msg_id = AM1 })}.
170170

171-
dropwhile(Pred, AckMsgs,
171+
dropwhile(Pred, AckRequired,
172172
State = #state{gm = GM,
173173
backing_queue = BQ,
174174
set_delivered = SetDelivered,
175175
backing_queue_state = BQS }) ->
176176
Len = BQ:len(BQS),
177-
{Msgs, BQS1} = BQ:dropwhile(Pred, AckMsgs, BQS),
177+
{Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
178178
Len1 = BQ:len(BQS1),
179179
ok = gm:broadcast(GM, {set_length, Len1}),
180180
Dropped = Len - Len1,

src/rabbit_variable_queue.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -578,24 +578,24 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
578578
confirmed = gb_sets:new() }}
579579
end.
580580

581-
dropwhile(Pred, AckMsgs, State) ->
582-
End = fun(S) when AckMsgs -> {[], S};
583-
(S) -> {undefined, S}
581+
dropwhile(Pred, AckRequired, State) ->
582+
End = fun(S) when AckRequired -> {[], S};
583+
(S) -> {undefined, S}
584584
end,
585585
case queue_out(State) of
586586
{empty, State1} ->
587587
End(a(State1));
588588
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
589-
case {Pred(MsgProps), AckMsgs} of
589+
case {Pred(MsgProps), AckRequired} of
590590
{true, true} ->
591591
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
592592
{{Msg, _, AckTag, _}, State3} =
593593
internal_fetch(true, MsgStatus1, State2),
594-
{L, State4} = dropwhile(Pred, AckMsgs, State3),
594+
{L, State4} = dropwhile(Pred, AckRequired, State3),
595595
{[{Msg, AckTag} | L], State4};
596596
{true, false} ->
597597
{_, State2} = internal_fetch(false, MsgStatus, State1),
598-
dropwhile(Pred, AckMsgs, State2);
598+
dropwhile(Pred, AckRequired, State2);
599599
{false, _} ->
600600
End(a(in_r(MsgStatus, State1)))
601601
end

0 commit comments

Comments
 (0)