Skip to content

Commit b9565be

Browse files
authored
Merge pull request kubernetes#104345 from MikeSpreitzer/test-width
Fix extra latency and add tests for that and non-unit width
2 parents 3a26b86 + d2a27a5 commit b9565be

File tree

3 files changed

+298
-67
lines changed

3 files changed

+298
-67
lines changed

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go

Lines changed: 62 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package queueset
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"math"
2324
"sync"
@@ -101,7 +102,8 @@ type queueSet struct {
101102
// queues are still draining.
102103
queues []*queue
103104

104-
// virtualTime is the number of virtual seconds since process startup
105+
// virtualTime is the amount of seat-seconds allocated per queue since process startup.
106+
// This is our generalization of the progress meter named R in the original fair queuing work.
105107
virtualTime float64
106108

107109
// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
@@ -477,13 +479,15 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte
477479
// in addition to their seats.
478480
// Ideally, this should be based on projected completion time in the
479481
// virtual world of the youngest request in the queue.
480-
thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum()
481-
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of seatsSum %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisSeatsSum)
482+
queue := qs.queues[queueIdx]
483+
waitingSeats := queue.requests.SeatsSum()
484+
thisSeatsSum := waitingSeats // + queue.seatsInUse
485+
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse)
482486
if thisSeatsSum < bestQueueSeatsSum {
483487
bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
484488
}
485489
})
486-
klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
490+
klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
487491
return bestQueueIdx
488492
}
489493

@@ -549,10 +553,10 @@ func (qs *queueSet) enqueueLocked(request *request) {
549553
queue := request.queue
550554
now := qs.clock.Now()
551555
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
552-
// the queue’s virtual start time is set to the virtual time.
556+
// the queue’s start R is set to the virtual time.
553557
queue.virtualStart = qs.virtualTime
554558
if klog.V(6).Enabled() {
555-
klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
559+
klog.Infof("QS(%s) at r=%s v=%.9fss: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
556560
}
557561
}
558562
queue.Enqueue(request)
@@ -598,7 +602,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
598602
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats())
599603
qs.obsPair.RequestsExecuting.Add(1)
600604
if klog.V(5).Enabled() {
601-
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
605+
klog.Infof("QS(%s) at r=%s v=%.9fss: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
602606
}
603607
return req
604608
}
@@ -634,9 +638,9 @@ func (qs *queueSet) dispatchLocked() bool {
634638
qs.obsPair.RequestsWaiting.Add(-1)
635639
qs.obsPair.RequestsExecuting.Add(1)
636640
if klog.V(6).Enabled() {
637-
klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing",
641+
klog.Infof("QS(%s) at r=%s v=%.9fss: dispatching request %#+v %#+v work %v from queue %d with start R %.9fss, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
638642
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2,
639-
queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting)
643+
request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
640644
}
641645
// When a request is dequeued for service -> queue.virtualStart += G * width
642646
queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats())
@@ -659,10 +663,6 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
659663
}
660664
// wait for all "currently" executing requests in this queueSet
661665
// to finish before we can execute this request.
662-
if klog.V(4).Enabled() {
663-
klog.Infof("QS(%s): seats (%d) asked for exceeds concurrency limit, waiting for currently executing requests to complete, %d seats are in use (%d are executing) and the limit is %d",
664-
qs.qCfg.Name, seats, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
665-
}
666666
return false
667667
case qs.totSeatsInUse+seats > qs.dCfg.ConcurrencyLimit:
668668
return false
@@ -692,8 +692,8 @@ func (qs *queueSet) selectQueueLocked() *queue {
692692
estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse)
693693
dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress)
694694
dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress)
695-
// the virtual finish time of the oldest request is:
696-
// virtual start time + G
695+
// the finish R of the oldest request is:
696+
// start R + G
697697
// we are not taking the width of the request into account when
698698
// we calculate the virtual finish time of the request because
699699
// it can starve requests with smaller width in other queues.
@@ -704,12 +704,12 @@ func (qs *queueSet) selectQueueLocked() *queue {
704704
// - we have two queues, q1 and q2
705705
// - q1 has an infinite supply of requests with width W=1
706706
// - q2 has one request waiting in the queue with width W=2
707-
// - virtual start time for both q1 and q2 are at t0
707+
// - start R for both q1 and q2 is at t0
708708
// - requests complete really fast, S=1ms on q1
709709
// in this scenario we will execute roughly 60,000 requests
710710
// from q1 before we pick the request from q2.
711711
currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime
712-
712+
klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
713713
if currentVirtualFinish < minVirtualFinish {
714714
minVirtualFinish = currentVirtualFinish
715715
minQueue = queue
@@ -724,9 +724,18 @@ func (qs *queueSet) selectQueueLocked() *queue {
724724
oldestReqFromMinQueue = r
725725
return false
726726
})
727-
if oldestReqFromMinQueue == nil || !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) {
727+
if oldestReqFromMinQueue == nil {
728+
// This cannot happen
729+
klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
730+
return nil
731+
}
732+
if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) {
728733
// since we have not picked the queue with the minimum virtual finish
729734
// time, we are not going to advance the round robin index here.
735+
if klog.V(4).Enabled() {
736+
klog.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d",
737+
qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.Seats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)
738+
}
730739
return nil
731740
}
732741

