This repository was archived by the owner on Feb 1, 2021. It is now read-only.

Commit 3cd5ce5

Author: Dani Louca (committed)
Force resume refreshLoop when engine state becomes healthy

Signed-off-by: Dani Louca <[email protected]>

Adding test

Signed-off-by: Dani Louca <[email protected]>
1 parent: 39939d7

File tree

2 files changed (+105, -5 lines)

cluster/engine.go

Lines changed: 20 additions & 5 deletions
@@ -71,19 +71,22 @@ var (
 	testErrImageNotFound = errors.New("TEST_ERR_IMAGE_NOT_FOUND_SWARM")
 )
 
-// delayer offers a simple API to random delay within a given time range.
+// delayer offers a simple API to random delay within a given time range,
+// and to force resume delays.
 type delayer struct {
 	rangeMin time.Duration
 	rangeMax time.Duration
 
-	r *rand.Rand
-	l sync.Mutex
+	resumeCh chan bool
+	r        *rand.Rand
+	l        sync.Mutex
 }
 
 func newDelayer(rangeMin, rangeMax time.Duration) *delayer {
 	return &delayer{
 		rangeMin: rangeMin,
 		rangeMax: rangeMax,
+		resumeCh: make(chan bool, 1),
 		r:        rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
 	}
 }
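For context, here is a minimal, self-contained sketch of how such a resumable delayer can work. Only the resumeCh field and its initialization are part of this diff; the Wait method body and the backoff math below are assumptions for illustration:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// delayer sketch: Wait returns a channel that fires after a random delay
// in [rangeMin, rangeMax), scaled by a backoff factor; resumeCh lets a
// caller cut the wait short. NOTE: the Wait body is an assumption for
// illustration; only resumeCh and its make() appear in this commit.
type delayer struct {
	rangeMin time.Duration
	rangeMax time.Duration

	resumeCh chan bool
	r        *rand.Rand
	l        sync.Mutex
}

func newDelayer(rangeMin, rangeMax time.Duration) *delayer {
	return &delayer{
		rangeMin: rangeMin,
		rangeMax: rangeMax,
		resumeCh: make(chan bool, 1), // buffered: one resume can be stored while the loop is busy
		r:        rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
	}
}

func (d *delayer) Wait(factor int) <-chan time.Time {
	d.l.Lock()
	defer d.l.Unlock()
	delay := d.rangeMin + time.Duration(d.r.Int63n(int64(d.rangeMax-d.rangeMin)))
	return time.After(time.Duration(factor) * delay)
}

func main() {
	d := newDelayer(10*time.Second, 20*time.Second)
	d.resumeCh <- true // force resume: the buffered send does not block
	select {
	case <-d.resumeCh:
		fmt.Println("resumed early instead of sleeping 10-20s")
	case <-d.Wait(1):
		fmt.Println("timer fired")
	}
}

The buffer of 1 matters: a resume signal sent while the loop is mid-refresh is stored and consumed on its next select, rather than being lost.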
@@ -417,10 +420,21 @@ func (e *Engine) CheckConnectionErr(err error) {
 	if err == nil {
 		e.setErrMsg("")
 		// If current state is unhealthy, change it to healthy
+		// A lock is needed to avoid a race condition that could lead to blocking on resumeCh.
+		// We must not block on slow operations (e.g. emitEvent, which ends up calling back into the engine);
+		// instead, we check the changed flag, which is set to true inside the locked block.
+		changed := false
+		e.Lock()
 		if e.state == stateUnhealthy {
-			log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.getFailureCount())
+			changed = true
+			e.state = stateHealthy
+			log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.failureCount)
+		}
+		e.Unlock()
+		if changed {
+			// Signal the refresh loop so we don't wait too long, especially if the failure count was high.
+			e.refreshDelayer.resumeCh <- true
 			e.emitEvent("engine_reconnect")
-			e.setState(stateHealthy)
 		}
 		e.resetFailureCount()
 		return
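The changed flag is the core of this hunk: state is mutated under the lock, but the potentially blocking work (the channel send and the event emission) happens only after unlocking. A minimal, runnable sketch of that pattern, with a hypothetical engine struct and markHealthy standing in for Engine and CheckConnectionErr:

package main

import (
	"fmt"
	"sync"
)

// Sketch of the lock/flag pattern: mutate state under the lock, remember
// whether a transition happened, and do the blocking work only after
// unlocking. The engine type and markHealthy here are hypothetical
// stand-ins for Engine and CheckConnectionErr.
type engine struct {
	sync.Mutex
	healthy  bool
	resumeCh chan bool
}

func (e *engine) markHealthy() {
	changed := false
	e.Lock()
	if !e.healthy {
		e.healthy = true
		changed = true // record the transition; act on it outside the lock
	}
	e.Unlock()
	if changed {
		e.resumeCh <- true // safe to block here: the lock is no longer held
		fmt.Println("emit engine_reconnect")
	}
}

func main() {
	e := &engine{resumeCh: make(chan bool, 1)}
	e.markHealthy() // unhealthy -> healthy: signals the refresh loop
	e.markHealthy() // already healthy: no second send, no risk of blocking
	fmt.Println(<-e.resumeCh)
}

Because the flag guards the send, a call that finds the engine already healthy sends nothing at all, so redundant health checks cannot pile up sends on the size-1 channel.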
@@ -918,6 +932,7 @@ func (e *Engine) refreshLoop() {
 		}
 		// Wait for the delayer or quit if we get stopped.
 		select {
+		case <-e.refreshDelayer.resumeCh:
 		case <-e.refreshDelayer.Wait(backoffFactor):
 		case <-e.stopCh:
 			return
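Putting the pieces together, the refresh loop's select now has three wake-up sources: the forced resume, the backoff timer, and the stop signal. A reduced, runnable sketch of that shape, where a fixed 30-second timer stands in for refreshDelayer.Wait(backoffFactor) and the loop body is assumed:

package main

import (
	"fmt"
	"time"
)

// Reduced sketch of the refreshLoop select with its three wake-up sources.
// The fixed 30s timer stands in for refreshDelayer.Wait(backoffFactor);
// the refresh work itself is elided.
func refreshLoop(resumeCh <-chan bool, stopCh <-chan struct{}) {
	for {
		// ... refresh containers, images, etc. ...
		select {
		case <-resumeCh:
			fmt.Println("woken early: engine became healthy")
		case <-time.After(30 * time.Second):
			fmt.Println("backoff expired, refreshing")
		case <-stopCh:
			return
		}
	}
}

func main() {
	resumeCh := make(chan bool, 1)
	stopCh := make(chan struct{})
	go refreshLoop(resumeCh, stopCh)
	resumeCh <- true // force an immediate refresh instead of waiting out the backoff
	time.Sleep(100 * time.Millisecond)
	close(stopCh)
}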

cluster/engine_test.go

Lines changed: 85 additions & 0 deletions
@@ -821,3 +821,88 @@ func TestRemoveImage(t *testing.T) {
 	}
 	apiClient.Mock.AssertExpectations(t)
 }
+
+func TestRefreshLoop(t *testing.T) {
+
+	var (
+		config = &ContainerConfig{containertypes.Config{
+			Image: "busybox",
+			Cmd:   []string{"date"},
+			Tty:   false,
+		}, containertypes.HostConfig{
+			Resources: containertypes.Resources{
+				CPUShares: 1,
+			},
+		}, networktypes.NetworkingConfig{}}
+		state = types.ContainerState{
+			StartedAt:  "2018-05-07T08:33:22.070211457Z",
+			FinishedAt: "0001-01-01T00:00:00Z",
+		}
+		engine    = NewEngine("test", 0, engOpts)
+		apiClient = engineapimock.NewMockClient()
+	)
+
+	engine.apiClient = apiClient
+	engine.setState(stateUnhealthy)
+
+	apiClient.On("Info", mock.Anything).Return(mockInfo, nil)
+	apiClient.On("ServerVersion", mock.Anything).Return(mockVersion, nil)
+	apiClient.On("NetworkList", mock.Anything,
+		mock.AnythingOfType("NetworkListOptions"),
+	).Return([]types.NetworkResource{}, nil)
+	apiClient.On("VolumeList", mock.Anything,
+		mock.AnythingOfType("Args"),
+	).Return(volume.VolumesListOKBody{}, nil)
+	apiClient.On("Events", mock.Anything, mock.AnythingOfType("EventsOptions")).Return(make(chan events.Message), make(chan error))
+	apiClient.On("ImageList", mock.Anything, mock.AnythingOfType("ImageListOptions")).Return([]types.ImageSummary{}, nil)
+	// We need to call ContainerList twice: once for ConnectWithClient and once for the first iteration when we kick off the refreshLoop.
+	apiClient.On("ContainerList", mock.Anything, types.ContainerListOptions{All: true, Size: false}).Return([]types.Container{}, nil).Twice()
+	apiClient.On("NegotiateAPIVersion", mock.Anything).Return()
+
+	assert.NoError(t, engine.ConnectWithClient(apiClient))
+	// Make sure the engine is happy.
+	assert.True(t, engine.isConnected())
+	assert.True(t, engine.state == stateHealthy)
+	// Simulate engine failure by increasing the failure count and marking it unhealthy.
+	engine.failureCount = 900
+	engine.state = stateUnhealthy
+	go engine.refreshLoop()
+	// At this point, the loop should be waiting very long due to the high failure count.
+	assert.Len(t, engine.Containers(), 0)
+
+	// The mock methods below verify that the refresh loop resumed on the next
+	// CheckConnectionErr and added a new container.
+	apiClient.On(
+		"ContainerList",
+		mock.Anything,
+		types.ContainerListOptions{
+			All:  true,
+			Size: false,
+		},
+	).Return(
+		[]types.Container{{ID: "100"}},
+		nil,
+	).Once()
+
+	apiClient.On(
+		"ContainerInspect",
+		mock.Anything,
+		"100",
+	).Return(
+		types.ContainerJSON{
+			Config: &config.Config,
+			ContainerJSONBase: &types.ContainerJSONBase{
+				HostConfig: &config.HostConfig,
+				State:      &state,
+			},
+			NetworkSettings: &types.NetworkSettings{
+				Networks: nil,
+			},
+		},
+		nil,
+	).Once()
+	// This forces the refreshLoop to resume and not wait very long.
+	engine.CheckConnectionErr(nil)
+	time.Sleep(1 * time.Second)
+	assert.Len(t, engine.Containers(), 1)
+}
