Skip to content

Commit 1c31b2b

Browse files
committed
Brushing up queueset
(1) Replaced random-looking assortment of counter increments and decrements with something hopefully more principalled-looking. Most importantly, introduced the MutablePromise abstraction to neatly wrap up the complicated business of unioning multiple sources of unblocking. (2) Improved debug logging. (3) Somewhat more interesting test cases, and a bug fix wrt round robin index.
1 parent 6619df1 commit 1c31b2b

File tree

16 files changed

+732
-317
lines changed

16 files changed

+732
-317
lines changed

api/openapi-spec/swagger.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

staging/src/k8s.io/apiserver/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ filegroup(
4747
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:all-srcs",
4848
"//staging/src/k8s.io/apiserver/pkg/util/flushwriter:all-srcs",
4949
"//staging/src/k8s.io/apiserver/pkg/util/openapi:all-srcs",
50+
"//staging/src/k8s.io/apiserver/pkg/util/promise:all-srcs",
5051
"//staging/src/k8s.io/apiserver/pkg/util/proxy:all-srcs",
5152
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:all-srcs",
5253
"//staging/src/k8s.io/apiserver/pkg/util/term:all-srcs",

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/BUILD

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
22

33
go_library(
44
name = "go_default_library",
5-
srcs = [
6-
"interface.go",
7-
"types.go",
8-
],
5+
srcs = ["interface.go"],
96
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
107
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing",
118
visibility = ["//visibility:public"],

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,19 @@ type QueueSet interface {
4949
// triggering state and the required call).
5050
Quiesce(EmptyHandler)
5151

52-
// Wait uses the given hashValue as the source of entropy
53-
// as it shuffle-shards a request into a queue and waits for
54-
// a decision on what to do with that request. If tryAnother==true
55-
// at return then the QueueSet has become undesirable and the client
56-
// should try to find a different QueueSet to use; execute and
57-
// afterExecution are irrelevant in this case. Otherwise, if execute
58-
// then the client should start executing the request and, once the
59-
// request finishes execution or is canceled, call afterExecution().
60-
// Otherwise the client should not execute the
61-
// request and afterExecution is irrelevant.
62-
Wait(ctx context.Context, hashValue uint64) (tryAnother, execute bool, afterExecution func())
52+
// Wait uses the given hashValue as the source of entropy as it
53+
// shuffle-shards a request into a queue and waits for a decision
54+
// on what to do with that request. The descr1 and descr2 values
55+
// play no role in the logic but appear in log messages. If
56+
// tryAnother==true at return then the QueueSet has become
57+
// undesirable and the client should try to find a different
58+
// QueueSet to use; execute and afterExecution are irrelevant in
59+
// this case. Otherwise, if execute then the client should start
60+
// executing the request and, once the request finishes execution
61+
// or is canceled, call afterExecution(). Otherwise the client
62+
// should not execute the request and afterExecution is
63+
// irrelevant.
64+
Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func())
6365
}
6466

6567
// QueueSetConfig defines the configuration of a QueueSet.

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/BUILD

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "go_default_library",
5-
srcs = ["queueset.go"],
5+
srcs = [
6+
"doc.go",
7+
"queueset.go",
8+
"types.go",
9+
],
610
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
711
importpath = "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset",
812
visibility = ["//visibility:public"],
@@ -12,6 +16,8 @@ go_library(
1216
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/counter:go_default_library",
1317
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
1418
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library",
19+
"//staging/src/k8s.io/apiserver/pkg/util/promise:go_default_library",
20+
"//staging/src/k8s.io/apiserver/pkg/util/promise/lockingpromise:go_default_library",
1521
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library",
1622
"//vendor/github.com/pkg/errors:go_default_library",
1723
"//vendor/k8s.io/klog:go_default_library",
@@ -27,6 +33,7 @@ go_test(
2733
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library",
2834
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing:go_default_library",
2935
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock:go_default_library",
36+
"//vendor/k8s.io/klog:go_default_library",
3037
],
3138
)
3239

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package queueset
18+
19+
// This package implements a technique called "fair queuing for server
20+
// requests". One QueueSet is a set of queues operating according to
21+
// this technique.
22+
23+
// Fair queuing for server requests is inspired by the fair queuing
24+
// technique from the world of networking. You can find a good paper
25+
// on that at https://dl.acm.org/citation.cfm?doid=75247.75248 or
26+
// http://people.csail.mit.edu/imcgraw/links/research/pubs/networks/WFQ.pdf
27+
// and there is an implementation outline in the Wikipedia article at
28+
// https://en.wikipedia.org/wiki/Fair_queuing .
29+
30+
// Fair queuing for server requests differs from traditional fair
31+
// queuing in three ways: (1) we are dispatching requests to be
32+
// executed within a process rather than transmitting packets on a
33+
// network link, (2) multiple requests can be executing at once, and
34+
// (3) the service time (execution duration) is not known until the
35+
// execution completes.
36+
37+
// The first two differences can easily be handled by straightforward
38+
// adaptation of the concept called "R(t)" in the original paper and
39+
// "virtual time" in the implementation outline. In that
40+
// implementation outline, the notation now() is used to mean reading
41+
// the virtual clock. In the original paper’s terms, "R(t)" is the
42+
// number of "rounds" that have been completed at real time t, where a
43+
// round consists of virtually transmitting one bit from every
44+
// non-empty queue in the router (regardless of which queue holds the
45+
// packet that is really being transmitted at the moment); in this
46+
// conception, a packet is considered to be "in" its queue until the
47+
// packet’s transmission is finished. For our problem, we can define a
48+
// round to be giving one nanosecond of CPU to every non-empty queue
49+
// in the apiserver (where emptiness is judged based on both queued
50+
// and executing requests from that queue), and define R(t) = (server
51+
// start time) + (1 ns) * (number of rounds since server start). Let
52+
// us write NEQ(t) for that number of non-empty queues in the
53+
// apiserver at time t. Let us also write C for the concurrency
54+
// limit. In the original paper, the partial derivative of R(t) with
55+
// respect to t is
56+
//
57+
// 1 / NEQ(t) .
58+
59+
// To generalize from transmitting one packet at a time to executing C
60+
// requests at a time, that derivative becomes
61+
//
62+
// C / NEQ(t) .
63+
64+
// However, sometimes there are fewer than C requests available to
65+
// execute. For a given queue "q", let us also write "reqs(q, t)" for
66+
// the number of requests of that queue that are executing at that
67+
// time. The total number of requests executing is sum[over q]
68+
// reqs(q, t) and if that is less than C then virtual time is not
69+
// advancing as fast as it would if all C seats were occupied; in this
70+
// case the numerator of the quotient in that derivative should be
71+
// adjusted proportionally. Putting it all together for fair queing
72+
// for server requests: at a particular time t, the partial derivative
73+
// of R(t) with respect to t is
74+
//
75+
// min( C, sum[over q] reqs(q, t) ) / NEQ(t) .
76+
//
77+
// In terms of the implementation outline, this is the rate at which
78+
// virtual time is advancing at time t (in virtual nanoseconds per
79+
// real nanosecond). Where the networking implementation outline adds
80+
// packet size to a virtual time, in our version this corresponds to
81+
// adding a service time (i.e., duration) to virtual time.
82+
83+
// The third difference is handled by modifying the algorithm to
84+
// dispatch based on an initial guess at the request’s service time
85+
// (duration) and then make the corresponding adjustments once the
86+
// request’s actual service time is known. This is similar, although
87+
// not exactly isomorphic, to the original paper’s adjustment by
88+
// `$delta` for the sake of promptness.
89+
90+
// For implementation simplicity (see below), let us use the same
91+
// initial service time guess for every request; call that duration
92+
// G. A good choice might be the service time limit (1
93+
// minute). Different guesses will give slightly different dynamics,
94+
// but any positive number can be used for G without ruining the
95+
// long-term behavior.
96+
97+
// As in ordinary fair queuing, there is a bound on divergence from
98+
// the ideal. In plain fair queuing the bound is one packet; in our
99+
// version it is C requests.
100+
101+
// To support efficiently making the necessary adjustments once a
102+
// request’s actual service time is known, the virtual finish time of
103+
// a request and the last virtual finish time of a queue are not
104+
// represented directly but instead computed from queue length,
105+
// request position in the queue, and an alternate state variable that
106+
// holds the queue’s virtual start time. While the queue is empty and
107+
// has no requests executing: the value of its virtual start time
108+
// variable is ignored and its last virtual finish time is considered
109+
// to be in the virtual past. When a request arrives to an empty queue
110+
// with no requests executing, the queue’s virtual start time is set
111+
// to the current virtual time. The virtual finish time of request
112+
// number J in the queue (counting from J=1 for the head) is J * G +
113+
// (queue's virtual start time). While the queue is non-empty: the
114+
// last virtual finish time of the queue is the virtual finish time of
115+
// the last request in the queue. While the queue is empty and has a
116+
// request executing: the last virtual finish time is the queue’s
117+
// virtual start time. When a request is dequeued for service the
118+
// queue’s virtual start time is advanced by G. When a request
119+
// finishes being served, and the actual service time was S, the
120+
// queue’s virtual start time is decremented by G - S.

0 commit comments

Comments
 (0)