Skip to content

Commit 32f7796

Browse files
author
Tim Watson
committed
Backport (Merge of bug25097; messages expire too late in absence of consumers)
1 parent 80e817f commit 32f7796

File tree

6 files changed

+57
-39
lines changed

6 files changed

+57
-39
lines changed

src/rabbit_amqqueue_process.erl

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
msg_id_to_channel,
4848
ttl,
4949
ttl_timer_ref,
50+
ttl_timer_expiry,
5051
senders,
5152
publish_seqno,
5253
unconfirmed,
@@ -559,7 +560,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
559560
maybe_record_confirm_message(Confirm, State1),
560561
Props = message_properties(Confirm, State2),
561562
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
562-
ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
563+
ensure_ttl_timer(Props#message_properties.expiry,
564+
State2#q{backing_queue_state = BQS1})
563565
end.
564566

565567
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -699,28 +701,42 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
699701
backing_queue = BQ }) ->
700702
Now = now_micros(),
701703
DLXFun = dead_letter_fun(expired, State),
702-
ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
703-
case DLXFun of
704-
undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS),
705-
BQS1;
706-
_ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS),
707-
lists:foreach(
708-
fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
709-
BQS1
710-
end,
711-
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
712-
713-
ensure_ttl_timer(State = #q{backing_queue = BQ,
714-
backing_queue_state = BQS,
715-
ttl = TTL,
716-
ttl_timer_ref = undefined})
717-
when TTL =/= undefined ->
718-
case BQ:is_empty(BQS) of
719-
true -> State;
720-
false -> TRef = erlang:send_after(TTL, self(), drop_expired),
721-
State#q{ttl_timer_ref = TRef}
704+
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
705+
{Props, BQS1} =
706+
case DLXFun of
707+
undefined ->
708+
{Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
709+
{Next, BQS2};
710+
_ ->
711+
{Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
712+
lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
713+
Msgs),
714+
{Next, BQS2}
715+
end,
716+
ensure_ttl_timer(case Props of
717+
undefined -> undefined;
718+
#message_properties{expiry = Exp} -> Exp
719+
end, State#q{backing_queue_state = BQS1}).
720+
721+
ensure_ttl_timer(undefined, State) ->
722+
State;
723+
ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) ->
724+
State;
725+
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
726+
After = (case Expiry - now_micros() of
727+
V when V > 0 -> V + 999; %% always fire later
728+
_ -> 0
729+
end) div 1000,
730+
TRef = erlang:send_after(After, self(), drop_expired),
731+
State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
732+
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
733+
ttl_timer_expiry = TExpiry})
734+
when Expiry + 1000 < TExpiry ->
735+
case erlang:cancel_timer(TRef) of
736+
false -> State;
737+
_ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
722738
end;
723-
ensure_ttl_timer(State) ->
739+
ensure_ttl_timer(_Expiry, State) ->
724740
State.
725741

726742
ack_if_no_dlx(AckTags, State = #q{dlx = undefined,

src/rabbit_backing_queue.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,11 @@
123123
%% necessitate an ack or not. If they do, the function returns a list of
124124
%% messages with the respective acktags.
125125
-callback dropwhile(msg_pred(), true, state())
126-
-> {[{rabbit_types:basic_message(), ack()}], state()};
126+
-> {rabbit_types:message_properties() | undefined,
127+
[{rabbit_types:basic_message(), ack()}], state()};
127128
(msg_pred(), false, state())
128-
-> {undefined, state()}.
129+
-> {rabbit_types:message_properties() | undefined,
130+
undefined, state()}.
129131

130132
%% Produce the next message.
131133
-callback fetch(true, state()) -> {fetch_result(ack()), state()};

src/rabbit_backing_queue_qc.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
268268
S#state{bqstate = BQ1};
269269

270270
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
271-
BQ = {call, erlang, element, [2, Res]},
271+
BQ = {call, erlang, element, [3, Res]},
272272
#state{messages = Messages} = S,
273273
Msgs1 = drop_messages(Messages),
274274
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};

src/rabbit_mirror_queue_master.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,13 @@ dropwhile(Pred, AckRequired,
185185
set_delivered = SetDelivered,
186186
backing_queue_state = BQS }) ->
187187
Len = BQ:len(BQS),
188-
{Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
188+
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
189189
Len1 = BQ:len(BQS1),
190190
ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
191191
Dropped = Len - Len1,
192192
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
193-
{Msgs, State #state { backing_queue_state = BQS1,
194-
set_delivered = SetDelivered1 } }.
193+
{Next, Msgs, State #state { backing_queue_state = BQS1,
194+
set_delivered = SetDelivered1 } }.
195195

196196
drain_confirmed(State = #state { backing_queue = BQ,
197197
backing_queue_state = BQS,

src/rabbit_tests.erl

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2404,10 +2404,10 @@ test_dropwhile(VQ0) ->
24042404
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
24052405

24062406
%% drop the first 5 messages
2407-
{undefined, VQ2} = rabbit_variable_queue:dropwhile(
2408-
fun(#message_properties { expiry = Expiry }) ->
2409-
Expiry =< 5
2410-
end, false, VQ1),
2407+
{_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
2408+
fun(#message_properties { expiry = Expiry }) ->
2409+
Expiry =< 5
2410+
end, false, VQ1),
24112411

24122412
%% fetch five now
24132413
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2424,11 +2424,11 @@ test_dropwhile(VQ0) ->
24242424
test_dropwhile_varying_ram_duration(VQ0) ->
24252425
VQ1 = variable_queue_publish(false, 1, VQ0),
24262426
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
2427-
{undefined, VQ3} = rabbit_variable_queue:dropwhile(
2428-
fun(_) -> false end, false, VQ2),
2427+
{_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
2428+
fun(_) -> false end, false, VQ2),
24292429
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
24302430
VQ5 = variable_queue_publish(false, 1, VQ4),
2431-
{undefined, VQ6} =
2431+
{_, undefined, VQ6} =
24322432
rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
24332433
VQ6.
24342434

src/rabbit_variable_queue.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -581,12 +581,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
581581
dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
582582

583583
dropwhile(Pred, AckRequired, State, Msgs) ->
584-
End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
585-
(S) -> {undefined, S}
584+
End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
585+
(Next, S) -> {Next, undefined, S}
586586
end,
587587
case queue_out(State) of
588588
{empty, State1} ->
589-
End(a(State1));
589+
End(undefined, a(State1));
590590
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
591591
case {Pred(MsgProps), AckRequired} of
592592
{true, true} ->
@@ -598,7 +598,7 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
598598
{_, State2} = internal_fetch(false, MsgStatus, State1),
599599
dropwhile(Pred, AckRequired, State2, undefined);
600600
{false, _} ->
601-
End(a(in_r(MsgStatus, State1)))
601+
End(MsgProps, a(in_r(MsgStatus, State1)))
602602
end
603603
end.
604604

0 commit comments

Comments
 (0)