Skip to content

Commit d2a27a5

Browse files
committed
Fix extra latency and add tests for that and width
Added missing dispatching after delayed release of seats. Updated logging for all six situations of execution completion and seat release. Added behavioral tests for non-zero extra latency and non-unit width. Also added two tests for baseline functionality. Also improved some comments and other logging in `queueset.go`.
1 parent 3c72622 commit d2a27a5

File tree

3 files changed

+298
-67
lines changed

3 files changed

+298
-67
lines changed

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go

Lines changed: 62 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package queueset
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"math"
2324
"sync"
@@ -101,7 +102,8 @@ type queueSet struct {
101102
// queues are still draining.
102103
queues []*queue
103104

104-
// virtualTime is the number of virtual seconds since process startup
105+
// virtualTime is the amount of seat-seconds allocated per queue since process startup.
106+
// This is our generalization of the progress meter named R in the original fair queuing work.
105107
virtualTime float64
106108

107109
// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
@@ -477,13 +479,15 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
477479
// in addition to their seats.
478480
// Ideally, this should be based on projected completion time in the
479481
// virtual world of the youngest request in the queue.
480-
thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum()
481-
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of seatsSum %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisSeatsSum)
482+
queue := qs.queues[queueIdx]
483+
waitingSeats := queue.requests.SeatsSum()
484+
thisSeatsSum := waitingSeats // + queue.seatsInUse
485+
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse)
482486
if thisSeatsSum < bestQueueSeatsSum {
483487
bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
484488
}
485489
})
486-
klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
490+
klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
487491
return bestQueueIdx
488492
}
489493

@@ -549,10 +553,10 @@ func (qs *queueSet) enqueueLocked(request *request) {
549553
queue := request.queue
550554
now := qs.clock.Now()
551555
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
552-
// the queue’s virtual start time is set to the virtual time.
556+
// the queue’s start R is set to the virtual time.
553557
queue.virtualStart = qs.virtualTime
554558
if klog.V(6).Enabled() {
555-
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
559+
klog.Infof("QS(%s) at r=%s v=%.9fss: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
556560
}
557561
}
558562
queue.Enqueue(request)
@@ -598,7 +602,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
598602
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats())
599603
qs.obsPair.RequestsExecuting.Add(1)
600604
if klog.V(5).Enabled() {
601-
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
605+
klog.Infof("QS(%s) at r=%s v=%.9fss: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
602606
}
603607
return req
604608
}
@@ -634,9 +638,9 @@ func (qs *queueSet) dispatchLocked() bool {
634638
qs.obsPair.RequestsWaiting.Add(-1)
635639
qs.obsPair.RequestsExecuting.Add(1)
636640
if klog.V(6).Enabled() {
637-
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing",
641+
klog.Infof("QS(%s) at r=%s v=%.9fss: dispatching request %#+v %#+v work %v from queue %d with start R %.9fss, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
638642
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2,
639-
queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting)
643+
request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
640644
}
641645
// When a request is dequeued for service -> qs.virtualStart += G
642646
queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats())
@@ -659,10 +663,6 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
659663
}
660664
// wait for all "currently" executing requests in this queueSet
661665
// to finish before we can execute this request.
662-
if klog.V(4).Enabled() {
663-
klog.Infof("QS(%s): seats (%d) asked for exceeds concurrency limit, waiting for currently executing requests to complete, %d seats are in use (%d are executing) and the limit is %d",
664-
qs.qCfg.Name, seats, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
665-
}
666666
return false
667667
case qs.totSeatsInUse+seats > qs.dCfg.ConcurrencyLimit:
668668
return false
@@ -692,8 +692,8 @@ func (qs *queueSet) selectQueueLocked() *queue {
692692
estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse)
693693
dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress)
694694
dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress)
695-
// the virtual finish time of the oldest request is:
696-
// virtual start time + G
695+
// the finish R of the oldest request is:
696+
// start R + G
697697
// we are not taking the width of the request into account when
698698
// we calculate the virtual finish time of the request because
699699
// it can starve requests with smaller wdith in other queues.
@@ -704,12 +704,12 @@ func (qs *queueSet) selectQueueLocked() *queue {
704704
// - we have two queues, q1 and q2
705705
// - q1 has an infinite supply of requests with width W=1
706706
// - q2 has one request waiting in the queue with width W=2
707-
// - virtual start time for both q1 and q2 are at t0
707+
// - start R for both q1 and q2 are at t0
708708
// - requests complete really fast, S=1ms on q1
709709
// in this scenario we will execute roughly 60,000 requests
710710
// from q1 before we pick the request from q2.
711711
currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime
712-
712+
klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
713713
if currentVirtualFinish < minVirtualFinish {
714714
minVirtualFinish = currentVirtualFinish
715715
minQueue = queue
@@ -724,9 +724,18 @@ func (qs *queueSet) selectQueueLocked() *queue {
724724
oldestReqFromMinQueue = r
725725
return false
726726
})
727-
if oldestReqFromMinQueue == nil || !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) {
727+
if oldestReqFromMinQueue == nil {
728+
// This cannot happen
729+
klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
730+
return nil
731+
}
732+
if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) {
728733
// since we have not picked the queue with the minimum virtual finish
729734
// time, we are not going to advance the round robin index here.
735+
if klog.V(4).Enabled() {
736+
klog.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d",
737+
qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.Seats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)
738+
}
730739
return nil
731740
}
732741

