Skip to content

Commit 649b8c9

Browse files
committed
Fix basic get message ready count
Ensure the messages is ready is returned from basic get. Also fix message count when using basic.delete. [#162502929]
1 parent a4b6025 commit 649b8c9

File tree

4 files changed

+65
-51
lines changed

4 files changed

+65
-51
lines changed

src/rabbit_fifo.erl

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -404,21 +404,22 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
404404
_ when Exists ->
405405
%% a dequeue using the same consumer_id isn't possible at this point
406406
{State0, {dequeue, empty}};
407-
_ ->
407+
Ready ->
408408
State1 = update_consumer(ConsumerId, ConsumerMeta,
409409
{once, 1, simple_prefetch}, State0),
410410
{success, _, MsgId, Msg, State2} = checkout_one(State1),
411411
case Settlement of
412412
unsettled ->
413413
{_, Pid} = ConsumerId,
414-
{State2, {dequeue, {MsgId, Msg}},
414+
{State2, {dequeue, {MsgId, Msg}, Ready-1},
415415
[{monitor, process, Pid}]};
416416
settled ->
417417
%% immediately settle the checkout
418418
{State, _, Effects} = apply(Meta,
419-
make_settle(ConsumerId, [MsgId]),
419+
make_settle(ConsumerId,
420+
[MsgId]),
420421
State2),
421-
{State, {dequeue, {MsgId, Msg}}, Effects}
422+
{State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
422423
end
423424
end;
424425
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
@@ -761,9 +762,8 @@ query_single_active_consumer(#state{consumer_strategy = single_active,
761762
query_single_active_consumer(_) ->
762763
disabled.
763764

764-
query_stat(#state{messages = M,
765-
consumers = Consumers}) ->
766-
{maps:size(M), maps:size(Consumers)}.
765+
query_stat(#state{consumers = Consumers} = State) ->
766+
{messages_ready(State), maps:size(Consumers)}.
767767

768768
-spec usage(atom()) -> float().
769769
usage(Name) when is_atom(Name) ->
@@ -1632,7 +1632,8 @@ enq_enq_deq_test() ->
16321632
{State1, _} = enq(1, 1, first, test_init(test)),
16331633
{State2, _} = enq(2, 2, second, State1),
16341634
% get returns a reply value
1635-
{_State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
1635+
NumReady = 1,
1636+
{_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} =
16361637
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
16371638
State2),
16381639
ok.
@@ -1642,7 +1643,7 @@ enq_enq_deq_deq_settle_test() ->
16421643
{State1, _} = enq(1, 1, first, test_init(test)),
16431644
{State2, _} = enq(2, 2, second, State1),
16441645
% get returns a reply value
1645-
{State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
1646+
{State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} =
16461647
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
16471648
State2),
16481649
{_State4, {dequeue, empty}} =
@@ -1654,7 +1655,7 @@ enq_enq_checkout_get_settled_test() ->
16541655
Cid = {?FUNCTION_NAME, self()},
16551656
{State1, _} = enq(1, 1, first, test_init(test)),
16561657
% get returns a reply value
1657-
{_State2, {dequeue, {0, {_, first}}}, _Effs} =
1658+
{_State2, {dequeue, {0, {_, first}}, _}, _Effs} =
16581659
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
16591660
State1),
16601661
ok.
@@ -1672,7 +1673,7 @@ untracked_enq_deq_test() ->
16721673
{State1, _, _} = apply(meta(1),
16731674
make_enqueue(undefined, undefined, first),
16741675
State0),
1675-
{_State2, {dequeue, {0, {_, first}}}, _} =
1676+
{_State2, {dequeue, {0, {_, first}}, _}, _} =
16761677
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
16771678
ok.
16781679

@@ -1789,9 +1790,9 @@ cancelled_checkout_out_test() ->
17891790
?assertEqual(1, maps:size(State2#state.messages)),
17901791
?assertEqual(1, lqueue:len(State2#state.returns)),
17911792

1792-
{State3, {dequeue, {0, {_, first}}}, _} =
1793+
{State3, {dequeue, {0, {_, first}}, _}, _} =
17931794
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
1794-
{_State, {dequeue, {_, {_, second}}}, _} =
1795+
{_State, {dequeue, {_, {_, second}}, _}, _} =
17951796
apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3),
17961797
ok.
17971798

@@ -2107,7 +2108,7 @@ pending_enqueue_is_enqueued_on_down_test() ->
21072108
Pid = self(),
21082109
{State0, _} = enq(1, 2, first, test_init(test)),
21092110
{State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0),
2110-
{_State2, {dequeue, {0, {_, first}}}, _} =
2111+
{_State2, {dequeue, {0, {_, first}}, 0}, _} =
21112112
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
21122113
ok.
21132114

@@ -2156,7 +2157,7 @@ purge_test() ->
21562157
{State2, {purge, 1}, _} = apply(meta(2), make_purge(), State1),
21572158
{State3, _} = enq(3, 2, second, State2),
21582159
% get returns a reply value
2159-
{_State4, {dequeue, {0, {_, second}}}, [{monitor, _, _}]} =
2160+
{_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} =
21602161
apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
21612162
ok.
21622163

@@ -2458,11 +2459,11 @@ enq(Idx, MsgSeq, Msg, State) ->
24582459
apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), State)).
24592460

24602461
deq(Idx, Cid, Settlement, State0) ->
2461-
{State, {dequeue, Msg}, _} =
2462+
{State, {dequeue, {MsgId, Msg}, _}, _} =
24622463
apply(meta(Idx),
24632464
make_checkout(Cid, {dequeue, Settlement}, #{}),
24642465
State0),
2465-
{State, Msg}.
2466+
{State, {MsgId, Msg}}.
24662467

24672468
check_n(Cid, Idx, N, State) ->
24682469
strip_reply(

src/rabbit_fifo_client.erl

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ enqueue(Msg, State) ->
177177
%% @returns `{ok, IdMsg, State}' or `{error | timeout, term()}'
178178
-spec dequeue(rabbit_fifo:consumer_tag(),
179179
Settlement :: settled | unsettled, state()) ->
180-
{ok, rabbit_fifo:delivery_msg() | empty, state()} | {error | timeout, term()}.
180+
{ok, {rabbit_fifo:delivery_msg(), non_neg_integer()}
181+
| empty, state()} | {error | timeout, term()}.
181182
dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
182183
Node = pick_node(State0),
183184
ConsumerId = consumer_id(ConsumerTag),
@@ -186,8 +187,10 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
186187
{dequeue, Settlement},
187188
#{}),
188189
Timeout) of
189-
{ok, {dequeue, Reply}, Leader} ->
190-
{ok, Reply, State0#state{leader = Leader}};
190+
{ok, {dequeue, empty}, Leader} ->
191+
{ok, empty, State0#state{leader = Leader}};
192+
{ok, {dequeue, Msg, NumReady}, Leader} ->
193+
{ok, {Msg, NumReady}, State0#state{leader = Leader}};
191194
Err ->
192195
Err
193196
end.
@@ -399,12 +402,18 @@ purge(Node) ->
399402
Err
400403
end.
401404

402-
-spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer()}}
403-
| {error | timeout, term()}.
405+
-spec stat(ra_server_id()) ->
406+
{ok, non_neg_integer(), non_neg_integer()}
407+
| {error | timeout, term()}.
404408
stat(Leader) ->
405-
Query = fun (State) -> rabbit_fifo:query_stat(State) end,
406-
{ok, {_, Stat}, _} = ra:local_query(Leader, Query),
407-
Stat.
409+
%% short timeout as we don't want to spend too long if it is going to
410+
%% fail anyway
411+
case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, 250) of
412+
{ok, {_, {R, C}}, _} ->
413+
{ok, R, C};
414+
Err ->
415+
Err
416+
end.
408417

409418
%% @doc returns the cluster name
410419
-spec cluster_name(state()) -> cluster_name().

src/rabbit_quorum_queue.erl

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -310,11 +310,11 @@ stop(VHost) ->
310310
rabbit_types:username()) ->
311311
{ok, QLen :: non_neg_integer()}.
312312
delete(#amqqueue{type = quorum, pid = {Name, _},
313-
name = QName, quorum_nodes = QNodes},
313+
name = QName, quorum_nodes = QNodes} = Q,
314314
_IfUnused, _IfEmpty, ActingUser) ->
315315
%% TODO Quorum queue needs to support consumer tracking for IfUnused
316316
Timeout = ?DELETE_TIMEOUT,
317-
Msgs = quorum_messages(Name),
317+
{ok, ReadyMsgs, _} = stat(Q),
318318
Servers = [{Name, Node} || Node <- QNodes],
319319
case ra:delete_cluster(Servers, Timeout) of
320320
{ok, {_, LeaderNode} = Leader} ->
@@ -328,7 +328,7 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
328328
ok = delete_queue_data(QName, ActingUser),
329329
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
330330
?TICK_TIME),
331-
{ok, Msgs};
331+
{ok, ReadyMsgs};
332332
{error, {no_more_servers_to_try, Errs}} ->
333333
case lists:all(fun({{error, noproc}, _}) -> true;
334334
(_) -> false
@@ -337,7 +337,7 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
337337
%% If all ra nodes were already down, the delete
338338
%% has succeed
339339
delete_queue_data(QName, ActingUser),
340-
{ok, Msgs};
340+
{ok, ReadyMsgs};
341341
false ->
342342
%% attempt forced deletion of all servers
343343
rabbit_log:warning(
@@ -347,14 +347,14 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
347347
[rabbit_misc:rs(QName), Errs]),
348348
ok = force_delete_queue(Servers),
349349
delete_queue_data(QName, ActingUser),
350-
{ok, Msgs}
350+
{ok, ReadyMsgs}
351351
end
352352
end.
353353

354354

355355
force_delete_queue(Servers) ->
356356
[begin
357-
case catch(ra:delete_server(S)) of
357+
case catch(ra:force_delete_server(S)) of
358358
ok -> ok;
359359
Err ->
360360
rabbit_log:warning(
@@ -390,9 +390,9 @@ credit(CTag, Credit, Drain, QState) ->
390390

391391
-spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(),
392392
rabbit_fifo_client:state()) ->
393-
{'ok', 'empty', rabbit_fifo_client:state()} |
394-
{'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
395-
basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
393+
{'ok', 'empty', rabbit_fifo_client:state()} |
394+
{'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
395+
basic_get(#amqqueue{name = QName, pid = Id, type = quorum}, NoAck,
396396
CTag0, QState0) ->
397397
CTag = quorum_ctag(CTag0),
398398
Settlement = case NoAck of
@@ -404,11 +404,11 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
404404
case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
405405
{ok, empty, QState} ->
406406
{ok, empty, QState};
407-
{ok, {MsgId, {MsgHeader, Msg0}}, QState} ->
407+
{ok, {{MsgId, {MsgHeader, Msg0}}, MsgsReady}, QState} ->
408408
Count = maps:get(delivery_count, MsgHeader, 0),
409409
IsDelivered = Count > 0,
410410
Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
411-
{ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
411+
{ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState};
412412
{timeout, _} ->
413413
{error, timeout}
414414
end.
@@ -490,8 +490,12 @@ info(Q, Items) ->
490490

491491
stat(#amqqueue{pid = Leader}) ->
492492
try
493-
{Ready, Consumers} = rabbit_fifo_client:stat(Leader),
494-
{ok, Ready, Consumers}
493+
case rabbit_fifo_client:stat(Leader) of
494+
{ok, _, _} = Stat ->
495+
Stat;
496+
_ ->
497+
{ok, 0, 0}
498+
end
495499
catch
496500
_:_ ->
497501
%% Leader is not available, cluster might be in minority

test/rabbit_fifo_SUITE.erl

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ return(Config) ->
145145
{ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00),
146146
{ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0),
147147
{_, _, F2} = process_ra_events(F1, 100),
148-
{ok, {MsgId, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
148+
{ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
149149
{ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
150150

151151
ra:stop_server(ServerId),
@@ -237,9 +237,9 @@ resends_lost_command(Config) ->
237237
meck:unload(ra),
238238
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
239239
{_, _, F4} = process_ra_events(F3, 500),
240-
{ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
241-
{ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
242-
{ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
240+
{ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
241+
{ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
242+
{ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
243243
ra:stop_server(ServerId),
244244
ok.
245245

@@ -299,7 +299,7 @@ returns_after_down(Config) ->
299299
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
300300
timer:sleep(1000),
301301
% message should be available for dequeue
302-
{ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
302+
{ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
303303
ra:stop_server(ServerId),
304304
ok.
305305

@@ -322,9 +322,9 @@ resends_after_lost_applied(Config) ->
322322
% send another message
323323
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
324324
{_, _, F4} = process_ra_events(F3, 500),
325-
{ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
326-
{ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
327-
{ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
325+
{ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
326+
{ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
327+
{ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
328328
ra:stop_server(ServerId),
329329
ok.
330330

@@ -395,7 +395,7 @@ cancel_checkout(Config) ->
395395
{ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
396396
{_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
397397
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
398-
{ok, {_, {_, m1}}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
398+
{ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
399399
ok.
400400

401401
credit(Config) ->
@@ -445,7 +445,7 @@ untracked_enqueue(Config) ->
445445
ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1),
446446
timer:sleep(100),
447447
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
448-
{ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
448+
{ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
449449
ra:stop_server(ServerId),
450450
ok.
451451

@@ -504,10 +504,10 @@ dequeue(Config) ->
504504
{ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b),
505505
{_, _, F2} = process_ra_events(F2_, 100),
506506

507-
{ok, {0, {_, msg1}}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
507+
{ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
508508
{ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3),
509509
{_, _, F4} = process_ra_events(F4_, 100),
510-
{ok, {MsgId, {_, msg2}}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
510+
{ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
511511
{ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
512512
ra:stop_server(ServerId),
513513
ok.
@@ -521,7 +521,7 @@ enq_deq_n(0, F0, Acc) ->
521521
enq_deq_n(N, F, Acc) ->
522522
{ok, F1} = rabbit_fifo_client:enqueue(N, F),
523523
{_, _, F2} = process_ra_events(F1, 10),
524-
{ok, {_, {_, Deq}}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2),
524+
{ok, {{_, {_, Deq}}, _}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2),
525525

526526
{_, _, F4} = process_ra_events(F3, 5),
527527
enq_deq_n(N-1, F4, [Deq | Acc]).

0 commit comments

Comments
 (0)