|
195 | 195 | suspected_down = false :: boolean() |
196 | 196 | }). |
197 | 197 |
|
198 | | - |
199 | 198 | -record(state, |
200 | 199 | {name :: atom(), |
201 | 200 | queue_resource :: rabbit_types:r('queue'), |
@@ -435,17 +434,22 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, |
435 | 434 | checkout(Meta, State1, [{monitor, process, Pid}]); |
436 | 435 | apply(#{index := RaftIdx}, #purge{}, |
437 | 436 | #state{ra_indexes = Indexes0, |
| 437 | + returns = Returns, |
438 | 438 | messages = Messages} = State0) -> |
439 | | - Total = maps:size(Messages), |
440 | | - Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, |
| 439 | + Total = messages_ready(State0), |
| 440 | + Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, |
441 | 441 | Indexes0, |
442 | 442 | [I || {I, _} <- lists:sort(maps:values(Messages))]), |
| 443 | + Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, |
| 444 | + Indexes1, |
| 445 | + [I || {_, {I, _}} <- lqueue:to_list(Returns)]), |
443 | 446 | {State, _, Effects} = |
444 | 447 | update_smallest_raft_index(RaftIdx, Indexes0, |
445 | 448 | State0#state{ra_indexes = Indexes, |
446 | 449 | messages = #{}, |
447 | 450 | returns = lqueue:new(), |
448 | 451 | msg_bytes_enqueue = 0, |
| 452 | + prefix_msgs = {[], []}, |
449 | 453 | low_msg_num = undefined}, |
450 | 454 | []), |
451 | 455 | %% as we're not checking out after a purge (no point) we have to |
@@ -646,16 +650,14 @@ tick(_Ts, #state{name = Name, |
646 | 650 | -spec overview(state()) -> map(). |
647 | 651 | overview(#state{consumers = Cons, |
648 | 652 | enqueuers = Enqs, |
649 | | - messages = Messages, |
650 | | - ra_indexes = Indexes, |
651 | 653 | msg_bytes_enqueue = EnqueueBytes, |
652 | 654 | msg_bytes_checkout = CheckoutBytes} = State) -> |
653 | 655 | #{type => ?MODULE, |
654 | 656 | num_consumers => maps:size(Cons), |
655 | 657 | num_checked_out => num_checked_out(State), |
656 | 658 | num_enqueuers => maps:size(Enqs), |
657 | | - num_ready_messages => maps:size(Messages), |
658 | | - num_messages => rabbit_fifo_index:size(Indexes), |
| 659 | + num_ready_messages => messages_ready(State), |
| 660 | + num_messages => messages_total(State), |
659 | 661 | enqueue_message_bytes => EnqueueBytes, |
660 | 662 | checkout_message_bytes => CheckoutBytes}. |
661 | 663 |
|
|
0 commit comments