139139 transient_threshold ,
140140 qi_embed_msgs_below ,
141141
142- len , % % w/o unacked @todo No longer needed, is q_head+q_tail.
143142 bytes , % % w/o unacked
144143 unacked_bytes ,
145144 persistent_count , % % w unacked
256255 transient_threshold :: non_neg_integer (),
257256 qi_embed_msgs_below :: non_neg_integer (),
258257
259- len :: non_neg_integer (),
260258 bytes :: non_neg_integer (),
261259 unacked_bytes :: non_neg_integer (),
262-
263260 persistent_count :: non_neg_integer (),
264261 persistent_bytes :: non_neg_integer (),
265262
@@ -455,12 +452,12 @@ delete_crashed(Q) when ?is_amqqueue(Q) ->
455452 QName = amqqueue :get_name (Q ),
456453 ok = rabbit_classic_queue_index_v2 :erase (QName ).
457454
458- purge (State = # vqstate { len = Len } ) ->
455+ purge (State ) ->
459456 case is_pending_ack_empty (State ) and is_unconfirmed_empty (State ) of
460457 true ->
461- {Len , purge_and_index_reset (State )};
458+ {len ( State ) , purge_and_index_reset (State )};
462459 false ->
463- {Len , purge_when_pending_acks (State )}
460+ {len ( State ) , purge_when_pending_acks (State )}
464461 end .
465462
466463purge_acks (State ) -> a (purge_pending_ack (false , State )).
@@ -566,12 +563,7 @@ ack(AckTags, State) ->
566563
567564requeue (AckTags , # vqstate { q_head = QHead0 ,
568565 q_tail = QTail ,
569- in_counter = InCounter ,
570- len = Len } = State ) ->
571- % % @todo This can be heavily simplified: if the message falls into q_tail,
572- % % add it there. Otherwise just add it to q_head in the correct position.
573- % % @todo I think if the message falls within q_head we must add it back there,
574- % % otherwise there's nothing to do? Except update stats.
566+ in_counter = InCounter } = State ) ->
575567 {SeqIds , QHead , MsgIds , State1 } = requeue_merge (lists :sort (AckTags ), QHead0 , [],
576568 q_tail_limit (QTail ), State ),
577569 {QTail1 , MsgIds1 , State2 } = q_tail_merge (SeqIds , QTail , MsgIds , State1 ),
@@ -580,8 +572,7 @@ requeue(AckTags, #vqstate { q_head = QHead0,
580572 maybe_update_rates (
581573 State2 # vqstate { q_head = QHead ,
582574 q_tail = QTail1 ,
583- in_counter = InCounter + MsgCount ,
584- len = Len + MsgCount }))}.
575+ in_counter = InCounter + MsgCount }))}.
585576
586577ackfold (MsgFun , Acc , State , AckTags ) ->
587578 {AccN , StateN } =
@@ -592,7 +583,8 @@ ackfold(MsgFun, Acc, State, AckTags) ->
592583 end , {Acc , State }, AckTags ),
593584 {AccN , a (StateN )}.
594585
595- len (# vqstate { len = Len }) -> Len .
586+ len (# vqstate { q_head = QHead , q_tail = # q_tail { count = QTailCount }}) ->
587+ ? QUEUE :len (QHead ) + QTailCount .
596588
597589is_empty (State ) -> 0 == len (State ).
598590
@@ -717,10 +709,9 @@ info(disk_reads, #vqstate{disk_read_count = Count}) ->
717709 Count ;
718710info (disk_writes , # vqstate {disk_write_count = Count }) ->
719711 Count ;
720- info (backing_queue_status , # vqstate {
712+ info (backing_queue_status , State = # vqstate {
721713 q_head = QHead ,
722714 q_tail = QTail ,
723- len = Len ,
724715 next_seq_id = NextSeqId ,
725716 next_deliver_seq_id = NextDeliverSeqId ,
726717 ram_pending_ack = RPA ,
@@ -736,7 +727,7 @@ info(backing_queue_status, #vqstate {
736727 [ {version , 2 },
737728 {q_head , ? QUEUE :len (QHead )},
738729 {q_tail , QTail },
739- {len , Len },
730+ {len , len ( State ) },
740731 {next_seq_id , NextSeqId },
741732 {next_deliver_seq_id , NextDeliverSeqId },
742733 {num_pending_acks , map_size (RPA ) + map_size (DPA )},
@@ -828,33 +819,19 @@ get_pa_head(PA) ->
828819 map_get (Smallest , PA )
829820 end .
830821
831- a (State = # vqstate { q_head = QHead ,
832- q_tail = QTail ,
833- len = Len ,
834- bytes = Bytes ,
822+ a (State = # vqstate { bytes = Bytes ,
835823 unacked_bytes = UnackedBytes ,
836824 persistent_count = PersistentCount ,
837825 persistent_bytes = PersistentBytes ,
838826 ram_msg_count = RamMsgCount ,
839827 ram_bytes = RamBytes }) ->
840- ED = QTail # q_tail .count == 0 ,
841- E3 = ? QUEUE :is_empty (QHead ),
842- LZ = Len == 0 ,
843- L3 = ? QUEUE :len (QHead ),
844-
845- % % If the queue is empty, then q_head and q_tail are both empty.
846- true = LZ == (ED and E3 ),
847-
848- % % All messages are in q_head or q_tail.
849- true = QTail # q_tail .count + L3 == Len ,
850828
851- true = Len >= 0 ,
852829 true = Bytes >= 0 ,
853830 true = UnackedBytes >= 0 ,
854831 true = PersistentCount >= 0 ,
855832 true = PersistentBytes >= 0 ,
856833 true = RamMsgCount >= 0 ,
857- true = RamMsgCount =< Len ,
834+ % % Requeues may lead to RamMsgCount > 2048.
858835 true = RamBytes >= 0 ,
859836 true = RamBytes =< Bytes + UnackedBytes ,
860837
@@ -1048,7 +1025,6 @@ init(IsDurable, IndexState, StoreState, DiskCount, DiskBytes, Terms,
10481025 transient_threshold = NextSeqId ,
10491026 qi_embed_msgs_below = IndexMaxSize ,
10501027
1051- len = DiskCount1 ,
10521028 persistent_count = DiskCount1 ,
10531029 bytes = DiskBytes1 ,
10541030 persistent_bytes = DiskBytes1 ,
@@ -1129,21 +1105,20 @@ read_msg(_, MsgId, IsPersistent, rabbit_msg_store, State = #vqstate{msg_store_cl
11291105%% When publishing to memory, transient messages do not get written to disk.
11301106%% On the other hand, persistent messages are kept in memory as well as disk.
11311107stats_published_memory(MS = #msg_status{is_persistent = true }, St ) ->
1132- St # vqstate {? UP (len , ram_msg_count , persistent_count , + 1 ),
1108+ St # vqstate {? UP (ram_msg_count , persistent_count , + 1 ),
11331109 ? UP (bytes , ram_bytes , persistent_bytes , + msg_size (MS ))};
11341110stats_published_memory (MS = # msg_status {is_persistent = false }, St ) ->
1135- St # vqstate {? UP (len , ram_msg_count , + 1 ),
1111+ St # vqstate {? UP (ram_msg_count , + 1 ),
11361112 ? UP (bytes , ram_bytes , + msg_size (MS ))}.
11371113
11381114% % Messages published directly to disk are not kept in memory.
11391115stats_published_disk (MS = # msg_status {is_persistent = true }, St ) ->
1140- St # vqstate {? UP (len , persistent_count , + 1 ),
1116+ St # vqstate {? UP (persistent_count , + 1 ),
11411117 ? UP (bytes , persistent_bytes , + msg_size (MS ))};
11421118stats_published_disk (MS = # msg_status {is_persistent = false }, St ) ->
1143- St # vqstate {? UP (len , + 1 ),
1144- ? UP (bytes , + msg_size (MS ))}.
1119+ St # vqstate {? UP (bytes , + msg_size (MS ))}.
11451120
1146- % % Pending acks do not add to len. Messages are kept in memory.
1121+ % % Pending acks messages are kept in memory.
11471122stats_published_pending_acks (MS = # msg_status {is_persistent = true }, St ) ->
11481123 St # vqstate {? UP (persistent_count , + 1 ),
11491124 ? UP (persistent_bytes , unacked_bytes , ram_bytes , + msg_size (MS ))};
@@ -1156,24 +1131,23 @@ stats_published_pending_acks(MS = #msg_status{is_persistent = false}, St) ->
11561131% % was fully on disk the content will not be read immediately).
11571132% % The contents stay where they are during this operation.
11581133stats_pending_acks (MS = # msg_status {msg = undefined }, St ) ->
1159- St # vqstate {? UP (len , - 1 ),
1160- ? UP (bytes , - msg_size (MS )), ? UP (unacked_bytes , + msg_size (MS ))};
1134+ St # vqstate {? UP (bytes , - msg_size (MS )), ? UP (unacked_bytes , + msg_size (MS ))};
11611135stats_pending_acks (MS , St ) ->
1162- St # vqstate {? UP (len , ram_msg_count , - 1 ),
1136+ St # vqstate {? UP (ram_msg_count , - 1 ),
11631137 ? UP (bytes , - msg_size (MS )), ? UP (unacked_bytes , + msg_size (MS ))}.
11641138
11651139% % Message may or may not be persistent and the contents
11661140% % may or may not be in memory.
11671141stats_removed (MS = # msg_status {is_persistent = true , msg = undefined }, St ) ->
1168- St # vqstate {? UP (len , persistent_count , - 1 ),
1142+ St # vqstate {? UP (persistent_count , - 1 ),
11691143 ? UP (bytes , persistent_bytes , - msg_size (MS ))};
11701144stats_removed (MS = # msg_status {is_persistent = true }, St ) ->
1171- St # vqstate {? UP (len , ram_msg_count , persistent_count , - 1 ),
1145+ St # vqstate {? UP (ram_msg_count , persistent_count , - 1 ),
11721146 ? UP (bytes , ram_bytes , persistent_bytes , - msg_size (MS ))};
11731147stats_removed (MS = # msg_status {is_persistent = false , msg = undefined }, St ) ->
1174- St # vqstate {? UP (len , - 1 ), ? UP ( bytes , - msg_size (MS ))};
1148+ St # vqstate {? UP (bytes , - msg_size (MS ))};
11751149stats_removed (MS = # msg_status {is_persistent = false }, St ) ->
1176- St # vqstate {? UP (len , ram_msg_count , - 1 ),
1150+ St # vqstate {? UP (ram_msg_count , - 1 ),
11771151 ? UP (bytes , ram_bytes , - msg_size (MS ))}.
11781152
11791153% % @todo Very confusing that ram_msg_count is without unacked but ram_bytes is with.
@@ -1194,26 +1168,15 @@ stats_acked_pending(MS = #msg_status{is_persistent = false}, St) ->
11941168
11951169% % Notice that this is the reverse of stats_pending_acks.
11961170stats_requeued_memory (MS = # msg_status {msg = undefined }, St ) ->
1197- St # vqstate {? UP (len , + 1 ),
1198- ? UP (bytes , + msg_size (MS )), ? UP (unacked_bytes , - msg_size (MS ))};
1171+ St # vqstate {? UP (bytes , + msg_size (MS )), ? UP (unacked_bytes , - msg_size (MS ))};
11991172stats_requeued_memory (MS , St ) ->
1200- St # vqstate {? UP (len , ram_msg_count , + 1 ),
1173+ St # vqstate {? UP (ram_msg_count , + 1 ),
12011174 ? UP (bytes , + msg_size (MS )), ? UP (unacked_bytes , - msg_size (MS ))}.
12021175
1203- % % TODO!!!
1204- % % @todo For v2 since we don't remove from disk until we ack, we don't need
1205- % % to write to disk again on requeue. If the message falls within q_tail
1206- % % we can just drop the MsgStatus. Otherwise we just put it in q_head and
1207- % % we don't do any disk writes.
1208- % %
1209- % % So we don't need to change anything except how we count stats as
1210- % % well as q_tail stats if the message falls within q_tail.
12111176stats_requeued_disk (MS = # msg_status {is_persistent = true }, St ) ->
1212- St # vqstate {? UP (len , + 1 ),
1213- ? UP (bytes , + msg_size (MS )), ? UP (unacked_bytes , - msg_size (MS ))};
1177+ St # vqstate {? UP (bytes , + msg_size (MS )), ? UP (unacked_bytes , - msg_size (MS ))};
12141178stats_requeued_disk (MS = # msg_status {is_persistent = false }, St ) ->
1215- St # vqstate {? UP (len , + 1 ),
1216- ? UP (unacked_bytes , - msg_size (MS ))}.
1179+ St # vqstate {? UP (unacked_bytes , - msg_size (MS ))}.
12171180
12181181msg_size (# msg_status {msg_props = # message_properties {size = Size }}) -> Size .
12191182
@@ -1491,7 +1454,6 @@ publish1(Msg,
14911454 IsDelivered , _ChPid , PersistFun ,
14921455 State = # vqstate { q_head = QHead ,
14931456 q_tail = QTail = # q_tail { count = QTailCount },
1494- len = Len ,
14951457 qi_embed_msgs_below = IndexMaxSize ,
14961458 next_seq_id = SeqId ,
14971459 next_deliver_seq_id = NextDeliverSeqId ,
@@ -1508,9 +1470,9 @@ publish1(Msg,
15081470 % % limit is at 1 because the queue process will need to access this message to know
15091471 % % expiration information.
15101472 MemoryLimit = min (1 + floor (2 * OutRate ), 2048 ),
1473+ QHeadLen = ? QUEUE :len (QHead ),
15111474 State3 = case QTailCount of
1512- % % Len is the same as QHead length when QTailCount =:= 0.
1513- 0 when Len < MemoryLimit ->
1475+ 0 when QHeadLen < MemoryLimit ->
15141476 {MsgStatus1 , State1 } = PersistFun (false , false , MsgStatus , State ),
15151477 State2 = State1 # vqstate { q_head = ? QUEUE :in (m (MsgStatus1 ), QHead ) },
15161478 stats_published_memory (MsgStatus1 , State2 );
@@ -1791,8 +1753,6 @@ msgs_written_to_disk(Callback, MsgIdSet, written) ->
17911753 % % for all message IDs. This is a waste. We should only
17921754 % % call it for messages that need confirming, and avoid
17931755 % % this intersection call.
1794- % %
1795- % % The same may apply to msg_indices_written_to_disk as well.
17961756 Confirmed = sets :intersection (UC , MsgIdSet ),
17971757 record_confirms (sets :intersection (MsgIdSet , MIOD ),
17981758 State # vqstate {
0 commit comments