2222-export ([start /2 , stop /1 ]).
2323
2424% % exported for testing only
25- -export ([start_msg_store /3 , stop_msg_store /1 , init / 3 ]).
25+ -export ([start_msg_store /3 , stop_msg_store /1 ]).
2626
2727-include (" mc.hrl" ).
2828-include_lib (" stdlib/include/qlc.hrl" ).
144144 unacked_bytes ,
145145 persistent_count , % % w unacked
146146 persistent_bytes , % % w unacked
147- delta_transient_bytes , % %
148147
149148 ram_msg_count , % % w/o unacked
150149 ram_msg_count_prev ,
204203-record (q_tail ,
205204 { start_seq_id , % % start_seq_id is inclusive
206205 count ,
207- transient ,
208206 end_seq_id % % end_seq_id is exclusive
209207 }).
210208
285283
286284-define (BLANK_Q_TAIL , # q_tail { start_seq_id = undefined ,
287285 count = 0 ,
288- transient = 0 ,
289286 end_seq_id = undefined }).
290287-define (BLANK_Q_TAIL_PATTERN (Z ), # q_tail { start_seq_id = Z ,
291288 count = 0 ,
292- transient = 0 ,
293289 end_seq_id = Z }).
294290
295291-define (MICROS_PER_SECOND , 1000000.0 ).
@@ -698,9 +694,6 @@ info(messages_ram, State) ->
698694 info (messages_ready_ram , State ) + info (messages_unacknowledged_ram , State );
699695info (messages_persistent , # vqstate {persistent_count = PersistentCount }) ->
700696 PersistentCount ;
701- % % @todo Remove.
702- info (messages_paged_out , # vqstate {q_tail = # q_tail {transient = Count }}) ->
703- Count ;
704697info (message_bytes , # vqstate {bytes = Bytes ,
705698 unacked_bytes = UBytes }) ->
706699 Bytes + UBytes ;
@@ -712,9 +705,6 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
712705 RamBytes ;
713706info (message_bytes_persistent , # vqstate {persistent_bytes = PersistentBytes }) ->
714707 PersistentBytes ;
715- % % @todo Remove.
716- info (message_bytes_paged_out , # vqstate {delta_transient_bytes = PagedOutBytes }) ->
717- PagedOutBytes ;
718708info (head_message_timestamp , # vqstate {
719709 q_head = QHead ,
720710 ram_pending_ack = RPA }) ->
@@ -994,28 +984,18 @@ is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA,
994984 maps :is_key (SeqId , RPA ) orelse
995985 maps :is_key (SeqId , DPA ).
996986
997- expand_q_tail (SeqId , ? BLANK_Q_TAIL_PATTERN (X ), IsPersistent ) ->
998- qt (# q_tail { start_seq_id = SeqId , count = 1 , end_seq_id = SeqId + 1 ,
999- transient = one_if (not IsPersistent )});
987+ expand_q_tail (SeqId , ? BLANK_Q_TAIL_PATTERN (X )) ->
988+ qt (# q_tail { start_seq_id = SeqId , count = 1 , end_seq_id = SeqId + 1 });
1000989expand_q_tail (SeqId , # q_tail { start_seq_id = StartSeqId ,
1001- count = Count ,
1002- transient = Transient } = QTail ,
1003- IsPersistent )
990+ count = Count } = QTail )
1004991 when SeqId < StartSeqId ->
1005- qt (QTail # q_tail { start_seq_id = SeqId , count = Count + 1 ,
1006- transient = Transient + one_if (not IsPersistent )});
992+ qt (QTail # q_tail { start_seq_id = SeqId , count = Count + 1 });
1007993expand_q_tail (SeqId , # q_tail { count = Count ,
1008- end_seq_id = EndSeqId ,
1009- transient = Transient } = QTail ,
1010- IsPersistent )
994+ end_seq_id = EndSeqId } = QTail )
1011995 when SeqId >= EndSeqId ->
1012- qt (QTail # q_tail { count = Count + 1 , end_seq_id = SeqId + 1 ,
1013- transient = Transient + one_if (not IsPersistent )});
1014- expand_q_tail (_SeqId , # q_tail { count = Count ,
1015- transient = Transient } = QTail ,
1016- IsPersistent ) ->
1017- qt (QTail # q_tail { count = Count + 1 ,
1018- transient = Transient + one_if (not IsPersistent ) }).
996+ qt (QTail # q_tail { count = Count + 1 , end_seq_id = SeqId + 1 });
997+ expand_q_tail (_SeqId , # q_tail { count = Count } = QTail ) ->
998+ qt (QTail # q_tail { count = Count + 1 }).
1019999
10201000% %----------------------------------------------------------------------------
10211001% % Internal major helpers for Public API
@@ -1048,7 +1028,6 @@ init(IsDurable, IndexState, StoreState, DiskCount, DiskBytes, Terms,
10481028 true -> ? BLANK_Q_TAIL ;
10491029 false -> qt (# q_tail { start_seq_id = LowSeqId ,
10501030 count = DiskCount1 ,
1051- transient = 0 ,
10521031 end_seq_id = NextSeqId })
10531032 end ,
10541033 Now = erlang :monotonic_time (),
@@ -1073,7 +1052,6 @@ init(IsDurable, IndexState, StoreState, DiskCount, DiskBytes, Terms,
10731052 persistent_count = DiskCount1 ,
10741053 bytes = DiskBytes1 ,
10751054 persistent_bytes = DiskBytes1 ,
1076- delta_transient_bytes = 0 ,
10771055
10781056 ram_msg_count = 0 ,
10791057 ram_msg_count_prev = 0 ,
@@ -1163,7 +1141,7 @@ stats_published_disk(MS = #msg_status{is_persistent = true}, St) ->
11631141 ? UP (bytes , persistent_bytes , + msg_size (MS ))};
11641142stats_published_disk (MS = # msg_status {is_persistent = false }, St ) ->
11651143 St # vqstate {? UP (len , + 1 ),
1166- ? UP (bytes , delta_transient_bytes , + msg_size (MS ))}.
1144+ ? UP (bytes , + msg_size (MS ))}.
11671145
11681146% % Pending acks do not add to len. Messages are kept in memory.
11691147stats_published_pending_acks (MS = # msg_status {is_persistent = true }, St ) ->
@@ -1186,8 +1164,6 @@ stats_pending_acks(MS, St) ->
11861164
11871165% % Message may or may not be persistent and the contents
11881166% % may or may not be in memory.
1189- % %
1190- % % Removal from delta_transient_bytes is done by read_from_q_tail.
11911167stats_removed (MS = # msg_status {is_persistent = true , msg = undefined }, St ) ->
11921168 St # vqstate {? UP (len , persistent_count , - 1 ),
11931169 ? UP (bytes , persistent_bytes , - msg_size (MS ))};
@@ -1237,7 +1213,6 @@ stats_requeued_disk(MS = #msg_status{is_persistent = true}, St) ->
12371213 ? UP (bytes , + msg_size (MS )), ? UP (unacked_bytes , - msg_size (MS ))};
12381214stats_requeued_disk (MS = # msg_status {is_persistent = false }, St ) ->
12391215 St # vqstate {? UP (len , + 1 ),
1240- ? UP (bytes , delta_transient_bytes , + msg_size (MS )),
12411216 ? UP (unacked_bytes , - msg_size (MS ))}.
12421217
12431218msg_size (# msg_status {msg_props = # message_properties {size = Size }}) -> Size .
@@ -1541,7 +1516,7 @@ publish1(Msg,
15411516 stats_published_memory (MsgStatus1 , State2 );
15421517 _ ->
15431518 {MsgStatus1 , State1 } = PersistFun (true , true , MsgStatus , State ),
1544- QTail1 = expand_q_tail (SeqId , QTail , IsPersistent ),
1519+ QTail1 = expand_q_tail (SeqId , QTail ),
15451520 State2 = State1 # vqstate { q_tail = QTail1 },
15461521 stats_published_disk (MsgStatus1 , State2 )
15471522 end ,
@@ -1865,9 +1840,8 @@ q_tail_merge(SeqIds, QTail, MsgIds, State) ->
18651840 case msg_from_pending_ack (SeqId , State0 ) of
18661841 {none , _ } ->
18671842 Acc ;
1868- {# msg_status { msg_id = MsgId ,
1869- is_persistent = IsPersistent } = MsgStatus , State1 } ->
1870- {expand_q_tail (SeqId , QTail0 , IsPersistent ), [MsgId | MsgIds0 ],
1843+ {# msg_status { msg_id = MsgId } = MsgStatus , State1 } ->
1844+ {expand_q_tail (SeqId , QTail0 ), [MsgId | MsgIds0 ],
18711845 stats_requeued_disk (MsgStatus , State1 )}
18721846 end
18731847 end , {QTail , MsgIds , State }, SeqIds ).
@@ -1930,12 +1904,10 @@ read_from_q_tail(DelsAndAcksFun,
19301904 ram_msg_count = RamMsgCount ,
19311905 ram_bytes = RamBytes ,
19321906 disk_read_count = DiskReadCount ,
1933- delta_transient_bytes = DeltaTransientBytes ,
19341907 transient_threshold = TransientThreshold },
19351908 MemoryLimit , WhatToRead ) ->
19361909 # q_tail { start_seq_id = QTailSeqId ,
19371910 count = QTailCount ,
1938- transient = Transient ,
19391911 end_seq_id = QTailSeqIdEnd } = QTail ,
19401912 % % For v2 we want to limit the number of messages read at once to lower
19411913 % % the memory footprint. We use the consume rate to determine how many
@@ -2012,7 +1984,7 @@ read_from_q_tail(DelsAndAcksFun,
20121984 metadata_only ->
20131985 {List0 , StoreState , MCStateP , MCStateT }
20141986 end ,
2015- {QHead1 , RamCountsInc , RamBytesInc , State1 , TransientCount , TransientBytes } =
1987+ {QHead1 , RamCountsInc , RamBytesInc , State1 } =
20161988 become_q_head (List , TransientThreshold ,
20171989 DelsAndAcksFun ,
20181990 State # vqstate { index_state = IndexState1 ,
@@ -2036,17 +2008,13 @@ read_from_q_tail(DelsAndAcksFun,
20362008 0 ->
20372009 % % q_tail is now empty
20382010 State2 # vqstate { q_tail = ? BLANK_Q_TAIL ,
2039- q_head = QHead ,
2040- delta_transient_bytes = 0 };
2011+ q_head = QHead };
20412012 N when N > 0 ->
20422013 QTail1 = qt (# q_tail { start_seq_id = QTailSeqId1 ,
2043- count = N ,
2044- % % @todo Probably something wrong, seen it become negative...
2045- transient = Transient - TransientCount ,
2046- end_seq_id = QTailSeqIdEnd }),
2014+ count = N ,
2015+ end_seq_id = QTailSeqIdEnd }),
20472016 State2 # vqstate { q_head = QHead ,
2048- q_tail = QTail1 ,
2049- delta_transient_bytes = DeltaTransientBytes - TransientBytes }
2017+ q_tail = QTail1 }
20502018 end
20512019 end .
20522020
@@ -2070,30 +2038,27 @@ merge_sh_read_msgs(MTail, _Reads) ->
20702038 MTail .
20712039
20722040become_q_head (List , TransientThreshold , DelsAndAcksFun , State = # vqstate { next_deliver_seq_id = NextDeliverSeqId0 }) ->
2073- {Filtered , NextDeliverSeqId , Acks , RamReadyCount , RamBytes , TransientCount , TransientBytes } =
2041+ {Filtered , NextDeliverSeqId , Acks , RamReadyCount , RamBytes } =
20742042 lists :foldr (
20752043 fun ({_MsgOrId , SeqId , _MsgLocation , _MsgProps , IsPersistent } = M ,
2076- {Filtered1 , NextDeliverSeqId1 , Acks1 , RRC , RB , TC , TB } = Acc ) ->
2044+ {Filtered1 , NextDeliverSeqId1 , Acks1 , RRC , RB } = Acc ) ->
20772045 case SeqId < TransientThreshold andalso not IsPersistent of
20782046 true -> {Filtered1 ,
20792047 next_deliver_seq_id (SeqId , NextDeliverSeqId1 ),
2080- [SeqId | Acks1 ], RRC , RB , TC , TB };
2048+ [SeqId | Acks1 ], RRC , RB };
20812049 false -> MsgStatus = m (msg_status (M )),
20822050 HaveMsg = msg_in_ram (MsgStatus ),
20832051 Size = msg_size (MsgStatus ),
20842052 case is_msg_in_pending_acks (SeqId , State ) of
20852053 false -> {? QUEUE :in_r (MsgStatus , Filtered1 ),
20862054 NextDeliverSeqId1 , Acks1 ,
20872055 RRC + one_if (HaveMsg ),
2088- RB + one_if (HaveMsg ) * Size ,
2089- TC + one_if (not IsPersistent ),
2090- TB + one_if (not IsPersistent ) * Size };
2056+ RB + one_if (HaveMsg ) * Size };
20912057 true -> Acc % % [0]
20922058 end
20932059 end
2094- end , {? QUEUE :new (), NextDeliverSeqId0 , [], 0 , 0 , 0 , 0 }, List ),
2095- {Filtered , RamReadyCount , RamBytes , DelsAndAcksFun (NextDeliverSeqId , Acks , State ),
2096- TransientCount , TransientBytes }.
2060+ end , {? QUEUE :new (), NextDeliverSeqId0 , [], 0 , 0 }, List ),
2061+ {Filtered , RamReadyCount , RamBytes , DelsAndAcksFun (NextDeliverSeqId , Acks , State )}.
20972062% % [0] We don't increase RamBytes here, even though it pertains to
20982063% % unacked messages too, since if HaveMsg then the message must have
20992064% % been stored in the QI, thus the message must have been in
0 commit comments