3030 purge /1 ,
3131 update_machine_state /2 ,
3232 pending_size /1 ,
33+ num_cached_segments /1 ,
3334 stat /1 ,
3435 stat /2 ,
3536 query_single_active_consumer /1 ,
4041-define (TIMER_TIME , 10000 ).
4142-define (COMMAND_TIMEOUT , 30000 ).
4243-define (UNLIMITED_PREFETCH_COUNT , 2000 ). % % something large for ra
44+ % % controls the timer for closing cached segments
45+ -define (CACHE_SEG_TIMEOUT , 5000 ).
4346
4447-type seq () :: non_neg_integer ().
48+ -type milliseconds () :: non_neg_integer ().
49+
4550
4651-record (consumer , {key :: rabbit_fifo :consumer_key (),
4752 % status = up :: up | cancelled,
7075 {term (), rabbit_fifo :command ()}},
7176 consumers = #{} :: #{rabbit_types :ctag () => # consumer {}},
7277 timer_state :: term (),
73- cached_segments :: undefined | ra_flru :state ()
78+ cached_segments :: undefined |
79+ {undefined | reference (),
80+ LastSeenMs :: milliseconds (),
81+ ra_flr :state ()}
7482 }).
7583
7684-opaque state () :: # state {}.
@@ -517,6 +525,15 @@ purge(Server) ->
517525pending_size (# state {pending = Pend }) ->
518526 maps :size (Pend ).
519527
528+ -spec num_cached_segments (state ()) -> non_neg_integer ().
529+ num_cached_segments (# state {cached_segments = CachedSegments }) ->
530+ case CachedSegments of
531+ undefined ->
532+ 0 ;
533+ {_ , _ , Cached } ->
534+ ra_flru :size (Cached )
535+ end .
536+
520537-spec stat (ra :server_id ()) ->
521538 {ok , non_neg_integer (), non_neg_integer ()}
522539 | {error | timeout , term ()}.
@@ -651,24 +668,25 @@ handle_ra_event(_QName, _, {machine, {queue_status, Status}},
651668 # state {} = State ) ->
652669 % % just set the queue status
653670 {ok , State # state {queue_status = Status }, []};
654- handle_ra_event (_QName , Leader , {machine , leader_change },
671+ handle_ra_event (QName , Leader , {machine , leader_change },
655672 # state {leader = OldLeader } = State0 ) ->
656673 % % we need to update leader
657674 % % and resend any pending commands
658- rabbit_log :debug (" ~ts : Detected QQ leader change from ~w to ~w " ,
659- [? MODULE , OldLeader , Leader ]),
675+ rabbit_log :debug (" ~ts : ~s Detected QQ leader change from ~w to ~w " ,
676+ [rabbit_misc : rs ( QName ), ? MODULE , OldLeader , Leader ]),
660677 State = resend_all_pending (State0 # state {leader = Leader }),
661678 {ok , State , []};
662679handle_ra_event (_QName , _From , {rejected , {not_leader , Leader , _Seq }},
663680 # state {leader = Leader } = State ) ->
664681 {ok , State , []};
665- handle_ra_event (_QName , _From , {rejected , {not_leader , Leader , _Seq }},
682+ handle_ra_event (QName , _From , {rejected , {not_leader , Leader , _Seq }},
666683 # state {leader = OldLeader } = State0 ) ->
667- rabbit_log :debug (" ~ts : Detected QQ leader change (rejection) from ~w to ~w " ,
668- [? MODULE , OldLeader , Leader ]),
684+ rabbit_log :debug (" ~ts : ~s Detected QQ leader change (rejection) from ~w to ~w " ,
685+ [rabbit_misc : rs ( QName ), ? MODULE , OldLeader , Leader ]),
669686 State = resend_all_pending (State0 # state {leader = Leader }),
670687 {ok , cancel_timer (State ), []};
671- handle_ra_event (_QName , _From , {rejected , {not_leader , _UndefinedMaybe , _Seq }}, State0 ) ->
688+ handle_ra_event (_QName , _From ,
689+ {rejected , {not_leader , _UndefinedMaybe , _Seq }}, State0 ) ->
672690 % TODO: how should these be handled? re-sent on timer or try random
673691 {ok , State0 , []};
674692handle_ra_event (QName , _ , timeout , # state {cfg = # cfg {servers = Servers }} = State0 ) ->
@@ -680,6 +698,30 @@ handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}} = State
680698 State = resend_all_pending (State0 # state {leader = Leader }),
681699 {ok , State , []}
682700 end ;
701+ handle_ra_event (QName , Leader , close_cached_segments ,
702+ # state {cached_segments = CachedSegments } = State ) ->
703+ {ok ,
704+ case CachedSegments of
705+ undefined ->
706+ % % timer didn't get cancelled so just ignore this
707+ State ;
708+ {_TRef , Last , Cache } ->
709+ case now_ms () > Last + ? CACHE_SEG_TIMEOUT of
710+ true ->
711+ rabbit_log :debug (" ~ts : closing_cached_segments" ,
712+ [rabbit_misc :rs (QName )]),
713+ % % its been long enough, evict all
714+ _ = ra_flru :evict_all (Cache ),
715+ State # state {cached_segments = undefined };
716+ false ->
717+ % % set another timer
718+ Ref = erlang :send_after (? CACHE_SEG_TIMEOUT , self (),
719+ {'$gen_cast' ,
720+ {queue_event , QName ,
721+ {Leader , close_cached_segments }}}),
722+ State # state {cached_segments = {Ref , Last , Cache }}
723+ end
724+ end , []};
683725handle_ra_event (_QName , _Leader , {machine , eol }, State ) ->
684726 {eol , [{unblock , cluster_name (State )}]}.
685727
@@ -845,8 +887,34 @@ handle_delivery(_QName, _Leader, {delivery, Tag, [_ | _] = IdMsgs},
845887 {State1 , Deliveries } = return (Tag , MsgIntIds , State0 ),
846888 {ok , State1 , Deliveries };
847889handle_delivery (QName , Leader , {delivery , Tag , ReadPlan , Msgs },
848- # state {cached_segments = Cached0 } = State ) ->
849- {MsgIds , Cached } = rabbit_fifo :exec_read (Cached0 , ReadPlan , Msgs ),
890+ # state {cached_segments = CachedSegments } = State ) ->
891+ {TRef , Cached0 } = case CachedSegments of
892+ undefined ->
893+ {undefined , undefined };
894+ {R , _ , C } ->
895+ {R , C }
896+ end ,
897+ {MsgIds , Cached1 } = rabbit_fifo :exec_read (Cached0 , ReadPlan , Msgs ),
898+ % % if there are cached segments after a read and there
899+ % % is no current timer set, set a timer
900+ % % send a message to evict cache after some time
901+ Cached = case ra_flru :size (Cached1 ) > 0 of
902+ true when TRef == undefined ->
903+ Ref = erlang :send_after (? CACHE_SEG_TIMEOUT , self (),
904+ {'$gen_cast' ,
905+ {queue_event , QName ,
906+ {Leader , close_cached_segments }}}),
907+ {Ref , now_ms (), Cached1 };
908+ true ->
909+ {TRef , now_ms (), Cached1 };
910+ false ->
911+ if TRef =/= undefined ->
912+ erlang :cancel_timer (TRef , [{async , true }]);
913+ true ->
914+ ok
915+ end ,
916+ undefined
917+ end ,
850918 handle_delivery (QName , Leader , {delivery , Tag , MsgIds },
851919 State # state {cached_segments = Cached }).
852920
@@ -1017,3 +1085,6 @@ send_pending(Cid, #state{unsent_commands = Unsent} = State0) ->
10171085 normal , S0 )
10181086 end , State0 , Commands ),
10191087 State1 # state {unsent_commands = maps :remove (Cid , Unsent )}.
1088+
1089+ now_ms () ->
1090+ erlang :system_time (millisecond ).
0 commit comments