Skip to content

Commit fcaa342

Browse files
Merge pull request #1849 from rabbitmq/basic_get_message_ready_count
Fix basic get message ready count
2 parents b75d2b3 + 649b8c9 commit fcaa342

File tree

4 files changed

+65
-51
lines changed

4 files changed

+65
-51
lines changed

src/rabbit_fifo.erl

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -404,21 +404,22 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
404404
_ when Exists ->
405405
%% a dequeue using the same consumer_id isn't possible at this point
406406
{State0, {dequeue, empty}};
407-
_ ->
407+
Ready ->
408408
State1 = update_consumer(ConsumerId, ConsumerMeta,
409409
{once, 1, simple_prefetch}, State0),
410410
{success, _, MsgId, Msg, State2} = checkout_one(State1),
411411
case Settlement of
412412
unsettled ->
413413
{_, Pid} = ConsumerId,
414-
{State2, {dequeue, {MsgId, Msg}},
414+
{State2, {dequeue, {MsgId, Msg}, Ready-1},
415415
[{monitor, process, Pid}]};
416416
settled ->
417417
%% immediately settle the checkout
418418
{State, _, Effects} = apply(Meta,
419-
make_settle(ConsumerId, [MsgId]),
419+
make_settle(ConsumerId,
420+
[MsgId]),
420421
State2),
421-
{State, {dequeue, {MsgId, Msg}}, Effects}
422+
{State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
422423
end
423424
end;
424425
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
@@ -793,9 +794,8 @@ query_single_active_consumer(#state{consumer_strategy = single_active,
793794
query_single_active_consumer(_) ->
794795
disabled.
795796

796-
query_stat(#state{messages = M,
797-
consumers = Consumers}) ->
798-
{maps:size(M), maps:size(Consumers)}.
797+
query_stat(#state{consumers = Consumers} = State) ->
798+
{messages_ready(State), maps:size(Consumers)}.
799799

800800
-spec usage(atom()) -> float().
801801
usage(Name) when is_atom(Name) ->
@@ -1666,7 +1666,8 @@ enq_enq_deq_test() ->
16661666
{State1, _} = enq(1, 1, first, test_init(test)),
16671667
{State2, _} = enq(2, 2, second, State1),
16681668
% get returns a reply value
1669-
{_State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
1669+
NumReady = 1,
1670+
{_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} =
16701671
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
16711672
State2),
16721673
ok.
@@ -1676,7 +1677,7 @@ enq_enq_deq_deq_settle_test() ->
16761677
{State1, _} = enq(1, 1, first, test_init(test)),
16771678
{State2, _} = enq(2, 2, second, State1),
16781679
% get returns a reply value
1679-
{State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
1680+
{State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} =
16801681
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
16811682
State2),
16821683
{_State4, {dequeue, empty}} =
@@ -1688,7 +1689,7 @@ enq_enq_checkout_get_settled_test() ->
16881689
Cid = {?FUNCTION_NAME, self()},
16891690
{State1, _} = enq(1, 1, first, test_init(test)),
16901691
% get returns a reply value
1691-
{_State2, {dequeue, {0, {_, first}}}, _Effs} =
1692+
{_State2, {dequeue, {0, {_, first}}, _}, _Effs} =
16921693
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
16931694
State1),
16941695
ok.
@@ -1706,7 +1707,7 @@ untracked_enq_deq_test() ->
17061707
{State1, _, _} = apply(meta(1),
17071708
make_enqueue(undefined, undefined, first),
17081709
State0),
1709-
{_State2, {dequeue, {0, {_, first}}}, _} =
1710+
{_State2, {dequeue, {0, {_, first}}, _}, _} =
17101711
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
17111712
ok.
17121713

@@ -1823,9 +1824,9 @@ cancelled_checkout_out_test() ->
18231824
?assertEqual(1, maps:size(State2#state.messages)),
18241825
?assertEqual(1, lqueue:len(State2#state.returns)),
18251826

1826-
{State3, {dequeue, {0, {_, first}}}, _} =
1827+
{State3, {dequeue, {0, {_, first}}, _}, _} =
18271828
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
1828-
{_State, {dequeue, {_, {_, second}}}, _} =
1829+
{_State, {dequeue, {_, {_, second}}, _}, _} =
18291830
apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3),
18301831
ok.
18311832

@@ -2141,7 +2142,7 @@ pending_enqueue_is_enqueued_on_down_test() ->
21412142
Pid = self(),
21422143
{State0, _} = enq(1, 2, first, test_init(test)),
21432144
{State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0),
2144-
{_State2, {dequeue, {0, {_, first}}}, _} =
2145+
{_State2, {dequeue, {0, {_, first}}, 0}, _} =
21452146
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
21462147
ok.
21472148

@@ -2190,7 +2191,7 @@ purge_test() ->
21902191
{State2, {purge, 1}, _} = apply(meta(2), make_purge(), State1),
21912192
{State3, _} = enq(3, 2, second, State2),
21922193
% get returns a reply value
2193-
{_State4, {dequeue, {0, {_, second}}}, [{monitor, _, _}]} =
2194+
{_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} =
21942195
apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
21952196
ok.
21962197

@@ -2604,11 +2605,11 @@ enq(Idx, MsgSeq, Msg, State) ->
26042605
apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), State)).
26052606

26062607
deq(Idx, Cid, Settlement, State0) ->
2607-
{State, {dequeue, Msg}, _} =
2608+
{State, {dequeue, {MsgId, Msg}, _}, _} =
26082609
apply(meta(Idx),
26092610
make_checkout(Cid, {dequeue, Settlement}, #{}),
26102611
State0),
2611-
{State, Msg}.
2612+
{State, {MsgId, Msg}}.
26122613

26132614
check_n(Cid, Idx, N, State) ->
26142615
strip_reply(

src/rabbit_fifo_client.erl

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ enqueue(Msg, State) ->
177177
%% @returns `{ok, IdMsg, State}' or `{error | timeout, term()}'
178178
-spec dequeue(rabbit_fifo:consumer_tag(),
179179
Settlement :: settled | unsettled, state()) ->
180-
{ok, rabbit_fifo:delivery_msg() | empty, state()} | {error | timeout, term()}.
180+
{ok, {rabbit_fifo:delivery_msg(), non_neg_integer()}
181+
| empty, state()} | {error | timeout, term()}.
181182
dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
182183
Node = pick_node(State0),
183184
ConsumerId = consumer_id(ConsumerTag),
@@ -186,8 +187,10 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
186187
{dequeue, Settlement},
187188
#{}),
188189
Timeout) of
189-
{ok, {dequeue, Reply}, Leader} ->
190-
{ok, Reply, State0#state{leader = Leader}};
190+
{ok, {dequeue, empty}, Leader} ->
191+
{ok, empty, State0#state{leader = Leader}};
192+
{ok, {dequeue, Msg, NumReady}, Leader} ->
193+
{ok, {Msg, NumReady}, State0#state{leader = Leader}};
191194
Err ->
192195
Err
193196
end.
@@ -399,12 +402,18 @@ purge(Node) ->
399402
Err
400403
end.
401404

402-
-spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer()}}
403-
| {error | timeout, term()}.
405+
-spec stat(ra_server_id()) ->
406+
{ok, non_neg_integer(), non_neg_integer()}
407+
| {error | timeout, term()}.
404408
stat(Leader) ->
405-
Query = fun (State) -> rabbit_fifo:query_stat(State) end,
406-
{ok, {_, Stat}, _} = ra:local_query(Leader, Query),
407-
Stat.
409+
%% short timeout as we don't want to spend too long if it is going to
410+
%% fail anyway
411+
case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, 250) of
412+
{ok, {_, {R, C}}, _} ->
413+
{ok, R, C};
414+
Err ->
415+
Err
416+
end.
408417

409418
%% @doc returns the cluster name
410419
-spec cluster_name(state()) -> cluster_name().

src/rabbit_quorum_queue.erl

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -310,11 +310,11 @@ stop(VHost) ->
310310
rabbit_types:username()) ->
311311
{ok, QLen :: non_neg_integer()}.
312312
delete(#amqqueue{type = quorum, pid = {Name, _},
313-
name = QName, quorum_nodes = QNodes},
313+
name = QName, quorum_nodes = QNodes} = Q,
314314
_IfUnused, _IfEmpty, ActingUser) ->
315315
%% TODO Quorum queue needs to support consumer tracking for IfUnused
316316
Timeout = ?DELETE_TIMEOUT,
317-
Msgs = quorum_messages(Name),
317+
{ok, ReadyMsgs, _} = stat(Q),
318318
Servers = [{Name, Node} || Node <- QNodes],
319319
case ra:delete_cluster(Servers, Timeout) of
320320
{ok, {_, LeaderNode} = Leader} ->
@@ -328,7 +328,7 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
328328
ok = delete_queue_data(QName, ActingUser),
329329
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
330330
?TICK_TIME),
331-
{ok, Msgs};
331+
{ok, ReadyMsgs};
332332
{error, {no_more_servers_to_try, Errs}} ->
333333
case lists:all(fun({{error, noproc}, _}) -> true;
334334
(_) -> false
@@ -337,7 +337,7 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
337337
%% If all ra nodes were already down, the delete
338338
%% has succeed
339339
delete_queue_data(QName, ActingUser),
340-
{ok, Msgs};
340+
{ok, ReadyMsgs};
341341
false ->
342342
%% attempt forced deletion of all servers
343343
rabbit_log:warning(
@@ -347,14 +347,14 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
347347
[rabbit_misc:rs(QName), Errs]),
348348
ok = force_delete_queue(Servers),
349349
delete_queue_data(QName, ActingUser),
350-
{ok, Msgs}
350+
{ok, ReadyMsgs}
351351
end
352352
end.
353353

354354

355355
force_delete_queue(Servers) ->
356356
[begin
357-
case catch(ra:delete_server(S)) of
357+
case catch(ra:force_delete_server(S)) of
358358
ok -> ok;
359359
Err ->
360360
rabbit_log:warning(
@@ -390,9 +390,9 @@ credit(CTag, Credit, Drain, QState) ->
390390

391391
-spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(),
392392
rabbit_fifo_client:state()) ->
393-
{'ok', 'empty', rabbit_fifo_client:state()} |
394-
{'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
395-
basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
393+
{'ok', 'empty', rabbit_fifo_client:state()} |
394+
{'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
395+
basic_get(#amqqueue{name = QName, pid = Id, type = quorum}, NoAck,
396396
CTag0, QState0) ->
397397
CTag = quorum_ctag(CTag0),
398398
Settlement = case NoAck of
@@ -404,11 +404,11 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
404404
case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
405405
{ok, empty, QState} ->
406406
{ok, empty, QState};
407-
{ok, {MsgId, {MsgHeader, Msg0}}, QState} ->
407+
{ok, {{MsgId, {MsgHeader, Msg0}}, MsgsReady}, QState} ->
408408
Count = maps:get(delivery_count, MsgHeader, 0),
409409
IsDelivered = Count > 0,
410410
Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
411-
{ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
411+
{ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState};
412412
{timeout, _} ->
413413
{error, timeout}
414414
end.
@@ -495,8 +495,12 @@ info(Q, Items) ->
495495

496496
stat(#amqqueue{pid = Leader}) ->
497497
try
498-
{Ready, Consumers} = rabbit_fifo_client:stat(Leader),
499-
{ok, Ready, Consumers}
498+
case rabbit_fifo_client:stat(Leader) of
499+
{ok, _, _} = Stat ->
500+
Stat;
501+
_ ->
502+
{ok, 0, 0}
503+
end
500504
catch
501505
_:_ ->
502506
%% Leader is not available, cluster might be in minority

test/rabbit_fifo_SUITE.erl

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ return(Config) ->
145145
{ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00),
146146
{ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0),
147147
{_, _, F2} = process_ra_events(F1, 100),
148-
{ok, {MsgId, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
148+
{ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
149149
{ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
150150

151151
ra:stop_server(ServerId),
@@ -237,9 +237,9 @@ resends_lost_command(Config) ->
237237
meck:unload(ra),
238238
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
239239
{_, _, F4} = process_ra_events(F3, 500),
240-
{ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
241-
{ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
242-
{ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
240+
{ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
241+
{ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
242+
{ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
243243
ra:stop_server(ServerId),
244244
ok.
245245

@@ -299,7 +299,7 @@ returns_after_down(Config) ->
299299
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
300300
timer:sleep(1000),
301301
% message should be available for dequeue
302-
{ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
302+
{ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
303303
ra:stop_server(ServerId),
304304
ok.
305305

@@ -322,9 +322,9 @@ resends_after_lost_applied(Config) ->
322322
% send another message
323323
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
324324
{_, _, F4} = process_ra_events(F3, 500),
325-
{ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
326-
{ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
327-
{ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
325+
{ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
326+
{ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
327+
{ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
328328
ra:stop_server(ServerId),
329329
ok.
330330

@@ -395,7 +395,7 @@ cancel_checkout(Config) ->
395395
{ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
396396
{_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
397397
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
398-
{ok, {_, {_, m1}}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
398+
{ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
399399
ok.
400400

401401
credit(Config) ->
@@ -445,7 +445,7 @@ untracked_enqueue(Config) ->
445445
ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1),
446446
timer:sleep(100),
447447
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
448-
{ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
448+
{ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
449449
ra:stop_server(ServerId),
450450
ok.
451451

@@ -504,10 +504,10 @@ dequeue(Config) ->
504504
{ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b),
505505
{_, _, F2} = process_ra_events(F2_, 100),
506506

507-
{ok, {0, {_, msg1}}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
507+
{ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
508508
{ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3),
509509
{_, _, F4} = process_ra_events(F4_, 100),
510-
{ok, {MsgId, {_, msg2}}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
510+
{ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
511511
{ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
512512
ra:stop_server(ServerId),
513513
ok.
@@ -521,7 +521,7 @@ enq_deq_n(0, F0, Acc) ->
521521
enq_deq_n(N, F, Acc) ->
522522
{ok, F1} = rabbit_fifo_client:enqueue(N, F),
523523
{_, _, F2} = process_ra_events(F1, 10),
524-
{ok, {_, {_, Deq}}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2),
524+
{ok, {{_, {_, Deq}}, _}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2),
525525

526526
{_, _, F4} = process_ra_events(F3, 5),
527527
enq_deq_n(N-1, F4, [Deq | Acc]).

0 commit comments

Comments
 (0)