Skip to content

Commit 855fd64

Browse files
authored
Merge pull request #19 from supermeng/master
deal with swarm connection error
2 parents 7e0be60 + f206dbe commit 855fd64

File tree

6 files changed

+74
-14
lines changed

6 files changed

+74
-14
lines changed

engine/engine.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ var (
3131
ErrNotifyNotExists = errors.New("Notify uri not existed")
3232
)
3333

34+
const (
35+
ClusterFailedThreadSold = 20
36+
)
37+
3438
type OrcEngine struct {
3539
sync.RWMutex
3640

@@ -43,6 +47,7 @@ type OrcEngine struct {
4347
rmDepCtrls map[string]*dependsController
4448
opsChan chan orcOperation
4549
stop chan struct{}
50+
clstrFailCnt int
4651
}
4752

4853
const (
@@ -334,6 +339,18 @@ func (engine *OrcEngine) initPodGroupCtrl(spec PodGroupSpec, states []PodPrevSta
334339
return pgCtrl
335340
}
336341

342+
func (engine *OrcEngine) clusterRequestFailed() {
343+
engine.clstrFailCnt++
344+
if engine.clstrFailCnt > ClusterFailedThreadSold && engine.clstrFailCnt%ClusterFailedThreadSold == 0 {
345+
ntfController.Send(NewNotifySpec("Cluster", "Cluster-Manager",
346+
1, time.Now(), NotifyClusterUnHealthy))
347+
}
348+
}
349+
350+
func (engine *OrcEngine) clusterRequestSucceed() {
351+
engine.clstrFailCnt = 0
352+
}
353+
337354
// This will be running inside the go routine
338355
func (engine *OrcEngine) initOperationWorker() {
339356
tick := time.Tick(time.Duration(RefreshInterval) * time.Second)
@@ -493,9 +510,11 @@ func (engine *OrcEngine) startClusterMonitor() {
493510
downCount := 0
494511
eventMonitorId := engine.cluster.MonitorEvents("", func(event adoc.Event, err error) {
495512
if err != nil {
496-
log.Warnf("Error during the cluster event monitor, will try to restart the monitor, %s", err)
513+
// log.Warnf("Error during the cluster event monitor, will try to restart the monitor, %s", err)
514+
engine.clusterRequestFailed()
497515
restart <- true
498516
} else {
517+
engine.clusterRequestSucceed()
499518
log.Debugf("Cluster event: %v", event)
500519
if strings.HasPrefix(event.From, "swarm") {
501520
switch event.Status {
@@ -559,6 +578,7 @@ func New(cluster cluster.Cluster, store storage.Store) (*OrcEngine, error) {
559578
rmDepCtrls: make(map[string]*dependsController),
560579
opsChan: make(chan orcOperation, 500),
561580
stop: nil,
581+
clstrFailCnt: 0,
562582
}
563583

564584
eagleView := NewRuntimeEagleView()

engine/notify.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ var (
2222
NotifyLetPodGo = "LAIN found pod restart too many times in a short period, will let it go"
2323
NotifyPodIPLost = "LAIN found pod lost IP, please inform the SA team"
2424
NotifyPodUnHealthy = "LAIN found pod Unhealthy, please check your service"
25+
26+
NotifyClusterUnHealthy = "LAIN found Cluster Manager Unhealthy, please check your cluster"
2527
)
2628

2729
type notifyController struct {

engine/pod.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/laincloud/deployd/cluster"
11+
"github.com/laincloud/deployd/utils/util"
1112
"github.com/mijia/adoc"
1213
"github.com/mijia/go-generics"
1314
"github.com/mijia/sweb/log"
@@ -69,14 +70,18 @@ func (pc *podController) Deploy(cluster cluster.Cluster) {
6970
id, err := pc.createContainer(cluster, filters, i)
7071
if err != nil {
7172
log.Warnf("%s Cannot create container, error=%q, spec=%+v", pc, err, cSpec)
72-
pc.pod.State = RunStateFail
73+
if !util.IsConnectionError(err) {
74+
pc.pod.State = RunStateFail
75+
}
7376
pc.pod.LastError = fmt.Sprintf("Cannot create container, %s", err)
7477
return
7578
}
7679

7780
if err := cluster.StartContainer(id); err != nil {
7881
log.Warnf("%s Cannot start container %s, %s", pc, id, err)
79-
pc.pod.State = RunStateFail
82+
if !util.IsConnectionError(err) {
83+
pc.pod.State = RunStateFail
84+
}
8085
pc.pod.LastError = fmt.Sprintf("Cannot start container, %s", err)
8186
}
8287

@@ -177,7 +182,9 @@ func (pc *podController) Stop(cluster cluster.Cluster) {
177182
for i, container := range pc.pod.Containers {
178183
if err := cluster.StopContainer(container.Id, pc.spec.GetKillTimeout()); err != nil {
179184
log.Warnf("%s Cannot stop the container %s, %s", pc, container.Id, err)
180-
pc.pod.State = RunStateFail
185+
if !util.IsConnectionError(err) {
186+
pc.pod.State = RunStateFail
187+
}
181188
pc.pod.LastError = fmt.Sprintf("Cannot stop container, %s", err)
182189
} else {
183190
pc.refreshContainer(cluster, i)
@@ -200,7 +207,9 @@ func (pc *podController) Start(cluster cluster.Cluster) {
200207
for i, container := range pc.pod.Containers {
201208
if err := cluster.StartContainer(container.Id); err != nil {
202209
log.Warnf("%s Cannot start the container %s, %s", pc, container.Id, err)
203-
pc.pod.State = RunStateFail
210+
if !util.IsConnectionError(err) {
211+
pc.pod.State = RunStateFail
212+
}
204213
pc.pod.LastError = fmt.Sprintf("Cannot start container, %s", err)
205214
} else {
206215
pc.refreshContainer(cluster, i)
@@ -273,7 +282,9 @@ func (pc *podController) refreshContainer(kluster cluster.Cluster, index int) {
273282
pc.pod.LastError = fmt.Sprintf("Missing container %q, %s", id, err)
274283
} else {
275284
log.Warnf("%s Failed to inspect container %s, %s", pc, id, err)
276-
pc.pod.State = RunStateFail
285+
if !util.IsConnectionError(err) {
286+
pc.pod.State = RunStateFail
287+
}
277288
pc.pod.LastError = fmt.Sprintf("Cannot inspect the container, %s", err)
278289
}
279290
} else {
@@ -282,7 +293,6 @@ func (pc *podController) refreshContainer(kluster cluster.Cluster, index int) {
282293
network = pc.spec.Namespace
283294
}
284295
prevIP, nowIP := pc.spec.PrevState.IPs[index], info.NetworkSettings.Networks[network].IPAddress
285-
286296
// NOTE: if the container's ip is not equal to prev ip, try to correct it; if failed, accpet new ip
287297
if prevIP != "" && prevIP != nowIP {
288298
log.Warnf("%s find the IP changed, prev is %s, but now is %s, try to correct it", pc, prevIP, nowIP)

engine/podgroup_ops.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,9 @@ func (op pgOperRefreshInstance) Do(pgCtrl *podGroupController, c cluster.Cluster
180180
}
181181
} else {
182182
log.Warnf("PodGroupCtrl %s, we found pod missing, just redeploy it", op.spec)
183-
ntfController.Send(NewNotifySpec(podCtrl.spec.Namespace, podCtrl.spec.Name,
184-
op.instanceNo, container.Runtime.State.FinishedAt, NotifyPodMissing))
183+
//pod missing usually happended when agent was down, so no need to notify app users
184+
// ntfController.Send(NewNotifySpec(podCtrl.spec.Namespace, podCtrl.spec.Name,
185+
// op.instanceNo, container.Runtime.State.FinishedAt, NotifyPodMissing))
185186
newPodSpec := podCtrl.spec.Clone()
186187
prevNodeName := newPodSpec.PrevState.NodeName
187188
if newPodSpec.IsHardStateful() {

utils/util/util.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package util
22

33
import (
44
"errors"
5+
"net"
6+
"net/url"
57
"strconv"
68

79
"github.com/laincloud/deployd/utils/regex"
@@ -22,3 +24,20 @@ func ParseNameInstanceNo(containerName string) (string, int, error) {
2224
return g.Group(1), instance, nil
2325
}
2426
}
27+
28+
func IsConnectionError(err error) bool {
29+
p := regex.MustCompile(`getsockopt: connection refused`)
30+
g := p.Match(err.Error())
31+
if g != nil {
32+
return true
33+
}
34+
switch err := err.(type) {
35+
case net.Error:
36+
return err.Timeout()
37+
case *url.Error:
38+
if err, ok := err.Err.(net.Error); ok {
39+
return err.Timeout()
40+
}
41+
}
42+
return false
43+
}

utils/util/util_test.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
package util
22

33
import (
4+
"errors"
45
"fmt"
56
"reflect"
67
"testing"
78
"time"
9+
10+
"github.com/stretchr/testify/assert"
811
)
912

1013
func Test_ParseNameInstanceNo(t *testing.T) {
1114
containerName := "webrouter.worker.worker.v0-i1-d0"
12-
if name, incetance, err := ParseNameInstanceNo(containerName); err == nil {
13-
fmt.Printf("name:%v,incetance:%v\n", name, incetance)
14-
} else {
15-
fmt.Println("err:%v\n", err)
16-
}
15+
16+
name, incetance, err := ParseNameInstanceNo(containerName)
17+
assert.Equal(t, nil, err)
18+
assert.Equal(t, "webrouter.worker.worker", name)
19+
assert.Equal(t, 1, incetance)
20+
}
21+
22+
func Test_IsConnectionError(t *testing.T) {
23+
err := errors.New("dial tcp 192.168.77.21:2376: getsockopt: connection refused")
24+
assert.Equal(t, true, IsConnectionError(err))
1725
}
1826

1927
func Test_deepEqual(t *testing.T) {

0 commit comments

Comments
 (0)