
Commit ae9a521

Author: Tim Watson
Commit message: merging heads
2 parents: 4282f62 + 32f7796
8 files changed: +113 / -75 lines

src/rabbit_amqqueue_process.erl

Lines changed: 38 additions & 22 deletions

@@ -47,6 +47,7 @@
             msg_id_to_channel,
             ttl,
             ttl_timer_ref,
+            ttl_timer_expiry,
             senders,
             publish_seqno,
             unconfirmed,
@@ -559,7 +560,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
             maybe_record_confirm_message(Confirm, State1),
             Props = message_properties(Confirm, State2),
             BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
-            ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+            ensure_ttl_timer(Props#message_properties.expiry,
+                             State2#q{backing_queue_state = BQS1})
     end.

 requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -699,28 +701,42 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
                                  backing_queue       = BQ }) ->
     Now = now_micros(),
     DLXFun = dead_letter_fun(expired, State),
-    ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
-    case DLXFun of
-        undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS),
-                     BQS1;
-        _         -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS),
-                     lists:foreach(
-                       fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
-                     BQS1
-    end,
-    ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
-
-ensure_ttl_timer(State = #q{backing_queue       = BQ,
-                            backing_queue_state = BQS,
-                            ttl                 = TTL,
-                            ttl_timer_ref       = undefined})
-  when TTL =/= undefined ->
-    case BQ:is_empty(BQS) of
-        true  -> State;
-        false -> TRef = erlang:send_after(TTL, self(), drop_expired),
-                 State#q{ttl_timer_ref = TRef}
+    ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
+    {Props, BQS1} =
+        case DLXFun of
+            undefined ->
+                {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
+                {Next, BQS2};
+            _ ->
+                {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
+                lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
+                              Msgs),
+                {Next, BQS2}
+        end,
+    ensure_ttl_timer(case Props of
+                         undefined                         -> undefined;
+                         #message_properties{expiry = Exp} -> Exp
+                     end, State#q{backing_queue_state = BQS1}).
+
+ensure_ttl_timer(undefined, State) ->
+    State;
+ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) ->
+    State;
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
+    After = (case Expiry - now_micros() of
+                 V when V > 0 -> V + 999; %% always fire later
+                 _            -> 0
+             end) div 1000,
+    TRef = erlang:send_after(After, self(), drop_expired),
+    State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref    = TRef,
+                                    ttl_timer_expiry = TExpiry})
+  when Expiry + 1000 < TExpiry ->
+    case erlang:cancel_timer(TRef) of
+        false -> State;
+        _     -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
     end;
-ensure_ttl_timer(State) ->
+ensure_ttl_timer(_Expiry, State) ->
     State.

 ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
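Note (reviewer sketch, not part of the commit): the new ensure_ttl_timer/2 schedules the timer from an absolute per-message expiry in microseconds, while erlang:send_after/3 takes milliseconds. The arithmetic from the third clause, pulled out into a hypothetical helper for illustration:

    %% Expiry and Now are in microseconds; erlang:send_after/3 wants
    %% milliseconds. Adding 999 before the integer division rounds up, so
    %% the timer never fires before the message has actually expired.
    expiry_to_delay(Expiry, Now) ->
        (case Expiry - Now of
             V when V > 0 -> V + 999;
             _            -> 0
         end) div 1000.
    %% e.g. an expiry 1500us away gives (1500 + 999) div 1000 = 2ms.

The guarded clause only cancels and reschedules when the new expiry beats the pending one by more than 1ms (Expiry + 1000 < TExpiry), which avoids churning the timer on every publish.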

src/rabbit_backing_queue.erl

Lines changed: 4 additions & 2 deletions

@@ -123,9 +123,11 @@
 %% necessitate an ack or not. If they do, the function returns a list of
 %% messages with the respective acktags.
 -callback dropwhile(msg_pred(), true, state())
-                   -> {[{rabbit_types:basic_message(), ack()}], state()};
+                   -> {rabbit_types:message_properties() | undefined,
+                       [{rabbit_types:basic_message(), ack()}], state()};
                    (msg_pred(), false, state())
-                   -> {undefined, state()}.
+                   -> {rabbit_types:message_properties() | undefined,
+                       undefined, state()}.

 %% Produce the next message.
 -callback fetch(true, state()) -> {fetch_result(ack()), state()};
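For orientation, a hedged sketch of how a caller consumes the widened dropwhile/3 contract; Pred, BQ and BQS are placeholders here. The new first tuple element is the message_properties of the first message the predicate rejects, or undefined if the queue was drained:

    %% AckRequired = true: dropped messages come back with their ack tags.
    {NextProps, Msgs, BQS1} = BQ:dropwhile(Pred, true, BQS),
    %% Msgs :: [{rabbit_types:basic_message(), ack()}]

    %% AckRequired = false: no messages are returned, only the next properties.
    {NextProps1, undefined, BQS2} = BQ:dropwhile(Pred, false, BQS1),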

src/rabbit_backing_queue_qc.erl

Lines changed: 1 addition & 1 deletion

@@ -268,7 +268,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
     S#state{bqstate = BQ1};

 next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
-    BQ = {call, erlang, element, [2, Res]},
+    BQ = {call, erlang, element, [3, Res]},
     #state{messages = Messages} = S,
     Msgs1 = drop_messages(Messages),
     S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};

src/rabbit_mirror_queue_master.erl

Lines changed: 3 additions & 3 deletions

@@ -185,13 +185,13 @@ dropwhile(Pred, AckRequired,
                               set_delivered       = SetDelivered,
                               backing_queue_state = BQS }) ->
     Len = BQ:len(BQS),
-    {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
+    {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
     Len1 = BQ:len(BQS1),
     ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
     Dropped = Len - Len1,
     SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
-    {Msgs, State #state { backing_queue_state = BQS1,
-                          set_delivered       = SetDelivered1 } }.
+    {Next, Msgs, State #state { backing_queue_state = BQS1,
+                                set_delivered       = SetDelivered1 } }.

 drain_confirmed(State = #state { backing_queue       = BQ,
                                  backing_queue_state = BQS,

src/rabbit_mnesia.erl

Lines changed: 14 additions & 12 deletions

@@ -726,40 +726,42 @@ reset(Force) ->
              end]),
     ensure_mnesia_not_running(),
     case not Force andalso is_clustered() andalso
-             is_only_disc_node(node(), false)
+         is_only_disc_node(node(), false)
     of
         true  -> log_both("no other disc nodes running");
         false -> ok
     end,
-    Node  = node(),
-    Nodes = all_clustered_nodes() -- [Node],
     case Force of
-        true  -> ok;
+        true ->
+            disconnect_nodes(nodes());
         false ->
            ensure_mnesia_dir(),
            start_mnesia(),
-            RunningNodes =
+            {Nodes, RunningNodes} =
                try
                    %% Force=true here so that reset still works when clustered
                    %% with a node which is down
                    ok = init_db(read_cluster_nodes_config(), true),
-                    running_clustered_nodes() -- [Node]
+                    {all_clustered_nodes() -- [node()],
+                     running_clustered_nodes() -- [node()]}
                after
                    stop_mnesia()
                end,
            leave_cluster(Nodes, RunningNodes),
-            rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
-                                  cannot_delete_schema)
+            rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
+                                  cannot_delete_schema),
+            disconnect_nodes(Nodes)
     end,
-    %% We need to make sure that we don't end up in a distributed
-    %% Erlang system with nodes while not being in an Mnesia cluster
-    %% with them. We don't handle that well.
-    [erlang:disconnect_node(N) || N <- Nodes],
     ok = delete_cluster_nodes_config(),
     %% remove persisted messages and any other garbage we find
     ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
     ok.

+%% We need to make sure that we don't end up in a distributed Erlang
+%% system with nodes while not being in an Mnesia cluster with
+%% them. We don't handle that well.
+disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes].
+
 leave_cluster([], _) -> ok;
 leave_cluster(Nodes, RunningNodes) ->
     %% find at least one running cluster node and instruct it to

src/rabbit_tests.erl

Lines changed: 44 additions & 28 deletions

@@ -72,12 +72,10 @@ maybe_run_cluster_dependent_tests() ->
 run_cluster_dependent_tests(SecondaryNode) ->
     SecondaryNodeS = atom_to_list(SecondaryNode),

-    cover:stop(SecondaryNode),
     ok = control_action(stop_app, []),
-    ok = control_action(reset, []),
+    ok = safe_reset(),
     ok = control_action(cluster, [SecondaryNodeS]),
     ok = control_action(start_app, []),
-    cover:start(SecondaryNode),
     ok = control_action(start_app, SecondaryNode, [], []),

     io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
@@ -908,7 +906,7 @@ test_cluster_management2(SecondaryNode) ->
     ok = assert_ram_node(),

     %% join cluster as a ram node
-    ok = control_action(reset, []),
+    ok = safe_reset(),
     ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]),
     ok = control_action(start_app, []),
     ok = control_action(stop_app, []),
@@ -965,29 +963,30 @@ test_cluster_management2(SecondaryNode) ->
     ok = assert_disc_node(),

     %% turn a disk node into a ram node
-    ok = control_action(reset, []),
+    %%
+    %% can't use safe_reset here since for some reason nodes()==[] and
+    %% yet w/o stopping coverage things break
+    with_suspended_cover(
+      [SecondaryNode], fun () -> ok = control_action(reset, []) end),
     ok = control_action(cluster, [SecondaryNodeS]),
     ok = control_action(start_app, []),
     ok = control_action(stop_app, []),
     ok = assert_ram_node(),

     %% NB: this will log an inconsistent_database error, which is harmless
-    %% Turning cover on / off is OK even if we're not in general using cover,
-    %% it just turns the engine on / off, doesn't actually log anything.
-    cover:stop([SecondaryNode]),
-    true = disconnect_node(SecondaryNode),
-    pong = net_adm:ping(SecondaryNode),
-    cover:start([SecondaryNode]),
+    with_suspended_cover(
+      [SecondaryNode], fun () ->
+                               true = disconnect_node(SecondaryNode),
+                               pong = net_adm:ping(SecondaryNode)
+                       end),

     %% leaving a cluster as a ram node
-    ok = control_action(reset, []),
+    ok = safe_reset(),
     %% ...and as a disk node
     ok = control_action(cluster, [SecondaryNodeS, NodeS]),
     ok = control_action(start_app, []),
     ok = control_action(stop_app, []),
-    cover:stop(SecondaryNode),
-    ok = control_action(reset, []),
-    cover:start(SecondaryNode),
+    ok = safe_reset(),

     %% attempt to leave cluster when no other node is alive
     ok = control_action(cluster, [SecondaryNodeS, NodeS]),
@@ -1002,22 +1001,39 @@ test_cluster_management2(SecondaryNode) ->
     control_action(cluster, [SecondaryNodeS]),

     %% leave system clustered, with the secondary node as a ram node
-    ok = control_action(force_reset, []),
+    with_suspended_cover(
+      [SecondaryNode], fun () -> ok = control_action(force_reset, []) end),
     ok = control_action(start_app, []),
     %% Yes, this is rather ugly. But since we're a clustered Mnesia
     %% node and we're telling another clustered node to reset itself,
     %% we will get disconnected half way through causing a
     %% badrpc. This never happens in real life since rabbitmqctl is
-    %% not a clustered Mnesia node.
-    cover:stop(SecondaryNode),
-    {badrpc, nodedown} = control_action(force_reset, SecondaryNode, [], []),
-    pong = net_adm:ping(SecondaryNode),
-    cover:start(SecondaryNode),
+    %% not a clustered Mnesia node and is a hidden node.
+    with_suspended_cover(
+      [SecondaryNode],
+      fun () ->
+              {badrpc, nodedown} =
+                  control_action(force_reset, SecondaryNode, [], []),
+              pong = net_adm:ping(SecondaryNode)
+      end),
     ok = control_action(cluster, SecondaryNode, [NodeS], []),
     ok = control_action(start_app, SecondaryNode, [], []),

     passed.

+%% 'cover' does not cope at all well with nodes disconnecting, which
+%% happens as part of reset. So we turn it off temporarily. That is ok
+%% even if we're not in general using cover, it just turns the engine
+%% on / off and doesn't log anything.
+safe_reset() -> with_suspended_cover(
+                  nodes(), fun () -> control_action(reset, []) end).
+
+with_suspended_cover(Nodes, Fun) ->
+    cover:stop(Nodes),
+    Res = Fun(),
+    cover:start(Nodes),
+    Res.
+
 test_user_management() ->

     %% lots if stuff that should fail
@@ -2388,10 +2404,10 @@ test_dropwhile(VQ0) ->
            fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),

     %% drop the first 5 messages
-    {undefined, VQ2} = rabbit_variable_queue:dropwhile(
-                         fun(#message_properties { expiry = Expiry }) ->
-                                 Expiry =< 5
-                         end, false, VQ1),
+    {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
+                            fun(#message_properties { expiry = Expiry }) ->
+                                    Expiry =< 5
+                            end, false, VQ1),

     %% fetch five now
     VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2408,11 +2424,11 @@ test_dropwhile(VQ0) ->
 test_dropwhile_varying_ram_duration(VQ0) ->
     VQ1 = variable_queue_publish(false, 1, VQ0),
     VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
-    {undefined, VQ3} = rabbit_variable_queue:dropwhile(
-                         fun(_) -> false end, false, VQ2),
+    {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
+                            fun(_) -> false end, false, VQ2),
     VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
     VQ5 = variable_queue_publish(false, 1, VQ4),
-    {undefined, VQ6} =
+    {_, undefined, VQ6} =
         rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
     VQ6.
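The call sites above all follow the same pattern; as a generic sketch (SomeNodes and some_action/0 are hypothetical names), with_suspended_cover/2 brackets any action that may disconnect the given nodes:

    %% Suspend cover on the nodes, run the action, resume cover, and hand
    %% back the action's result.
    Result = with_suspended_cover(SomeNodes, fun () -> some_action() end).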

src/rabbit_variable_queue.erl

Lines changed: 4 additions & 4 deletions

@@ -581,12 +581,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
 dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).

 dropwhile(Pred, AckRequired, State, Msgs) ->
-    End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
-             (S)                  -> {undefined, S}
+    End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
+             (Next, S)                  -> {Next, undefined, S}
          end,
     case queue_out(State) of
         {empty, State1} ->
-            End(a(State1));
+            End(undefined, a(State1));
         {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
             case {Pred(MsgProps), AckRequired} of
                 {true, true} ->
@@ -598,7 +598,7 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
                 {_, State2} = internal_fetch(false, MsgStatus, State1),
                 dropwhile(Pred, AckRequired, State2, undefined);
             {false, _} ->
-                End(a(in_r(MsgStatus, State1)))
+                End(MsgProps, a(in_r(MsgStatus, State1)))
             end
     end.
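Restating the reshaped End closure as a plain function may make the two return shapes easier to see (end_result/4 is a hypothetical name, not part of the commit; Next is the properties of the first message the predicate rejected, or undefined when the queue ran dry):

    end_result(Next, Msgs, S, true)   -> {Next, lists:reverse(Msgs), S};
    end_result(Next, _Msgs, S, false) -> {Next, undefined, S}.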

src/vm_memory_monitor.erl

Lines changed: 5 additions & 3 deletions

@@ -49,6 +49,7 @@
 -record(state, {total_memory,
                 memory_limit,
+                memory_fraction,
                 timeout,
                 timer,
                 alarmed
@@ -110,7 +111,7 @@ init([MemFraction]) ->
     {ok, set_mem_limits(State, MemFraction)}.

 handle_call(get_vm_memory_high_watermark, _From, State) ->
-    {reply, State#state.memory_limit / State#state.total_memory, State};
+    {reply, State#state.memory_fraction, State};

 handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) ->
     State1 = set_mem_limits(State, MemFraction),
@@ -171,8 +172,9 @@ set_mem_limits(State, MemFraction) ->
     MemLim = get_mem_limit(MemFraction, TotalMemory),
     error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n",
                           [trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]),
-    internal_update(State #state { total_memory = TotalMemory,
-                                   memory_limit = MemLim }).
+    internal_update(State #state { total_memory    = TotalMemory,
+                                   memory_limit    = MemLim,
+                                   memory_fraction = MemFraction}).

 internal_update(State = #state { memory_limit = MemLimit,
                                  alarmed      = Alarmed}) ->
