Skip to content

Commit fa06004

Browse files
author
Benjamin Reed
authored
Merge pull request #92 from codeblooded/bug/serial-sessions
Fix blocking issue with multiple sessions
2 parents ecbaf9f + 8ad2b37 commit fa06004

File tree

3 files changed

+18
-3
lines changed

3 files changed

+18
-3
lines changed

testctrl/go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls
5252
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
5353
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
5454
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
55+
github.com/google/glog v0.4.0 h1:WV2GdGOpRcDyRt1i9LHUcpATSfmbxDOHL/I5OtjndLI=
5556
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
5657
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
5758
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=

testctrl/svc/orch/controller.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,10 @@ func (c *Controller) spawnExecutor(session *types.Session) {
228228
c.incExecutors()
229229

230230
go func() {
231-
defer c.decExecutors()
231+
defer func() {
232+
c.decExecutors()
233+
c.waitQueue.Done(session)
234+
}()
232235

233236
if err := executor.Execute(session); err != nil {
234237
glog.Infof("%v", err)

testctrl/svc/orch/watcher.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (w *Watcher) Subscribe(sessionName string) (<-chan *PodWatchEvent, error) {
7171
return nil, fmt.Errorf("session %v already has a follower", sessionName)
7272
}
7373

74-
eventChan := make(chan *PodWatchEvent)
74+
eventChan := make(chan *PodWatchEvent, eventBufferSize)
7575
w.eventChans[sessionName] = eventChan
7676
return eventChan, nil
7777
}
@@ -133,7 +133,16 @@ func (w *Watcher) publish(event *PodWatchEvent) {
133133
defer w.mux.Unlock()
134134

135135
eventChan := w.eventChans[event.SessionName]
136-
eventChan <- event
136+
if eventChan == nil {
137+
glog.Warningf("watcher: received event for session without subscriber: %v", event)
138+
return
139+
}
140+
141+
if len(eventChan) < cap(eventChan) {
142+
eventChan <- event
143+
} else {
144+
glog.Warningf("watcher: too many events unread in subscriber channel, dropping: %v", event)
145+
}
137146
}
138147

139148
func (w *Watcher) diagnose(pod *corev1.Pod) (Health, error) {
@@ -192,3 +201,5 @@ type PodWatchEvent struct {
192201
// Error may provide the error details that led to the failing health.
193202
Error error
194203
}
204+
205+
const eventBufferSize = 32

0 commit comments

Comments
 (0)