Skip to content

Commit 25b4c17

Browse files
authored
Merge pull request kubernetes#87362 from MikeSpreitzer/limited-cancel2
Simplified and corrected logic around context cancelation in refactored QueueSet
2 parents 4af7350 + cbdd3a2 commit 25b4c17

File tree

8 files changed

+571
-214
lines changed

8 files changed

+571
-214
lines changed

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/interface.go

Lines changed: 58 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,22 @@ import (
2121
"time"
2222
)
2323

24-
// QueueSetFactory is used to create QueueSet objects.
24+
// QueueSetFactory is used to create QueueSet objects. Creation, like
25+
// config update, is done in two phases: the first phase consumes the
26+
// QueuingConfig and the second consumes the DispatchingConfig. They
27+
// are separated so that errors from the first phase can be found
28+
// before committing to a concurrency allotment for the second.
2529
type QueueSetFactory interface {
26-
NewQueueSet(config QueueSetConfig) (QueueSet, error)
30+
// BeginConstruction does the first phase of creating a QueueSet
31+
BeginConstruction(QueuingConfig) (QueueSetCompleter, error)
32+
}
33+
34+
// QueueSetCompleter finishes the two-step process of creating or
35+
// reconfiguring a QueueSet
36+
type QueueSetCompleter interface {
37+
// Complete returns a QueueSet configured by the given
38+
// dispatching configuration.
39+
Complete(DispatchingConfig) QueueSet
2740
}
2841

