Skip to content

Commit 6a84edb

Browse files
authored
Merge pull request kubernetes#120222 from tkashem/apf-queue-wait-ctx
apf: manage request queue wait with context in APF Filter
2 parents 67733c6 + 6297067 commit 6a84edb

File tree

16 files changed

+214
-177
lines changed

16 files changed

+214
-177
lines changed

pkg/controlplane/apiserver/config.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclien
202202
versionedInformer,
203203
extclient.FlowcontrolV1beta3(),
204204
s.GenericServerRunOptions.MaxRequestsInFlight+s.GenericServerRunOptions.MaxMutatingRequestsInFlight,
205-
s.GenericServerRunOptions.RequestTimeout/4,
206205
), nil
207206
}
208207

staging/src/k8s.io/apiserver/pkg/server/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -915,7 +915,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
915915
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
916916
c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats)
917917
handler = filterlatency.TrackCompleted(handler)
918-
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
918+
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator, c.RequestTimeout/4)
919919
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")
920920
} else {
921921
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)

staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
3636
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
3737
"k8s.io/klog/v2"
38+
utilsclock "k8s.io/utils/clock"
3839
)
3940

4041
// PriorityAndFairnessClassification identifies the results of
@@ -78,6 +79,10 @@ type priorityAndFairnessHandler struct {
7879
// the purpose of computing RetryAfter header to avoid system
7980
// overload.
8081
droppedRequests utilflowcontrol.DroppedRequestsTracker
82+
83+
// newReqWaitCtxFn creates a derived context with a deadline
84+
// of how long a given request can wait in its queue.
85+
newReqWaitCtxFn func(context.Context) (context.Context, context.CancelFunc)
8186
}
8287

8388
func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Request) {
@@ -240,16 +245,17 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
240245
resultCh <- err
241246
}()
242247

243-
// We create handleCtx with explicit cancelation function.
244-
// The reason for it is that Handle() underneath may start additional goroutine
248+
// We create handleCtx with an adjusted deadline, for two reasons.
249+
// One is to limit the time the request waits before its execution starts.
250+
// The other reason for it is that Handle() underneath may start an additional goroutine
245251
// that is blocked on context cancellation. However, from APF point of view,
246252
// we don't want to wait until the whole watch request is processed (which is
247253
// when its context is actually cancelled) - we want to unblock the goroutine as
248254
// soon as the request is processed from the APF point of view.
249255
//
250256
// Note that we explicitly do NOT call the actual handler using that context
251257
// to avoid cancelling request too early.
252-
handleCtx, handleCtxCancel := context.WithCancel(ctx)
258+
handleCtx, handleCtxCancel := h.newReqWaitCtxFn(ctx)
253259
defer handleCtxCancel()
254260

255261
// Note that Handle will return irrespective of whether the request
@@ -286,7 +292,11 @@ func (h *priorityAndFairnessHandler) Handle(w http.ResponseWriter, r *http.Reque
286292
h.handler.ServeHTTP(w, r)
287293
}
288294

289-
h.fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
295+
func() {
296+
handleCtx, cancelFn := h.newReqWaitCtxFn(ctx)
297+
defer cancelFn()
298+
h.fcIfc.Handle(handleCtx, digest, noteFn, estimateWork, queueNote, execute)
299+
}()
290300
}
291301

292302
if !served {
@@ -309,6 +319,7 @@ func WithPriorityAndFairness(
309319
longRunningRequestCheck apirequest.LongRunningRequestCheck,
310320
fcIfc utilflowcontrol.Interface,
311321
workEstimator flowcontrolrequest.WorkEstimatorFunc,
322+
defaultRequestWaitLimit time.Duration,
312323
) http.Handler {
313324
if fcIfc == nil {
314325
klog.Warningf("priority and fairness support not found, skipping")
@@ -322,12 +333,18 @@ func WithPriorityAndFairness(
322333
waitingMark.mutatingObserver = fcmetrics.GetWaitingMutatingConcurrency()
323334
})
324335

336+
clock := &utilsclock.RealClock{}
337+
newReqWaitCtxFn := func(ctx context.Context) (context.Context, context.CancelFunc) {
338+
return getRequestWaitContext(ctx, defaultRequestWaitLimit, clock)
339+
}
340+
325341
priorityAndFairnessHandler := &priorityAndFairnessHandler{
326342
handler: handler,
327343
longRunningRequestCheck: longRunningRequestCheck,
328344
fcIfc: fcIfc,
329345
workEstimator: workEstimator,
330346
droppedRequests: utilflowcontrol.NewDroppedRequestsTracker(),
347+
newReqWaitCtxFn: newReqWaitCtxFn,
331348
}
332349
return http.HandlerFunc(priorityAndFairnessHandler.Handle)
333350
}
@@ -356,3 +373,48 @@ func tooManyRequests(req *http.Request, w http.ResponseWriter, retryAfter string
356373
w.Header().Set("Retry-After", retryAfter)
357374
http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
358375
}
376+
377+
// getRequestWaitContext returns a new context with a deadline of how
378+
// long the request is allowed to wait before it is removed from its
379+
// queue and rejected.
380+
// The context.CancelFunc returned must never be nil and the caller is
381+
// responsible for calling the CancelFunc function for cleanup.
382+
// - ctx: the context associated with the request (it may or may
383+
// not have a deadline).
384+
// - defaultRequestWaitLimit: the default wait duration that is used
385+
// if the request context does not have any deadline.
386+
// (a) initialization of a watch or
387+
// (b) a request whose context has no deadline
388+
//
389+
// clock comes in handy for testing the function
390+
func getRequestWaitContext(ctx context.Context, defaultRequestWaitLimit time.Duration, clock utilsclock.PassiveClock) (context.Context, context.CancelFunc) {
391+
if ctx.Err() != nil {
392+
return ctx, func() {}
393+
}
394+
395+
reqArrivedAt := clock.Now()
396+
if reqReceivedTimestamp, ok := apirequest.ReceivedTimestampFrom(ctx); ok {
397+
reqArrivedAt = reqReceivedTimestamp
398+
}
399+
400+
// a) we will allow the request to wait in the queue for one
401+
// fourth of the time of its allotted deadline.
402+
// b) if the request context does not have any deadline
403+
// then we default to 'defaultRequestWaitLimit'
404+
// in any case, the wait limit for any request must not
405+
// exceed the hard limit of 1m
406+
//
407+
// request has deadline:
408+
// wait-limit = min(remaining deadline / 4, 1m)
409+
// request has no deadline:
410+
// wait-limit = min(defaultRequestWaitLimit, 1m)
411+
thisReqWaitLimit := defaultRequestWaitLimit
412+
if deadline, ok := ctx.Deadline(); ok {
413+
thisReqWaitLimit = deadline.Sub(reqArrivedAt) / 4
414+
}
415+
if thisReqWaitLimit > time.Minute {
416+
thisReqWaitLimit = time.Minute
417+
}
418+
419+
return context.WithDeadline(ctx, reqArrivedAt.Add(thisReqWaitLimit))
420+
}

0 commit comments

Comments
 (0)