@@ -160,7 +160,7 @@ enqueue(_QName, _Correlation, _Msg,
160160 # state {queue_status = reject_publish ,
161161 cfg = # cfg {}} = State ) ->
162162 {reject_publish , State };
163- enqueue (QName , Correlation , Msg ,
163+ enqueue (_QName , Correlation , Msg ,
164164 # state {slow = WasSlow ,
165165 pending = Pending ,
166166 queue_status = go ,
@@ -176,8 +176,9 @@ enqueue(QName, Correlation, Msg,
176176 next_seq = Seq + 1 ,
177177 next_enqueue_seq = EnqueueSeq + 1 ,
178178 slow = IsSlow },
179+
179180 if IsSlow andalso not WasSlow ->
180- {ok , set_timer ( QName , State ) , [{block , cluster_name (State )}]};
181+ {ok , State , [{block , cluster_name (State )}]};
181182 true ->
182183 {ok , State , []}
183184 end .
@@ -632,10 +633,10 @@ handle_ra_event(QName, Leader, {applied, Seqs},
632633 when ActualLeader =/= OldLeader ->
633634 % % there is a new leader
634635 ? LOG_DEBUG (" ~ts : Detected QQ leader change (applied) "
635- " from ~w to ~w , "
636- " resending ~b pending commands" ,
637- [? MODULE , OldLeader , ActualLeader ,
638- maps :size (State1 # state .pending )]),
636+ " from ~w to ~w , "
637+ " resending ~b pending commands" ,
638+ [? MODULE , OldLeader , ActualLeader ,
639+ maps :size (State1 # state .pending )]),
639640 resend_all_pending (State1 # state {leader = ActualLeader });
640641 _ ->
641642 State1
@@ -702,9 +703,9 @@ handle_ra_event(QName, Leader, {machine, leader_change},
702703 % % we need to update leader
703704 % % and resend any pending commands
704705 ? LOG_DEBUG (" ~ts : ~s Detected QQ leader change from ~w to ~w , "
705- " resending ~b pending commands" ,
706- [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
707- Leader , maps :size (Pending )]),
706+ " resending ~b pending commands" ,
707+ [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
708+ Leader , maps :size (Pending )]),
708709 State = resend_all_pending (State0 # state {leader = Leader }),
709710 {ok , State , []};
710711handle_ra_event (_QName , _From , {rejected , {not_leader , Leader , _Seq }},
@@ -714,9 +715,9 @@ handle_ra_event(QName, _From, {rejected, {not_leader, Leader, _Seq}},
714715 # state {leader = OldLeader ,
715716 pending = Pending } = State0 ) ->
716717 ? LOG_DEBUG (" ~ts : ~s Detected QQ leader change (rejection) from ~w to ~w , "
717- " resending ~b pending commands" ,
718- [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
719- Leader , maps :size (Pending )]),
718+ " resending ~b pending commands" ,
719+ [rabbit_misc :rs (QName ), ? MODULE , OldLeader ,
720+ Leader , maps :size (Pending )]),
720721 State = resend_all_pending (State0 # state {leader = Leader }),
721722 {ok , cancel_timer (State ), []};
722723handle_ra_event (_QName , _From ,
@@ -728,8 +729,6 @@ handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers},
728729 pending = Pending } = State0 ) ->
729730 case find_leader (Servers ) of
730731 undefined ->
731- ? LOG_DEBUG (" ~ts : ~s Pending applied Timeout, no leader found, num pend ~b " ,
732- [rabbit_misc :rs (QName ), ? MODULE , maps :size (Pending )]),
733732 % % still no leader, set the timer again
734733 {ok , set_timer (QName , State0 ), []};
735734 Leader ->
@@ -751,7 +750,7 @@ handle_ra_event(QName, Leader, close_cached_segments,
751750 case now_ms () > Last + ? CACHE_SEG_TIMEOUT of
752751 true ->
753752 ? LOG_DEBUG (" ~ts : closing_cached_segments" ,
754- [rabbit_misc :rs (QName )]),
753+ [rabbit_misc :rs (QName )]),
755754 % % its been long enough, evict all
756755 _ = ra_flru :evict_all (Cache ),
757756 State # state {cached_segments = undefined };
@@ -812,12 +811,16 @@ seq_applied({Seq, Response},
812811 {Corrs , Actions0 , # state {} = State0 }) ->
813812 % % sequences aren't guaranteed to be applied in order as enqueues are
814813 % % low priority commands and may be overtaken by others with a normal priority.
814+ % %
815+ % % if the response is 'not_enqueued' we need to still keep the pending
816+ % % command for a later resend
815817 {Actions , State } = maybe_add_action (Response , Actions0 , State0 ),
816818 case maps :take (Seq , State # state .pending ) of
817- {{undefined , _ }, Pending } ->
819+ {{undefined , _ }, Pending }
820+ when Response =/= not_enqueued ->
818821 {Corrs , Actions , State # state {pending = Pending }};
819822 {{Corr , _ }, Pending }
820- when Response /= not_enqueued ->
823+ when Response = /= not_enqueued ->
821824 {[Corr | Corrs ], Actions , State # state {pending = Pending }};
822825 _ ->
823826 {Corrs , Actions , State }
0 commit comments