@@ -743,7 +752,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
743752
// time.
744753
//
745754
// hence we're refreshing the per-queue virtual time for the chosen
746-
// queue here. if the last virtual start time (excluded estimated cost)
755+
// queue here. if the last start R (excluded estimated cost)
747756
// falls behind the global virtual time, we update the latest virtual
748757
// start by: <latest global virtual time> + <previously estimated cost>
749758
previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime
@@ -790,47 +799,62 @@ func (qs *queueSet) finishRequestLocked(r *request) {
790799
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats())
791800
if r.queue != nil {
792801
r.queue.seatsInUse -= r.Seats()
793-
794-
if klog.V(6).Enabled() {
795-
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
796-
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index,
797-
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting)
798-
}
799802
}
800803
}
801804

802805
defer func() {
803806
if r.workEstimate.AdditionalLatency <= 0 {
804807
// release the seats allocated to this request immediately
805808
releaseSeatsLocked()
809+
if !klog.V(6).Enabled() {
810+
} else if r.queue != nil {
811+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats",
812+
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index,
813+
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
814+
} else {
815+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse)
816+
}
806817
return
807818
}
808819

809820
additionalLatency := r.workEstimate.AdditionalLatency
821+
if !klog.V(6).Enabled() {
822+
} else if r.queue != nil {
823+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing",
824+
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), r.queue.index,
825+
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse)
826+
} else {
827+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse)
828+
}
810829
// EventAfterDuration will execute the event func in a new goroutine,
811830
// so the seats allocated to this request will be released after
812831
// AdditionalLatency elapses, this ensures that the additional
813832
// latency has no impact on the user experience.
814833
qs.clock.EventAfterDuration(func(_ time.Time) {
815-
qs.lock.Lock()
834+
qs.lockAndSyncTime()
816835
defer qs.lock.Unlock()
836+
now := qs.clock.Now()
817837
releaseSeatsLocked()
838+
if !klog.V(6).Enabled() {
839+
} else if r.queue != nil {
840+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats",
841+
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index,
842+
r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
843+
} else {
844+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse)
845+
}
846+
qs.dispatchAsMuchAsPossibleLocked()
818847
}, additionalLatency)
819848
}()
820849

821-
if r.queue == nil {
822-
if klog.V(6).Enabled() {
823-
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
824-
}
825-
return
826-
}
850+
if r.queue != nil {
851+
// request has finished, remove from requests executing
852+
r.queue.requestsExecuting--
827853

828-
// request has finished, remove from requests executing
829-
r.queue.requestsExecuting--
830-
831-
// When a request finishes being served, and the actual service time was S,
832-
// the queue’s virtual start time is decremented by (G - S)*width.
833-
r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
854+
// When a request finishes being served, and the actual service time was S,
855+
// the queue’s start R is decremented by (G - S)*width.
856+
r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
857+
}
834858
}
835859

836860
func (qs *queueSet) removeQueueIfEmptyLocked(r *request) {

0 commit comments

Comments
 (0)