2942
// QueueSet is the abstraction for the queuing and dispatching
@@ -34,19 +47,27 @@ type QueueSetFactory interface {
3447
// . Some day we may have connections between priority levels, but
3548
// today is not that day.
3649
type QueueSet interface {
37-
// SetConfiguration updates the configuration
38-
SetConfiguration(QueueSetConfig) error
39-
40-
// Quiesce controls whether the QueueSet is operating normally or is quiescing.
41-
// A quiescing QueueSet drains as normal but does not admit any
42-
// new requests. Passing a non-nil handler means the system should
43-
// be quiescing, a nil handler means the system should operate
44-
// normally. A call to Wait while the system is quiescing
45-
// will be rebuffed by returning tryAnother=true. If all the
46-
// queues have no requests waiting nor executing while the system
47-
// is quiescing then the handler will eventually be called with no
48-
// locks held (even if the system becomes non-quiescing between the
49-
// triggering state and the required call).
50+
// BeginConfigChange starts the two-step process of updating
51+
// the configuration. No change is made until Complete is
52+
// called. If `C := X.BeginConstruction(q)` then
53+
// `C.Complete(d)` returns the same value `X`. If the
54+
// QueuingConfig's DesiredNumQueues field is zero then the other
55+
// queuing-specific config parameters are not changed, so that the
56+
// queues continue draining as before.
57+
BeginConfigChange(QueuingConfig) (QueueSetCompleter, error)
58+
59+
// Quiesce controls whether the QueueSet is operating normally or
60+
// is quiescing. A quiescing QueueSet drains as normal but does
61+
// not admit any new requests. Passing a non-nil handler means the
62+
// system should be quiescing, a nil handler means the system
63+
// should operate normally. A call to Wait while the system is
64+
// quiescing will be rebuffed by returning tryAnother=true. If all
65+
// the queues have no requests waiting nor executing while the
66+
// system is quiescing then the handler will eventually be called
67+
// with no locks held (even if the system becomes non-quiescing
68+
// between the triggering state and the required call). In Go
69+
// Memory Model terms, the triggering state happens before the
70+
// call to the EmptyHandler.
5071
Quiesce(EmptyHandler)
5172

5273
// Wait uses the given hashValue as the source of entropy as it
@@ -56,34 +77,47 @@ type QueueSet interface {
5677
// tryAnother==true at return then the QueueSet has become
5778
// undesirable and the client should try to find a different
5879
// QueueSet to use; execute and afterExecution are irrelevant in
59-
// this case. Otherwise, if execute then the client should start
60-
// executing the request and, once the request finishes execution
61-
// or is canceled, call afterExecution(). Otherwise the client
62-
// should not execute the request and afterExecution is
63-
// irrelevant.
80+
// this case. In the terms of the Go Memory Model, there was a
81+
// call to Quiesce with a non-nil handler that happened before
82+
// this return from Wait. Otherwise, if execute then the client
83+
// should start executing the request and, once the request
84+
// finishes execution or is canceled, call afterExecution().
85+
// Otherwise the client should not execute the request and
86+
// afterExecution is irrelevant. Canceling the context while the
87+
// request is waiting in its queue will cut short that wait and
88+
// cause a return with tryAnother and execute both false; later
89+
// cancellations are the caller's problem.
6490
Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func())
6591
}
6692

67-
// QueueSetConfig defines the configuration of a QueueSet.
68-
type QueueSetConfig struct {
93+
// QueuingConfig defines the configuration of the queuing aspect of a QueueSet.
94+
type QueuingConfig struct {
6995
// Name is used to identify a queue set, allowing for descriptive information about its intended use
7096
Name string
71-
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
72-
ConcurrencyLimit int
97+
7398
// DesiredNumQueues is the number of queues that the API says
7499
// should exist now. This may be zero, in which case
75100
// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
76101
DesiredNumQueues int
102+
77103
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
78104
QueueLengthLimit int
105+
79106
// HandSize is a parameter of shuffle sharding. Upon arrival of a request, a queue is chosen by randomly
80107
// dealing a "hand" of this many queues and then picking one of minimum length.
81108
HandSize int
109+
82110
// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
83111
// If, by the end of that time, the request has not been dispatched then it is rejected.
84112
RequestWaitLimit time.Duration
85113
}
86114

115+
// DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet.
116+
type DispatchingConfig struct {
117+
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
118+
ConcurrencyLimit int
119+
}
120+
87121
// EmptyHandler is used to notify the callee when all the queues
88122
// of a QueueSet have been drained.
89123
type EmptyHandler interface {

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/interface.go

Lines changed: 100 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,114 @@ limitations under the License.
1616

1717
package promise
1818

19-
// Mutable is a variable that is initially not set and can be set one
20-
// or more times (unlike a traditional "promise", which can be written
21-
// only once).
22-
type Mutable interface {
19+
// This file defines interfaces for promises and futures and related
20+
// things. These are about coordination among multiple goroutines and
21+
// so are safe for concurrent calls --- although moderated in some
22+
// cases by a requirement that the caller hold a certain lock.
2323

24+
// Readable represents a variable that is initially not set and later
25+
// becomes set. Some instances may be set to multiple values in
26+
// series. A Readable for a variable that can only get one value is
27+
// commonly known as a "future".
28+
type Readable interface {
29+
// Get reads the current value of this variable. If this variable
30+
// is not set yet then this call blocks until this variable gets a
31+
// value.
32+
Get() interface{}
33+
34+
// IsSet returns immediately with an indication of whether this
35+
// variable has been set.
36+
IsSet() bool
37+
}
38+
39+
// LockingReadable is a Readable whose implementation is protected by
40+
// a lock
41+
type LockingReadable interface {
42+
Readable
43+
44+
// GetLocked is like Get but the caller must already hold the
45+
// lock. GetLocked may release, and later re-acquire, the lock
46+
// any number of times. Get may acquire, and later release, the
47+
// lock any number of times.
48+
GetLocked() interface{}
49+
50+
// IsSetLocked is like IsSet but the caller must already hold the
51+
// lock. IsSetLocked may release, and later re-acquire, the lock
52+
// any number of times. IsSet may acquire, and later release, the
53+
// lock any number of times.
54+
IsSetLocked() bool
55+
}
56+
57+
// WriteOnceOnly represents a variable that is initially not set and
58+
// can be set once.
59+
type WriteOnceOnly interface {
60+
// Set normally writes a value into this variable, unblocks every
61+
// goroutine waiting for this variable to have a value, and
62+
// returns true. In the unhappy case that this variable is
63+
// already set, this method returns false without modifying the
64+
// variable's value.
65+
Set(interface{}) bool
66+
}
67+
68+
// WriteOnce represents a variable that is initially not set and can
69+
// be set once and is readable. This is the common meaning for
70+
// "promise".
71+
type WriteOnce interface {
72+
Readable
73+
WriteOnceOnly
74+
}
75+
76+
// LockingWriteOnceOnly is a WriteOnceOnly whose implementation is
77+
// protected by a lock.
78+
type LockingWriteOnceOnly interface {
79+
WriteOnceOnly
80+
81+
// SetLocked is like Set but the caller must already hold the
82+
// lock. SetLocked may release, and later re-acquire, the lock
83+
// any number of times. Set may acquire, and later release, the
84+
// lock any number of times
85+
SetLocked(interface{}) bool
86+
}
87+
88+
// LockingWriteOnce is a WriteOnce whose implementation is protected
89+
// by a lock.
90+
type LockingWriteOnce interface {
91+
LockingReadable
92+
LockingWriteOnceOnly
93+
}
94+
95+
// WriteMultipleOnly represents a variable that is initially not set
96+
// and can be set one or more times (unlike a traditional "promise",
97+
// which can be written only once).
98+
type WriteMultipleOnly interface {
2499
// Set writes a value into this variable and unblocks every
25100
// goroutine waiting for this variable to have a value
26101
Set(interface{})
102+
}
27103

28-
// Get reads the value of this variable. If this variable is
29-
// not set yet then this call blocks until this variable gets a value.
30-
Get() interface{}
104+
// WriteMultiple represents a variable that is initially not set and
105+
// can be set one or more times (unlike a traditional "promise", which
106+
// can be written only once) and is readable.
107+
type WriteMultiple interface {
108+
Readable
109+
WriteMultipleOnly
31110
}
32111

33-
// LockingMutable is a Mutable whose implementation is protected by a lock
34-
type LockingMutable interface {
35-
Mutable
112+
// LockingWriteMultipleOnly is a WriteMultipleOnly whose
113+
// implementation is protected by a lock.
114+
type LockingWriteMultipleOnly interface {
115+
WriteMultipleOnly
36116

37-
// SetLocked is like Set but the caller must already hold the lock
117+
// SetLocked is like Set but the caller must already hold the
118+
// lock. SetLocked may release, and later re-acquire, the lock
119+
// any number of times. Set may acquire, and later release, the
120+
// lock any number of times
38121
SetLocked(interface{})
122+
}
39123

40-
// GetLocked is like Get but the caller must already hold the lock
41-
GetLocked() interface{}
124+
// LockingWriteMultiple is a WriteMultiple whose implementation is
125+
// protected by a lock.
126+
type LockingWriteMultiple interface {
127+
LockingReadable
128+
LockingWriteMultipleOnly
42129
}

staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise/lockingpromise.go

Lines changed: 79 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,57 +23,102 @@ import (
2323
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise"
2424
)
2525

26-
// lockingPromise implements LockingMutable based on a condition
27-
// variable. This implementation tracks active goroutines: the given
28-
// counter is decremented for a goroutine waiting for this varible to
29-
// be set and incremented when such a goroutine is unblocked.
30-
type lockingPromise struct {
26+
// promisoid is the data and behavior common to all the promise-like
27+
// abstractions implemented here. This implementation is based on a
28+
// condition variable. This implementation tracks active goroutines:
29+
// the given counter is decremented for a goroutine waiting for this
30+
// varible to be set and incremented when such a goroutine is
31+
// unblocked.
32+
type promisoid struct {
3133
lock sync.Locker
3234
cond sync.Cond
3335
activeCounter counter.GoRoutineCounter // counter of active goroutines
34-
waitingCount int // number of goroutines idle due to this mutable being unset
36+
waitingCount int // number of goroutines idle due to this being unset
3537
isSet bool
3638
value interface{}
3739
}
3840

39-
var _ promise.LockingMutable = &lockingPromise{}
41+
func (pr *promisoid) Get() interface{} {
42+
pr.lock.Lock()
43+
defer pr.lock.Unlock()
44+
return pr.GetLocked()
45+
}
4046

41-
// NewLockingPromise makes a new promise.LockingMutable
42-
func NewLockingPromise(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingMutable {
43-
return &lockingPromise{
44-
lock: lock,
45-
cond: *sync.NewCond(lock),
46-
activeCounter: activeCounter,
47+
func (pr *promisoid) GetLocked() interface{} {
48+
if !pr.isSet {
49+
pr.waitingCount++
50+
pr.activeCounter.Add(-1)
51+
pr.cond.Wait()
4752
}
53+
return pr.value
4854
}
4955

50-
func (lp *lockingPromise) Set(value interface{}) {
51-
lp.lock.Lock()
52-
defer lp.lock.Unlock()
53-
lp.SetLocked(value)
56+
func (pr *promisoid) IsSet() bool {
57+
pr.lock.Lock()
58+
defer pr.lock.Unlock()
59+
return pr.IsSetLocked()
5460
}
5561

56-
func (lp *lockingPromise) Get() interface{} {
57-
lp.lock.Lock()
58-
defer lp.lock.Unlock()
59-
return lp.GetLocked()
62+
func (pr *promisoid) IsSetLocked() bool {
63+
return pr.isSet
6064
}
6165

62-
func (lp *lockingPromise) SetLocked(value interface{}) {
63-
lp.isSet = true
64-
lp.value = value
65-
if lp.waitingCount > 0 {
66-
lp.activeCounter.Add(lp.waitingCount)
67-
lp.waitingCount = 0
68-
lp.cond.Broadcast()
66+
func (pr *promisoid) SetLocked(value interface{}) {
67+
pr.isSet = true
68+
pr.value = value
69+
if pr.waitingCount > 0 {
70+
pr.activeCounter.Add(pr.waitingCount)
71+
pr.waitingCount = 0
72+
pr.cond.Broadcast()
6973
}
7074
}
7175

72-
func (lp *lockingPromise) GetLocked() interface{} {
73-
if !lp.isSet {
74-
lp.waitingCount++
75-
lp.activeCounter.Add(-1)
76-
lp.cond.Wait()
76+
type writeOnce struct {
77+
promisoid
78+
}
79+
80+
var _ promise.LockingWriteOnce = &writeOnce{}
81+
82+
// NewWriteOnce makes a new promise.LockingWriteOnce
83+
func NewWriteOnce(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteOnce {
84+
return &writeOnce{promisoid{
85+
lock: lock,
86+
cond: *sync.NewCond(lock),
87+
activeCounter: activeCounter,
88+
}}
89+
}
90+
91+
func (wr *writeOnce) Set(value interface{}) bool {
92+
wr.lock.Lock()
93+
defer wr.lock.Unlock()
94+
return wr.SetLocked(value)
95+
}
96+
97+
func (wr *writeOnce) SetLocked(value interface{}) bool {
98+
if wr.isSet {
99+
return false
77100
}
78-
return lp.value
101+
wr.promisoid.SetLocked(value)
102+
return true
103+
}
104+
105+
type writeMultiple struct {
106+
promisoid
107+
}
108+
109+
var _ promise.LockingWriteMultiple = &writeMultiple{}
110+
111+
// NewWriteMultiple makes a new promise.LockingWriteMultiple
112+
func NewWriteMultiple(lock sync.Locker, activeCounter counter.GoRoutineCounter) promise.LockingWriteMultiple {
113+
return &writeMultiple{promisoid{
114+
lock: lock,
115+
cond: *sync.NewCond(lock),
116+
activeCounter: activeCounter,
117+
}}
118+
}
119+
120+
func (wr *writeMultiple) Set(value interface{}) {
121+
wr.lock.Lock()
122+
defer wr.lock.Unlock()
123+
wr.SetLocked(value)
79124
}

0 commit comments

Comments
 (0)