Commit 175664f

Fix: duplicate engine_disconnect events cause engine to stop
1 parent a415a51 · commit 175664f


engine/engine.go

Lines changed: 56 additions & 42 deletions
@@ -52,6 +52,7 @@ type OrcEngine struct {
 	dependsCtrls map[string]*dependsController
 	rmDepCtrls   map[string]*dependsController
 	opsChan      chan orcOperation
+	refreshAllChan chan bool
 	stop         chan struct{}
 	clstrFailCnt int
 }
@@ -554,6 +555,41 @@ func (engine *OrcEngine) clusterRequestSucceed() {
 	engine.clstrFailCnt = 0
 }

+func (engine *OrcEngine) refreshAllPodGroups() {
+	engine.RLock()
+	if len(engine.pgCtrls) > 0 {
+		rInterval := RefreshInterval / 2 * 1000 / len(engine.pgCtrls)
+		index := 0
+		for _, pgCtrl := range engine.pgCtrls {
+			if atomic.LoadInt32(&(pgCtrl.refreshable)) == 1 {
+				interval := index * rInterval
+				_pgCtrl := pgCtrl
+				index++
+				go func() {
+					log.Infof("%s will be refreshed after %d seconds", _pgCtrl, interval/1000)
+					time.Sleep(time.Duration(interval) * time.Millisecond)
+					engine.opsChan <- orcOperRefresh{_pgCtrl, false}
+				}()
+			}
+		}
+	}
+	if len(engine.dependsCtrls) > 0 {
+		rInterval := RefreshInterval / 2 * 1000 / len(engine.dependsCtrls)
+		index := 0
+		for _, depCtrl := range engine.dependsCtrls {
+			interval := index * rInterval
+			_depCtrl := depCtrl
+			index++
+			go func() {
+				log.Infof("%s will be refreshed after %d seconds", _depCtrl, interval/1000)
+				time.Sleep(time.Duration(RefreshInterval/2*1000+interval) * time.Millisecond)
+				engine.opsChan <- orcOperDependsRefresh{_depCtrl}
+			}()
+		}
+	}
+	engine.RUnlock()
+}
+
 // This will be running inside the go routine
 func (engine *OrcEngine) initOperationWorker() {
 	tick := time.Tick(time.Duration(RefreshInterval) * time.Second)
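
The stagger math in the new refreshAllPodGroups spreads the work across the first half of each tick window: pod group number index sleeps index*rInterval milliseconds before its refresh is queued, and the depends controllers get an extra RefreshInterval/2*1000 offset so they land in the second half. A minimal sketch of that schedule; the RefreshInterval value and controller count here are illustrative, not the engine's actual configuration:

package main

import "fmt"

// RefreshInterval mirrors the engine's tick constant; 60 seconds is an assumed value.
const RefreshInterval = 60

func main() {
	n := 4 // pretend there are 4 pod group controllers
	// Milliseconds of stagger between two consecutive refreshes,
	// computed the same way as in refreshAllPodGroups.
	rInterval := RefreshInterval / 2 * 1000 / n
	for index := 0; index < n; index++ {
		fmt.Printf("pod group %d refreshes after %d ms\n", index, index*rInterval)
	}
	// Output:
	// pod group 0 refreshes after 0 ms
	// pod group 1 refreshes after 7500 ms
	// pod group 2 refreshes after 15000 ms
	// pod group 3 refreshes after 22500 ms
}
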
@@ -562,39 +598,10 @@ func (engine *OrcEngine) initOperationWorker() {
 		select {
 		case op := <-engine.opsChan:
 			op.Do(engine)
+		case <-engine.refreshAllChan:
+			engine.refreshAllPodGroups()
 		case <-tick:
-			engine.RLock()
-			if len(engine.pgCtrls) > 0 {
-				rInterval := RefreshInterval / 2 * 1000 / len(engine.pgCtrls)
-				index := 0
-				for _, pgCtrl := range engine.pgCtrls {
-					if atomic.LoadInt32(&(pgCtrl.refreshable)) == 1 {
-						interval := index * rInterval
-						_pgCtrl := pgCtrl
-						index++
-						go func() {
-							log.Infof("%s will be refreshed after %d seconds", _pgCtrl, interval/1000)
-							time.Sleep(time.Duration(interval) * time.Millisecond)
-							engine.opsChan <- orcOperRefresh{_pgCtrl, false}
-						}()
-					}
-				}
-			}
-			if len(engine.dependsCtrls) > 0 {
-				rInterval := RefreshInterval / 2 * 1000 / len(engine.dependsCtrls)
-				index := 0
-				for _, depCtrl := range engine.dependsCtrls {
-					interval := index * rInterval
-					_depCtrl := depCtrl
-					index++
-					go func() {
-						log.Infof("%s will be refreshed after %d seconds", _depCtrl, interval/1000)
-						time.Sleep(time.Duration(RefreshInterval/2*1000+interval) * time.Millisecond)
-						engine.opsChan <- orcOperDependsRefresh{_depCtrl}
-					}()
-				}
-			}
-			engine.RUnlock()
+			engine.refreshAllPodGroups()
 		case <-portsTick:
 			RefreshPorts(engine.pgCtrls)
 		case <-engine.stop:
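
With this refactor the same refresh routine is reachable two ways: on the periodic tick, and on demand through the new refreshAllChan (used further down when a lost node reconnects). A runnable sketch of that select shape; the channel names and timings are illustrative, not the engine's actual API:

package main

import (
	"fmt"
	"time"
)

func main() {
	refreshAll := make(chan bool) // on-demand trigger, like engine.refreshAllChan
	stop := make(chan struct{})
	tick := time.Tick(40 * time.Millisecond) // periodic trigger

	go func() {
		time.Sleep(10 * time.Millisecond)
		refreshAll <- true // e.g. a node just reconnected
		time.Sleep(100 * time.Millisecond)
		close(stop)
	}()

	for {
		select {
		case <-refreshAll:
			fmt.Println("refresh: on demand")
		case <-tick:
			fmt.Println("refresh: periodic")
		case <-stop:
			fmt.Println("worker stopped")
			return
		}
	}
}
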
@@ -687,7 +694,7 @@ func pgOpOver(store storage.Store, pgname string) {
 	// kLainPgOpingKey

 func (engine *OrcEngine) onClusterNodeLost(nodeName string, downCount int) {
-	log.Warnf("Cluster node is down, [%q], %s nodes down in all, will check if need stop the engine", nodeName, downCount)
+	log.Warnf("Cluster node is down, [%q], %d nodes down in all, will check if need stop the engine", nodeName, downCount)
 	if downCount >= maxDownNode {
 		log.Warnf("Too many cluster nodes stoped in a short period, need stop the engine")
 		engine.Stop()
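
The one-character fix above matters because fmt's %s verb does not format an int; it prints a diagnostic instead (go vet flags this kind of verb mismatch):

package main

import "fmt"

func main() {
	downCount := 3
	fmt.Printf("%s nodes down\n", downCount) // prints: %!s(int=3) nodes down
	fmt.Printf("%d nodes down\n", downCount) // prints: 3 nodes down
}
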
@@ -696,11 +703,10 @@ func (engine *OrcEngine) onClusterNodeLost(nodeName string, downCount int) {

 func (engine *OrcEngine) startClusterMonitor() {
 	restart := make(chan bool)
-	downTime := time.Now()
-	downCount := 0
+	downNodes := make(map[string]time.Time)
 	for {
-		successed := SyncEventsDataFromStorage(engine)
-		if !successed {
+		succeed := SyncEventsDataFromStorage(engine)
+		if !succeed {
 			time.Sleep(1 * time.Hour)
 		} else {
 			break
@@ -718,13 +724,21 @@
 			switch event.Status {
 			case "engine_disconnect":
 				now := time.Now()
-				if downTime.Add(downNodeResetPeriod).Before(now) {
-					downCount = 1
-					downTime = time.Now()
-				} else {
-					downCount += 1
+				log.Warnf("got engine disconnect event from %s, downTime: %v", event.Node.Name, now)
+				downNodes[event.Node.Name] = now
+				downCount := 0
+				for _, v := range downNodes {
+					if v.Add(downNodeResetPeriod).After(now) {
+						downCount++
+					}
 				}
 				engine.onClusterNodeLost(event.Node.Name, downCount)
+			case "engine_connect":
+				log.Infof("got engine connect event from %s", event.Node.Name)
+				if _, ok := downNodes[event.Node.Name]; ok {
+					delete(downNodes, event.Node.Name)
+					engine.refreshAllChan <- true
+				}
 			}
 		} else {
 			HandleDockerEvent(engine, &event)
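
This per-node map is what fixes the bug in the commit title: a duplicate engine_disconnect from the same node merely overwrites that node's timestamp instead of incrementing a shared counter, and only nodes that went down within downNodeResetPeriod count toward the threshold that stops the engine. A self-contained sketch of the accounting; the period value here is assumed for illustration:

package main

import (
	"fmt"
	"time"
)

const downNodeResetPeriod = 10 * time.Minute // assumed value for illustration

// countDown mirrors the loop in the diff: only nodes whose disconnect
// time still falls inside the reset period are counted.
func countDown(downNodes map[string]time.Time, now time.Time) int {
	downCount := 0
	for _, v := range downNodes {
		if v.Add(downNodeResetPeriod).After(now) {
			downCount++
		}
	}
	return downCount
}

func main() {
	now := time.Now()
	downNodes := make(map[string]time.Time)
	downNodes["node1"] = now.Add(-1 * time.Minute)
	downNodes["node1"] = now                        // duplicate disconnect: overwrites, still one entry
	downNodes["node2"] = now.Add(-30 * time.Minute) // stale, outside the reset period
	fmt.Println(countDown(downNodes, now)) // prints: 1
}
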