@@ -743,7 +752,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
743752
// time.
744753
//
745754
// hence we're refreshing the per-queue virtual time for the chosen
746-
// queue here. if the last virtual start time (excluded estimated cost)
755+
// queue here. if the last start R (excluding the estimated cost)
747756
// falls behind the global virtual time, we update the latest virtual
748757
// start by: <latest global virtual time> + <previously estimated cost>
749758
previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime
@@ -790,47 +799,62 @@ func (qs *queueSet) finishRequestLocked(r *request) {
790799
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats())
791800
if r.queue != nil {
792801
r.queue.seatsInUse -= r.Seats()
793-
794-
if klog.V(6).Enabled() {
795-
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
796-
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index,
797-
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting)
798-
}
799802
}
800803
}
801804

802805
defer func() {
803806
if r.workEstimate.AdditionalLatency <= 0 {
804807
// release the seats allocated to this request immediately
805808
releaseSeatsLocked()
809+
if !klog.V(6).Enabled() {
810+
} else if r.queue != nil {
811+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats",
812+
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index,
813+
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
814+
} else {
815+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse)
816+
}
806817
return
807818
}
808819

809820
additionalLatency := r.workEstimate.AdditionalLatency
821+
if !klog.V(6).Enabled() {
822+
} else if r.queue != nil {
823+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing",
824+
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), r.queue.index,
825+
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse)
826+
} else {
827+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse)
828+
}
810829
// EventAfterDuration will execute the event func in a new goroutine,
811830
// so the seats allocated to this request will be released after
812831
// AdditionalLatency elapses, this ensures that the additional
813832
// latency has no impact on the user experience.
814833
qs.clock.EventAfterDuration(func(_ time.Time) {
815-
qs.lock.Lock()
834+
qs.lockAndSyncTime()
816835
defer qs.lock.Unlock()
836+
now := qs.clock.Now()
817837
releaseSeatsLocked()
838+
if !klog.V(6).Enabled() {
839+
} else if r.queue != nil {
840+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats",
841+
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, r.queue.index,
842+
r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
843+
} else {
844+
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.Seats, qs.totRequestsExecuting, qs.totSeatsInUse)
845+
}
846+
qs.dispatchAsMuchAsPossibleLocked()
818847
}, additionalLatency)
819848
}()
820849

821-
if r.queue == nil {
822-
if klog.V(6).Enabled() {
823-
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
824-
}
825-
return
826-
}
850+
if r.queue != nil {
851+
// request has finished, remove from requests executing
852+
r.queue.requestsExecuting--
827853

828-
// request has finished, remove from requests executing
829-
r.queue.requestsExecuting--
830-
831-
// When a request finishes being served, and the actual service time was S,
832-
// the queue’s virtual start time is decremented by (G - S)*width.
833-
r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
854+
// When a request finishes being served, and the actual service time was S,
855+
// the queue’s start R is decremented by (G - S)*width.
856+
r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
857+
}
834858
}
835859

836860
func (qs *queueSet) removeQueueIfEmptyLocked(r *request) {

0 commit comments

Comments
 (0)