Skip to content

Commit d681864

Browse files
committed
apf: free seats in use after additional latency
1 parent 2d599e3 commit d681864

File tree

3 files changed

+126
-11
lines changed

3 files changed

+126
-11
lines changed

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -775,32 +775,67 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
775775
func (qs *queueSet) finishRequestLocked(r *request) {
776776
now := qs.clock.Now()
777777
qs.totRequestsExecuting--
778-
qs.totSeatsInUse -= r.Seats()
779778
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
780-
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats())
781779
qs.obsPair.RequestsExecuting.Add(-1)
782780

781+
S := now.Sub(r.startTime).Seconds()
782+
783+
// TODO: for now we keep the logic localized so it is easier to see
784+
// how the counters are tracked for queueset and queue; in the future we
785+
// can refactor to move this function.
786+
releaseSeatsLocked := func() {
787+
defer qs.removeQueueIfEmptyLocked(r)
788+
789+
qs.totSeatsInUse -= r.Seats()
790+
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats())
791+
if r.queue != nil {
792+
r.queue.seatsInUse -= r.Seats()
793+
794+
if klog.V(6).Enabled() {
795+
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
796+
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index,
797+
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting)
798+
}
799+
}
800+
}
801+
802+
defer func() {
803+
if r.workEstimate.AdditionalLatency <= 0 {
804+
// release the seats allocated to this request immediately
805+
releaseSeatsLocked()
806+
return
807+
}
808+
809+
additionalLatency := r.workEstimate.AdditionalLatency
810+
// EventAfterDuration will execute the event func in a new goroutine,
811+
// so the seats allocated to this request will be released after
812+
// AdditionalLatency elapses; this ensures that the additional
813+
// latency has no impact on the user experience.
814+
qs.clock.EventAfterDuration(func(_ time.Time) {
815+
qs.lock.Lock()
816+
defer qs.lock.Unlock()
817+
releaseSeatsLocked()
818+
}, additionalLatency)
819+
}()
820+
783821
if r.queue == nil {
784822
if klog.V(6).Enabled() {
785823
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
786824
}
787825
return
788826
}
789827

790-
S := now.Sub(r.startTime).Seconds()
828+
// request has finished, remove from requests executing
829+
r.queue.requestsExecuting--
791830

792831
// When a request finishes being served, and the actual service time was S,
793832
// the queue’s virtual start time is decremented by (G - S)*width.
794833
r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
834+
}
795835

796-
// request has finished, remove from requests executing
797-
r.queue.requestsExecuting--
798-
r.queue.seatsInUse -= r.Seats()
799-
800-
if klog.V(6).Enabled() {
801-
klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
802-
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index,
803-
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting)
836+
func (qs *queueSet) removeQueueIfEmptyLocked(r *request) {
837+
if r.queue == nil {
838+
return
804839
}
805840

806841
// If there are more queues than desired and this one has no

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,79 @@ func TestSelectQueueLocked(t *testing.T) {
993993
}
994994
}
995995

996+
func TestFinishRequestLocked(t *testing.T) {
997+
tests := []struct {
998+
name string
999+
workEstimate fcrequest.WorkEstimate
1000+
}{
1001+
{
1002+
name: "request has additional latency",
1003+
workEstimate: fcrequest.WorkEstimate{
1004+
Seats: 10,
1005+
AdditionalLatency: time.Minute,
1006+
},
1007+
},
1008+
{
1009+
name: "request has no additional latency",
1010+
workEstimate: fcrequest.WorkEstimate{
1011+
Seats: 10,
1012+
},
1013+
},
1014+
}
1015+
1016+
metrics.Register()
1017+
for _, test := range tests {
1018+
t.Run(test.name, func(t *testing.T) {
1019+
metrics.Reset()
1020+
1021+
now := time.Now()
1022+
clk, _ := testeventclock.NewFake(now, 0, nil)
1023+
qs := &queueSet{
1024+
clock: clk,
1025+
obsPair: newObserverPair(clk),
1026+
}
1027+
queue := &queue{
1028+
requests: newRequestFIFO(),
1029+
}
1030+
r := &request{
1031+
qs: qs,
1032+
queue: queue,
1033+
workEstimate: test.workEstimate,
1034+
}
1035+
1036+
qs.totRequestsExecuting = 111
1037+
qs.totSeatsInUse = 222
1038+
queue.requestsExecuting = 11
1039+
queue.seatsInUse = 22
1040+
1041+
var (
1042+
queuesetTotalRequestsExecutingExpected = qs.totRequestsExecuting - 1
1043+
queuesetTotalSeatsInUseExpected = qs.totSeatsInUse - int(test.workEstimate.Seats)
1044+
queueRequestsExecutingExpected = queue.requestsExecuting - 1
1045+
queueSeatsInUseExpected = queue.seatsInUse - int(test.workEstimate.Seats)
1046+
)
1047+
1048+
qs.finishRequestLocked(r)
1049+
1050+
// as soon as AdditionalLatency elapses we expect the seats to be released
1051+
clk.SetTime(now.Add(test.workEstimate.AdditionalLatency))
1052+
1053+
if queuesetTotalRequestsExecutingExpected != qs.totRequestsExecuting {
1054+
t.Errorf("Expected total requests executing: %d, but got: %d", queuesetTotalRequestsExecutingExpected, qs.totRequestsExecuting)
1055+
}
1056+
if queuesetTotalSeatsInUseExpected != qs.totSeatsInUse {
1057+
t.Errorf("Expected total seats in use: %d, but got: %d", queuesetTotalSeatsInUseExpected, qs.totSeatsInUse)
1058+
}
1059+
if queueRequestsExecutingExpected != queue.requestsExecuting {
1060+
t.Errorf("Expected requests executing for queue: %d, but got: %d", queueRequestsExecutingExpected, queue.requestsExecuting)
1061+
}
1062+
if queueSeatsInUseExpected != queue.seatsInUse {
1063+
t.Errorf("Expected seats in use for queue: %d, but got: %d", queueSeatsInUseExpected, queue.seatsInUse)
1064+
}
1065+
})
1066+
}
1067+
}
1068+
9961069
func newFIFO(requests ...*request) fifo {
9971070
l := newRequestFIFO()
9981071
for i := range requests {

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/width.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package request
1919
import (
2020
"fmt"
2121
"net/http"
22+
"time"
2223

2324
apirequest "k8s.io/apiserver/pkg/endpoints/request"
2425
"k8s.io/klog/v2"
@@ -35,6 +36,12 @@ const (
3536
type WorkEstimate struct {
3637
// Seats represents the number of seats associated with this request
3738
Seats uint
39+
40+
// AdditionalLatency specifies the additional duration the seats allocated
41+
// to this request must be reserved after the given request has finished.
42+
// AdditionalLatency should not have any impact on the user experience; the
43+
// caller must not experience this additional latency.
44+
AdditionalLatency time.Duration
3845
}
3946

4047
// objectCountGetterFunc represents a function that gets the total

0 commit comments

Comments
 (0)