Skip to content

Commit 8ad2b37

Browse files
author
Ben Reed
committed
Fix blocking issue with multiple sessions
When multiple sessions were run serially, the second would block forever. The Watcher type sends Kubernetes events for a specific session over a channel to the executor. During clean-up the Watcher continued to post events, but the executor never read these. Since the Watcher used an unbuffered channel, it blocked until it was read. This blocking caused a lock to never be released. This lock was needed to create a new channel for a new session. This change switches the Watcher to use a buffered channel which does not block until it is full. The Watcher is also modified to drop events when the channel's capacity is exceeded. This prevents it from blocking during manual tests and allows additional sessions to be scheduled. In addition, this change adds a missing call to (*queue).Done in the controller. This call is necessary to instruct the reservation system that machines are available for scheduling.
1 parent ecbaf9f commit 8ad2b37

File tree

3 files changed

+18
-3
lines changed

3 files changed

+18
-3
lines changed

testctrl/go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls
5252
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
5353
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
5454
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
55+
github.com/google/glog v0.4.0 h1:WV2GdGOpRcDyRt1i9LHUcpATSfmbxDOHL/I5OtjndLI=
5556
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
5657
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
5758
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=

testctrl/svc/orch/controller.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,10 @@ func (c *Controller) spawnExecutor(session *types.Session) {
228228
c.incExecutors()
229229

230230
go func() {
231-
defer c.decExecutors()
231+
defer func() {
232+
c.decExecutors()
233+
c.waitQueue.Done(session)
234+
}()
232235

233236
if err := executor.Execute(session); err != nil {
234237
glog.Infof("%v", err)

testctrl/svc/orch/watcher.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (w *Watcher) Subscribe(sessionName string) (<-chan *PodWatchEvent, error) {
7171
return nil, fmt.Errorf("session %v already has a follower", sessionName)
7272
}
7373

74-
eventChan := make(chan *PodWatchEvent)
74+
eventChan := make(chan *PodWatchEvent, eventBufferSize)
7575
w.eventChans[sessionName] = eventChan
7676
return eventChan, nil
7777
}
@@ -133,7 +133,16 @@ func (w *Watcher) publish(event *PodWatchEvent) {
133133
defer w.mux.Unlock()
134134

135135
eventChan := w.eventChans[event.SessionName]
136-
eventChan <- event
136+
if eventChan == nil {
137+
glog.Warningf("watcher: received event for session without subscriber: %v", event)
138+
return
139+
}
140+
141+
if len(eventChan) < cap(eventChan) {
142+
eventChan <- event
143+
} else {
144+
glog.Warningf("watcher: too many events unread in subscriber channel, dropping: %v", event)
145+
}
137146
}
138147

139148
func (w *Watcher) diagnose(pod *corev1.Pod) (Health, error) {
@@ -192,3 +201,5 @@ type PodWatchEvent struct {
192201
// Error may provide the error details that led to the failing health.
193202
Error error
194203
}
204+
205+
const eventBufferSize = 32

0 commit comments

Comments
 (0)