3030 purge /1 ,
3131 update_machine_state /2 ,
3232 pending_size /1 ,
33+ num_cached_segments /1 ,
3334 stat /1 ,
3435 stat /2 ,
3536 query_single_active_consumer /1 ,
4041-define (TIMER_TIME , 10000 ).
4142-define (COMMAND_TIMEOUT , 30000 ).
4243-define (UNLIMITED_PREFETCH_COUNT , 2000 ). % % something large for ra
44+ % % controls the timer for closing cached segments
45+ -define (CACHE_SEG_TIMEOUT , 5000 ).
4346
4447-type seq () :: non_neg_integer ().
48+ -type milliseconds () :: non_neg_integer ().
49+
4550
4651-record (consumer , {key :: rabbit_fifo :consumer_key (),
4752 % status = up :: up | cancelled,
6974 pending = #{} :: #{seq () =>
7075 {term (), rabbit_fifo :command ()}},
7176 consumers = #{} :: #{rabbit_types :ctag () => # consumer {}},
72- timer_state :: term ()
77+ timer_state :: term (),
78+ cached_segments :: undefined |
79+ {undefined | reference (),
80+ LastSeenMs :: milliseconds (),
81+ ra_flru :state ()}
7382 }).
7483
7584-opaque state () :: # state {}.
@@ -132,9 +141,15 @@ enqueue(QName, Correlation, Msg,
132141 % % it is safe to reject the message as we never attempted
133142 % % to send it
134143 {reject_publish , State0 };
144+ {error , {shutdown , delete }} ->
145+ rabbit_log :debug (" ~ts : QQ ~ts tried to register enqueuer during delete shutdown" ,
146+ [? MODULE , rabbit_misc :rs (QName )]),
147+ {reject_publish , State0 };
135148 {timeout , _ } ->
136149 {reject_publish , State0 };
137150 Err ->
151+ rabbit_log :debug (" ~ts : QQ ~ts error when registering enqueuer ~p " ,
152+ [? MODULE , rabbit_misc :rs (QName ), Err ]),
138153 exit (Err )
139154 end ;
140155enqueue (_QName , _Correlation , _Msg ,
@@ -167,7 +182,7 @@ enqueue(QName, Correlation, Msg,
167182% % @param QueueName Name of the queue.
168183% % @param Msg an arbitrary erlang term representing the message.
169184% % @param State the current {@module} state.
170- % % @returns
185+ % % @return's
171186% % `{ok, State, Actions}' if the command was successfully sent.
172187% % {@module} assigns a sequence number to every raft command it issues. The
173188% % SequenceNumber can be correlated to the applied sequence numbers returned
@@ -510,6 +525,15 @@ purge(Server) ->
510525pending_size (# state {pending = Pend }) ->
511526 maps :size (Pend ).
512527
528+ -spec num_cached_segments (state ()) -> non_neg_integer ().
529+ num_cached_segments (# state {cached_segments = CachedSegments }) ->
530+ case CachedSegments of
531+ undefined ->
532+ 0 ;
533+ {_ , _ , Cached } ->
534+ ra_flru :size (Cached )
535+ end .
536+
513537-spec stat (ra :server_id ()) ->
514538 {ok , non_neg_integer (), non_neg_integer ()}
515539 | {error | timeout , term ()}.
@@ -633,7 +657,8 @@ handle_ra_event(QName, From, {applied, Seqs},
633657 _ ->
634658 {ok , State1 , Actions }
635659 end ;
636- handle_ra_event (QName , From , {machine , {delivery , _ConsumerTag , _ } = Del }, State0 ) ->
660+ handle_ra_event (QName , From , {machine , Del }, State0 )
661+ when element (1 , Del ) == delivery ->
637662 handle_delivery (QName , From , Del , State0 );
638663handle_ra_event (_QName , _From , {machine , Action }, State )
639664 when element (1 , Action ) =:= credit_reply orelse
@@ -643,28 +668,31 @@ handle_ra_event(_QName, _, {machine, {queue_status, Status}},
643668 # state {} = State ) ->
644669 % % just set the queue status
645670 {ok , State # state {queue_status = Status }, []};
646- handle_ra_event (_QName , Leader , {machine , leader_change },
671+ handle_ra_event (QName , Leader , {machine , leader_change },
647672 # state {leader = OldLeader ,
648673 pending = Pending } = State0 ) ->
649674 % % we need to update leader
650675 % % and resend any pending commands
651- rabbit_log :debug (" ~ts : Detected QQ leader change from ~w to ~w , "
676+ rabbit_log :debug (" ~ts : ~s Detected QQ leader change from ~w to ~w , "
652677 " resending ~b pending commands" ,
653- [? MODULE , OldLeader , Leader , maps :size (Pending )]),
678+ [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
679+ Leader , maps :size (Pending )]),
654680 State = resend_all_pending (State0 # state {leader = Leader }),
655681 {ok , State , []};
656682handle_ra_event (_QName , _From , {rejected , {not_leader , Leader , _Seq }},
657683 # state {leader = Leader } = State ) ->
658684 {ok , State , []};
659- handle_ra_event (_QName , _From , {rejected , {not_leader , Leader , _Seq }},
685+ handle_ra_event (QName , _From , {rejected , {not_leader , Leader , _Seq }},
660686 # state {leader = OldLeader ,
661687 pending = Pending } = State0 ) ->
662- rabbit_log :debug (" ~ts : Detected QQ leader change (rejection) from ~w to ~w , "
688+ rabbit_log :debug (" ~ts : ~s Detected QQ leader change (rejection) from ~w to ~w , "
663689 " resending ~b pending commands" ,
664- [? MODULE , OldLeader , Leader , maps :size (Pending )]),
690+ [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
691+ Leader , maps :size (Pending )]),
665692 State = resend_all_pending (State0 # state {leader = Leader }),
666693 {ok , cancel_timer (State ), []};
667- handle_ra_event (_QName , _From , {rejected , {not_leader , _UndefinedMaybe , _Seq }}, State0 ) ->
694+ handle_ra_event (_QName , _From ,
695+ {rejected , {not_leader , _UndefinedMaybe , _Seq }}, State0 ) ->
668696 % TODO: how should these be handled? re-sent on timer or try random
669697 {ok , State0 , []};
670698handle_ra_event (QName , _ , timeout , # state {cfg = # cfg {servers = Servers }} = State0 ) ->
@@ -676,6 +704,30 @@ handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}} = State
676704 State = resend_all_pending (State0 # state {leader = Leader }),
677705 {ok , State , []}
678706 end ;
707+ handle_ra_event (QName , Leader , close_cached_segments ,
708+ # state {cached_segments = CachedSegments } = State ) ->
709+ {ok ,
710+ case CachedSegments of
711+ undefined ->
712+ % % timer didn't get cancelled so just ignore this
713+ State ;
714+ {_TRef , Last , Cache } ->
715+ case now_ms () > Last + ? CACHE_SEG_TIMEOUT of
716+ true ->
717+ rabbit_log :debug (" ~ts : closing_cached_segments" ,
718+ [rabbit_misc :rs (QName )]),
719+ % % its been long enough, evict all
720+ _ = ra_flru :evict_all (Cache ),
721+ State # state {cached_segments = undefined };
722+ false ->
723+ % % set another timer
724+ Ref = erlang :send_after (? CACHE_SEG_TIMEOUT , self (),
725+ {'$gen_cast' ,
726+ {queue_event , QName ,
727+ {Leader , close_cached_segments }}}),
728+ State # state {cached_segments = {Ref , Last , Cache }}
729+ end
730+ end , []};
679731handle_ra_event (_QName , _Leader , {machine , eol }, State ) ->
680732 {eol , [{unblock , cluster_name (State )}]}.
681733
@@ -839,7 +891,39 @@ handle_delivery(_QName, _Leader, {delivery, Tag, [_ | _] = IdMsgs},
839891 % % we should return all messages.
840892 MsgIntIds = [Id || {Id , _ } <- IdMsgs ],
841893 {State1 , Deliveries } = return (Tag , MsgIntIds , State0 ),
842- {ok , State1 , Deliveries }.
894+ {ok , State1 , Deliveries };
895+ handle_delivery (QName , Leader , {delivery , Tag , ReadPlan , Msgs },
896+ # state {cached_segments = CachedSegments } = State ) ->
897+ {TRef , Cached0 } = case CachedSegments of
898+ undefined ->
899+ {undefined , undefined };
900+ {R , _ , C } ->
901+ {R , C }
902+ end ,
903+ {MsgIds , Cached1 } = rabbit_fifo :exec_read (Cached0 , ReadPlan , Msgs ),
904+ % % if there are cached segments after a read and there
905+ % % is no current timer set, set a timer
906+ % % send a message to evict cache after some time
907+ Cached = case ra_flru :size (Cached1 ) > 0 of
908+ true when TRef == undefined ->
909+ Ref = erlang :send_after (? CACHE_SEG_TIMEOUT , self (),
910+ {'$gen_cast' ,
911+ {queue_event , QName ,
912+ {Leader , close_cached_segments }}}),
913+ {Ref , now_ms (), Cached1 };
914+ true ->
915+ {TRef , now_ms (), Cached1 };
916+ false ->
917+ if TRef =/= undefined ->
918+ _ = erlang :cancel_timer (TRef , [{async , true }]),
919+ ok ;
920+ true ->
921+ ok
922+ end ,
923+ undefined
924+ end ,
925+ handle_delivery (QName , Leader , {delivery , Tag , MsgIds },
926+ State # state {cached_segments = Cached }).
843927
844928transform_msgs (QName , QRef , Msgs ) ->
845929 lists :map (
@@ -1008,3 +1092,6 @@ send_pending(Cid, #state{unsent_commands = Unsent} = State0) ->
10081092 normal , S0 )
10091093 end , State0 , Commands ),
10101094 State1 # state {unsent_commands = maps :remove (Cid , Unsent )}.
1095+
1096+ now_ms () ->
1097+ erlang :system_time (millisecond ).
0 commit comments