Skip to content

Commit 022120c

Browse files
authored
Merge pull request kubernetes#85192 from MikeSpreitzer/fq-impl
Added fair queuing for server requests
2 parents c98d9b6 + e10acc7 commit 022120c

File tree

25 files changed

+2362
-1
lines changed

25 files changed

+2362
-1
lines changed

staging/src/k8s.io/apiserver/BUILD

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,12 @@ filegroup(
4242
"//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs",
4343
"//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs",
4444
"//staging/src/k8s.io/apiserver/pkg/util/feature:all-srcs",
45+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:all-srcs",
46+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:all-srcs",
47+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs",
4548
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs",
4649
"//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs",
50+
"//staging/src/k8s.io/apiserver/pkg/util/promise:all-srcs",
4751
"//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs",
4852
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:all-srcs",
4953
"//staging/src/k8s.io/apiserver/pkg/util/term:all-srcs",

staging/src/k8s.io/apiserver/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ require (
2626
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
2727
github.com/hashicorp/golang-lru v0.5.1
2828
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
29-
github.com/pkg/errors v0.8.1 // indirect
29+
github.com/pkg/errors v0.8.1
3030
github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021 // indirect
31+
github.com/prometheus/client_golang v1.0.0
3132
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
3233
github.com/sirupsen/logrus v1.4.2 // indirect
3334
github.com/spf13/pflag v1.0.5
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "go_default_library",
5+
srcs = ["interface.go"],
6+
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/counter",
7+
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/counter",
8+
visibility = ["//visibility:public"],
9+
)
10+
11+
filegroup(
12+
name = "package-srcs",
13+
srcs = glob(["**"]),
14+
tags = ["automanaged"],
15+
visibility = ["//visibility:private"],
16+
)
17+
18+
filegroup(
19+
name = "all-srcs",
20+
srcs = [":package-srcs"],
21+
tags = ["automanaged"],
22+
visibility = ["//visibility:public"],
23+
)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package counter
18+
19+
// GoRoutineCounter keeps track of the number of active goroutines
20+
// working on/for something. This is a utility that makes such code more
21+
// testable. The code uses this utility to report the number of active
22+
// goroutines to the test code, so that the test code can advance a fake
23+
// clock when and only when the code being tested has finished all
24+
// the work that is ready to do at the present time.
25+
type GoRoutineCounter interface {
26+
// Add adds the given delta to the count of active goroutines.
27+
// Call Add(1) before forking a goroutine, Add(-1) at the end of that goroutine.
28+
// Call Add(-1) just before waiting on something from another goroutine (e.g.,
29+
// just before a `select`).
30+
// Call Add(1) just before doing something that unblocks a goroutine that is
31+
// waiting on that something.
32+
Add(delta int)
33+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "go_default_library",
5+
srcs = ["interface.go"],
6+
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
7+
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
8+
visibility = ["//visibility:public"],
9+
)
10+
11+
filegroup(
12+
name = "package-srcs",
13+
srcs = glob(["**"]),
14+
tags = ["automanaged"],
15+
visibility = ["//visibility:private"],
16+
)
17+
18+
filegroup(
19+
name = "all-srcs",
20+
srcs = [
21+
":package-srcs",
22+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset:all-srcs",
23+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:all-srcs",
24+
],
25+
tags = ["automanaged"],
26+
visibility = ["//visibility:public"],
27+
)
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package fairqueuing
18+
19+
import (
20+
"context"
21+
"time"
22+
)
23+
24+
// QueueSetFactory is used to create QueueSet objects.
25+
type QueueSetFactory interface {
26+
NewQueueSet(config QueueSetConfig) (QueueSet, error)
27+
}
28+
29+
// QueueSet is the abstraction for the queuing and dispatching
30+
// functionality of one non-exempt priority level. It covers the
31+
// functionality described in the "Assignment to a Queue", "Queuing",
32+
// and "Dispatching" sections of
33+
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md
34+
// . Some day we may have connections between priority levels, but
35+
// today is not that day.
36+
type QueueSet interface {
37+
// SetConfiguration updates the configuration
38+
SetConfiguration(QueueSetConfig) error
39+
40+
// Quiesce controls whether the QueueSet is operating normally or is quiescing.
41+
// A quiescing QueueSet drains as normal but does not admit any
42+
// new requests. Passing a non-nil handler means the system should
43+
// be quiescing, a nil handler means the system should operate
44+
// normally. A call to Wait while the system is quiescing
45+
// will be rebuffed by returning tryAnother=true. If all the
46+
// queues have no requests waiting nor executing while the system
47+
// is quiescing then the handler will eventually be called with no
48+
// locks held (even if the system becomes non-quiescing between the
49+
// triggering state and the required call).
50+
Quiesce(EmptyHandler)
51+
52+
// Wait uses the given hashValue as the source of entropy as it
53+
// shuffle-shards a request into a queue and waits for a decision
54+
// on what to do with that request. The descr1 and descr2 values
55+
// play no role in the logic but appear in log messages. If
56+
// tryAnother==true at return then the QueueSet has become
57+
// undesirable and the client should try to find a different
58+
// QueueSet to use; execute and afterExecution are irrelevant in
59+
// this case. Otherwise, if execute then the client should start
60+
// executing the request and, once the request finishes execution
61+
// or is canceled, call afterExecution(). Otherwise the client
62+
// should not execute the request and afterExecution is
63+
// irrelevant.
64+
Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func())
65+
}
66+
67+
// QueueSetConfig defines the configuration of a QueueSet.
68+
type QueueSetConfig struct {
69+
// Name is used to identify a queue set, allowing for descriptive information about its intended use
70+
Name string
71+
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
72+
ConcurrencyLimit int
73+
// DesiredNumQueues is the number of queues that the API says should exist now
74+
DesiredNumQueues int
75+
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
76+
QueueLengthLimit int
77+
// HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
78+
// dealing a "hand" of this many queues and then picking one of minimum length.
79+
HandSize int
80+
// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
81+
// If, by the end of that time, the request has not been dispatched then it is rejected.
82+
RequestWaitLimit time.Duration
83+
}
84+
85+
// EmptyHandler is used to notify the callee when all the queues
86+
// of a QueueSet have been drained.
87+
type EmptyHandler interface {
88+
// HandleEmpty is called to deliver the notification
89+
HandleEmpty()
90+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "go_default_library",
5+
srcs = [
6+
"doc.go",
7+
"queueset.go",
8+
"types.go",
9+
],
10+
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
11+
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
12+
visibility = ["//visibility:public"],
13+
deps = [
14+
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
15+
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
16+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
17+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
18+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
19+
"//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library",
20+
"//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:go_default_library",
21+
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library",
22+
"//vendor/github.com/pkg/errors:go_default_library",
23+
"//vendor/k8s.io/klog:go_default_library",
24+
],
25+
)
26+
27+
go_test(
28+
name = "go_default_test",
29+
srcs = ["queueset_test.go"],
30+
embed = [":go_default_library"],
31+
deps = [
32+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
33+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
34+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
35+
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library",
36+
"//vendor/k8s.io/klog:go_default_library",
37+
],
38+
)
39+
40+
filegroup(
41+
name = "package-srcs",
42+
srcs = glob(["**"]),
43+
tags = ["automanaged"],
44+
visibility = ["//visibility:private"],
45+
)
46+
47+
filegroup(
48+
name = "all-srcs",
49+
srcs = [":package-srcs"],
50+
tags = ["automanaged"],
51+
visibility = ["//visibility:public"],
52+
)
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package queueset
18+
19+
// This package implements a technique called "fair queuing for server
20+
// requests". One QueueSet is a set of queues operating according to
21+
// this technique.
22+
23+
// Fair queuing for server requests is inspired by the fair queuing
24+
// technique from the world of networking. You can find a good paper
25+
// on that at https://dl.acm.org/citation.cfm?doid=75247.75248 or
26+
// http://people.csail.mit.edu/imcgraw/links/research/pubs/networks/WFQ.pdf
27+
// and there is an implementation outline in the Wikipedia article at
28+
// https://en.wikipedia.org/wiki/Fair_queuing .
29+
30+
// Fair queuing for server requests differs from traditional fair
31+
// queuing in three ways: (1) we are dispatching requests to be
32+
// executed within a process rather than transmitting packets on a
33+
// network link, (2) multiple requests can be executing at once, and
34+
// (3) the service time (execution duration) is not known until the
35+
// execution completes.
36+
37+
// The first two differences can easily be handled by straightforward
38+
// adaptation of the concept called "R(t)" in the original paper and
39+
// "virtual time" in the implementation outline. In that
40+
// implementation outline, the notation now() is used to mean reading
41+
// the virtual clock. In the original paper’s terms, "R(t)" is the
42+
// number of "rounds" that have been completed at real time t, where a
43+
// round consists of virtually transmitting one bit from every
44+
// non-empty queue in the router (regardless of which queue holds the
45+
// packet that is really being transmitted at the moment); in this
46+
// conception, a packet is considered to be "in" its queue until the
47+
// packet’s transmission is finished. For our problem, we can define a
48+
// round to be giving one nanosecond of CPU to every non-empty queue
49+
// in the apiserver (where emptiness is judged based on both queued
50+
// and executing requests from that queue), and define R(t) = (server
51+
// start time) + (1 ns) * (number of rounds since server start). Let
52+
// us write NEQ(t) for that number of non-empty queues in the
53+
// apiserver at time t. Let us also write C for the concurrency
54+
// limit. In the original paper, the partial derivative of R(t) with
55+
// respect to t is
56+
//
57+
// 1 / NEQ(t) .
58+
59+
// To generalize from transmitting one packet at a time to executing C
60+
// requests at a time, that derivative becomes
61+
//
62+
// C / NEQ(t) .
63+
64+
// However, sometimes there are fewer than C requests available to
65+
// execute. For a given queue "q", let us also write "reqs(q, t)" for
66+
// the number of requests of that queue that are executing at that
67+
// time. The total number of requests executing is sum[over q]
68+
// reqs(q, t) and if that is less than C then virtual time is not
69+
// advancing as fast as it would if all C seats were occupied; in this
70+
// case the numerator of the quotient in that derivative should be
71+
// adjusted proportionally. Putting it all together for fair queing
72+
// for server requests: at a particular time t, the partial derivative
73+
// of R(t) with respect to t is
74+
//
75+
// min( C, sum[over q] reqs(q, t) ) / NEQ(t) .
76+
//
77+
// In terms of the implementation outline, this is the rate at which
78+
// virtual time is advancing at time t (in virtual nanoseconds per
79+
// real nanosecond). Where the networking implementation outline adds
80+
// packet size to a virtual time, in our version this corresponds to
81+
// adding a service time (i.e., duration) to virtual time.
82+
83+
// The third difference is handled by modifying the algorithm to
84+
// dispatch based on an initial guess at the request’s service time
85+
// (duration) and then make the corresponding adjustments once the
86+
// request’s actual service time is known. This is similar, although
87+
// not exactly isomorphic, to the original paper’s adjustment by
88+
// `$delta` for the sake of promptness.
89+
90+
// For implementation simplicity (see below), let us use the same
91+
// initial service time guess for every request; call that duration
92+
// G. A good choice might be the service time limit (1
93+
// minute). Different guesses will give slightly different dynamics,
94+
// but any positive number can be used for G without ruining the
95+
// long-term behavior.
96+
97+
// As in ordinary fair queuing, there is a bound on divergence from
98+
// the ideal. In plain fair queuing the bound is one packet; in our
99+
// version it is C requests.
100+
101+
// To support efficiently making the necessary adjustments once a
102+
// request’s actual service time is known, the virtual finish time of
103+
// a request and the last virtual finish time of a queue are not
104+
// represented directly but instead computed from queue length,
105+
// request position in the queue, and an alternate state variable that
106+
// holds the queue’s virtual start time. While the queue is empty and
107+
// has no requests executing: the value of its virtual start time
108+
// variable is ignored and its last virtual finish time is considered
109+
// to be in the virtual past. When a request arrives to an empty queue
110+
// with no requests executing, the queue’s virtual start time is set
111+
// to the current virtual time. The virtual finish time of request
112+
// number J in the queue (counting from J=1 for the head) is J * G +
113+
// (queue's virtual start time). While the queue is non-empty: the
114+
// last virtual finish time of the queue is the virtual finish time of
115+
// the last request in the queue. While the queue is empty and has a
116+
// request executing: the last virtual finish time is the queue’s
117+
// virtual start time. When a request is dequeued for service the
118+
// queue’s virtual start time is advanced by G. When a request
119+
// finishes being served, and the actual service time was S, the
120+
// queue’s virtual start time is decremented by G - S.

0 commit comments

Comments
 (0)