@@ -775,32 +775,67 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
775
775
func (qs * queueSet ) finishRequestLocked (r * request ) {
776
776
now := qs .clock .Now ()
777
777
qs .totRequestsExecuting --
778
- qs .totSeatsInUse -= r .Seats ()
779
778
metrics .AddRequestsExecuting (r .ctx , qs .qCfg .Name , r .fsName , - 1 )
780
- metrics .AddRequestConcurrencyInUse (qs .qCfg .Name , r .fsName , - r .Seats ())
781
779
qs .obsPair .RequestsExecuting .Add (- 1 )
782
780
781
+ S := now .Sub (r .startTime ).Seconds ()
782
+
783
+ // TODO: for now we keep the logic localized so it is easier to see
784
+ // how the counters are tracked for queueset and queue, in future we
785
+ // can refactor to move this function.
786
+ releaseSeatsLocked := func () {
787
+ defer qs .removeQueueIfEmptyLocked (r )
788
+
789
+ qs .totSeatsInUse -= r .Seats ()
790
+ metrics .AddRequestConcurrencyInUse (qs .qCfg .Name , r .fsName , - r .Seats ())
791
+ if r .queue != nil {
792
+ r .queue .seatsInUse -= r .Seats ()
793
+
794
+ if klog .V (6 ).Enabled () {
795
+ klog .Infof ("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing" ,
796
+ qs .qCfg .Name , now .Format (nsTimeFmt ), qs .virtualTime , r .descr1 , r .descr2 , r .queue .index ,
797
+ r .queue .virtualStart , S , r .queue .requests .Length (), r .queue .requestsExecuting )
798
+ }
799
+ }
800
+ }
801
+
802
+ defer func () {
803
+ if r .workEstimate .AdditionalLatency <= 0 {
804
+ // release the seats allocated to this request immediately
805
+ releaseSeatsLocked ()
806
+ return
807
+ }
808
+
809
+ additionalLatency := r .workEstimate .AdditionalLatency
810
+ // EventAfterDuration will execute the event func in a new goroutine,
811
+ // so the seats allocated to this request will be released after
812
+ // AdditionalLatency elapses, this ensures that the additional
813
+ // latency has no impact on the user experience.
814
+ qs .clock .EventAfterDuration (func (_ time.Time ) {
815
+ qs .lock .Lock ()
816
+ defer qs .lock .Unlock ()
817
+ releaseSeatsLocked ()
818
+ }, additionalLatency )
819
+ }()
820
+
783
821
if r .queue == nil {
784
822
if klog .V (6 ).Enabled () {
785
823
klog .Infof ("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing" , qs .qCfg .Name , now .Format (nsTimeFmt ), qs .virtualTime , r .descr1 , r .descr2 , qs .totRequestsExecuting )
786
824
}
787
825
return
788
826
}
789
827
790
- S := now .Sub (r .startTime ).Seconds ()
828
+ // request has finished, remove from requests executing
829
+ r .queue .requestsExecuting --
791
830
792
831
// When a request finishes being served, and the actual service time was S,
793
832
// the queue’s virtual start time is decremented by (G - S)*width.
794
833
r .queue .virtualStart -= (qs .estimatedServiceTime - S ) * float64 (r .Seats ())
834
+ }
795
835
796
- // request has finished, remove from requests executing
797
- r .queue .requestsExecuting --
798
- r .queue .seatsInUse -= r .Seats ()
799
-
800
- if klog .V (6 ).Enabled () {
801
- klog .Infof ("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing" ,
802
- qs .qCfg .Name , now .Format (nsTimeFmt ), qs .virtualTime , r .descr1 , r .descr2 , r .queue .index ,
803
- r .queue .virtualStart , S , r .queue .requests .Length (), r .queue .requestsExecuting )
836
+ func (qs * queueSet ) removeQueueIfEmptyLocked (r * request ) {
837
+ if r .queue == nil {
838
+ return
804
839
}
805
840
806
841
// If there are more queues than desired and this one has no
0 commit comments