3030 purge /1 ,
3131 update_machine_state /2 ,
3232 pending_size /1 ,
33+ num_cached_segments /1 ,
3334 stat /1 ,
3435 stat /2 ,
3536 query_single_active_consumer /1 ,
4041-define (TIMER_TIME , 10000 ).
4142-define (COMMAND_TIMEOUT , 30000 ).
4243-define (UNLIMITED_PREFETCH_COUNT , 2000 ). % % something large for ra
44+ % % controls the timer for closing cached segments
45+ -define (CACHE_SEG_TIMEOUT , 5000 ).
4346
4447-type seq () :: non_neg_integer ().
48+ -type milliseconds () :: non_neg_integer ().
49+
4550
4651-record (consumer , {key :: rabbit_fifo :consumer_key (),
4752 % status = up :: up | cancelled,
6974 pending = #{} :: #{seq () =>
7075 {term (), rabbit_fifo :command ()}},
7176 consumers = #{} :: #{rabbit_types :ctag () => # consumer {}},
72- timer_state :: term ()
77+ timer_state :: term (),
78+ cached_segments :: undefined |
79+ {undefined | reference (),
80+ LastSeenMs :: milliseconds (),
81+ ra_flru :state ()}
7382 }).
7483
7584-opaque state () :: # state {}.
@@ -132,9 +141,15 @@ enqueue(QName, Correlation, Msg,
132141 % % it is safe to reject the message as we never attempted
133142 % % to send it
134143 {reject_publish , State0 };
144+ {error , {shutdown , delete }} ->
145+ rabbit_log :debug (" ~ts : QQ ~ts tried to register enqueuer during delete shutdown" ,
146+ [? MODULE , rabbit_misc :rs (QName )]),
147+ {reject_publish , State0 };
135148 {timeout , _ } ->
136149 {reject_publish , State0 };
137150 Err ->
151+ rabbit_log :debug (" ~ts : QQ ~ts error when registering enqueuer ~p " ,
152+ [? MODULE , rabbit_misc :rs (QName ), Err ]),
138153 exit (Err )
139154 end ;
140155enqueue (_QName , _Correlation , _Msg ,
@@ -167,7 +182,7 @@ enqueue(QName, Correlation, Msg,
167182% % @param QueueName Name of the queue.
168183% % @param Msg an arbitrary erlang term representing the message.
169184% % @param State the current {@module} state.
170- % % @returns
185+ % % @return's
171186% % `{ok, State, Actions}' if the command was successfully sent.
172187% % {@module} assigns a sequence number to every raft command it issues. The
173188% % SequenceNumber can be correlated to the applied sequence numbers returned
@@ -510,6 +525,15 @@ purge(Server) ->
510525pending_size (# state {pending = Pend }) ->
511526 maps :size (Pend ).
512527
528+ -spec num_cached_segments (state ()) -> non_neg_integer ().
529+ num_cached_segments (# state {cached_segments = CachedSegments }) ->
530+ case CachedSegments of
531+ undefined ->
532+ 0 ;
533+ {_ , _ , Cached } ->
534+ ra_flru :size (Cached )
535+ end .
536+
513537-spec stat (ra :server_id ()) ->
514538 {ok , non_neg_integer (), non_neg_integer ()}
515539 | {error | timeout , term ()}.
@@ -657,7 +681,8 @@ handle_ra_event(QName, Leader, {applied, Seqs},
657681 _ ->
658682 {ok , State2 , Actions }
659683 end ;
660- handle_ra_event (QName , From , {machine , {delivery , _ConsumerTag , _ } = Del }, State0 ) ->
684+ handle_ra_event (QName , From , {machine , Del }, State0 )
685+ when element (1 , Del ) == delivery ->
661686 handle_delivery (QName , From , Del , State0 );
662687handle_ra_event (_QName , _From , {machine , Action }, State )
663688 when element (1 , Action ) =:= credit_reply orelse
@@ -667,28 +692,31 @@ handle_ra_event(_QName, _, {machine, {queue_status, Status}},
667692 # state {} = State ) ->
668693 % % just set the queue status
669694 {ok , State # state {queue_status = Status }, []};
670- handle_ra_event (_QName , Leader , {machine , leader_change },
695+ handle_ra_event (QName , Leader , {machine , leader_change },
671696 # state {leader = OldLeader ,
672697 pending = Pending } = State0 ) ->
673698 % % we need to update leader
674699 % % and resend any pending commands
675- rabbit_log :debug (" ~ts : Detected QQ leader change from ~w to ~w , "
700+ rabbit_log :debug (" ~ts : ~s Detected QQ leader change from ~w to ~w , "
676701 " resending ~b pending commands" ,
677- [? MODULE , OldLeader , Leader , maps :size (Pending )]),
702+ [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
703+ Leader , maps :size (Pending )]),
678704 State = resend_all_pending (State0 # state {leader = Leader }),
679705 {ok , State , []};
680706handle_ra_event (_QName , _From , {rejected , {not_leader , Leader , _Seq }},
681707 # state {leader = Leader } = State ) ->
682708 {ok , State , []};
683- handle_ra_event (_QName , _From , {rejected , {not_leader , Leader , _Seq }},
709+ handle_ra_event (QName , _From , {rejected , {not_leader , Leader , _Seq }},
684710 # state {leader = OldLeader ,
685711 pending = Pending } = State0 ) ->
686- rabbit_log :debug (" ~ts : Detected QQ leader change (rejection) from ~w to ~w , "
712+ rabbit_log :debug (" ~ts : ~s Detected QQ leader change (rejection) from ~w to ~w , "
687713 " resending ~b pending commands" ,
688- [? MODULE , OldLeader , Leader , maps :size (Pending )]),
714+ [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
715+ Leader , maps :size (Pending )]),
689716 State = resend_all_pending (State0 # state {leader = Leader }),
690717 {ok , cancel_timer (State ), []};
691- handle_ra_event (_QName , _From , {rejected , {not_leader , _UndefinedMaybe , _Seq }}, State0 ) ->
718+ handle_ra_event (_QName , _From ,
719+ {rejected , {not_leader , _UndefinedMaybe , _Seq }}, State0 ) ->
692720 % TODO: how should these be handled? re-sent on timer or try random
693721 {ok , State0 , []};
694722handle_ra_event (QName , _ , timeout , # state {cfg = # cfg {servers = Servers }} = State0 ) ->
@@ -700,6 +728,30 @@ handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}} = State
700728 State = resend_all_pending (State0 # state {leader = Leader }),
701729 {ok , State , []}
702730 end ;
731+ handle_ra_event (QName , Leader , close_cached_segments ,
732+ # state {cached_segments = CachedSegments } = State ) ->
733+ {ok ,
734+ case CachedSegments of
735+ undefined ->
736+ % % timer didn't get cancelled so just ignore this
737+ State ;
738+ {_TRef , Last , Cache } ->
739+ case now_ms () > Last + ? CACHE_SEG_TIMEOUT of
740+ true ->
741+ rabbit_log :debug (" ~ts : closing_cached_segments" ,
742+ [rabbit_misc :rs (QName )]),
743+ % % its been long enough, evict all
744+ _ = ra_flru :evict_all (Cache ),
745+ State # state {cached_segments = undefined };
746+ false ->
747+ % % set another timer
748+ Ref = erlang :send_after (? CACHE_SEG_TIMEOUT , self (),
749+ {'$gen_cast' ,
750+ {queue_event , QName ,
751+ {Leader , close_cached_segments }}}),
752+ State # state {cached_segments = {Ref , Last , Cache }}
753+ end
754+ end , []};
703755handle_ra_event (_QName , _Leader , {machine , eol }, State ) ->
704756 {eol , [{unblock , cluster_name (State )}]}.
705757
@@ -863,7 +915,39 @@ handle_delivery(_QName, _Leader, {delivery, Tag, [_ | _] = IdMsgs},
863915 % % we should return all messages.
864916 MsgIntIds = [Id || {Id , _ } <- IdMsgs ],
865917 {State1 , Deliveries } = return (Tag , MsgIntIds , State0 ),
866- {ok , State1 , Deliveries }.
918+ {ok , State1 , Deliveries };
919+ handle_delivery (QName , Leader , {delivery , Tag , ReadPlan , Msgs },
920+ # state {cached_segments = CachedSegments } = State ) ->
921+ {TRef , Cached0 } = case CachedSegments of
922+ undefined ->
923+ {undefined , undefined };
924+ {R , _ , C } ->
925+ {R , C }
926+ end ,
927+ {MsgIds , Cached1 } = rabbit_fifo :exec_read (Cached0 , ReadPlan , Msgs ),
928+ % % if there are cached segments after a read and there
929+ % % is no current timer set, set a timer
930+ % % send a message to evict cache after some time
931+ Cached = case ra_flru :size (Cached1 ) > 0 of
932+ true when TRef == undefined ->
933+ Ref = erlang :send_after (? CACHE_SEG_TIMEOUT , self (),
934+ {'$gen_cast' ,
935+ {queue_event , QName ,
936+ {Leader , close_cached_segments }}}),
937+ {Ref , now_ms (), Cached1 };
938+ true ->
939+ {TRef , now_ms (), Cached1 };
940+ false when is_reference (TRef ) ->
941+ % % the time is (potentially) alive and may as well be
942+ % % cancelled here
943+ _ = erlang :cancel_timer (TRef , [{async , true },
944+ {info , false }]),
945+ undefined ;
946+ false ->
947+ undefined
948+ end ,
949+ handle_delivery (QName , Leader , {delivery , Tag , MsgIds },
950+ State # state {cached_segments = Cached }).
867951
868952transform_msgs (QName , QRef , Msgs ) ->
869953 lists :map (
@@ -1032,3 +1116,6 @@ send_pending(Cid, #state{unsent_commands = Unsent} = State0) ->
10321116 normal , S0 )
10331117 end , State0 , Commands ),
10341118 State1 # state {unsent_commands = maps :remove (Cid , Unsent )}.
1119+
1120+ now_ms () ->
1121+ erlang :system_time (millisecond ).
0 commit comments