@@ -52,6 +52,7 @@ type OrcEngine struct {
5252 dependsCtrls map [string ]* dependsController
5353 rmDepCtrls map [string ]* dependsController
5454 opsChan chan orcOperation
55+ refreshAllChan chan bool
5556 stop chan struct {}
5657 clstrFailCnt int
5758}
@@ -554,6 +555,41 @@ func (engine *OrcEngine) clusterRequestSucceed() {
554555 engine .clstrFailCnt = 0
555556}
556557
558+ func (engine * OrcEngine ) refreshAllPodGroups () {
559+ engine .RLock ()
560+ if len (engine .pgCtrls ) > 0 {
561+ rInterval := RefreshInterval / 2 * 1000 / len (engine .pgCtrls )
562+ index := 0
563+ for _ , pgCtrl := range engine .pgCtrls {
564+ if atomic .LoadInt32 (& (pgCtrl .refreshable )) == 1 {
565+ interval := index * rInterval
566+ _pgCtrl := pgCtrl
567+ index ++
568+ go func () {
569+ log .Infof ("%s will be refreshed after %d seconds" , _pgCtrl , interval / 1000 )
570+ time .Sleep (time .Duration (interval ) * time .Millisecond )
571+ engine .opsChan <- orcOperRefresh {_pgCtrl , false }
572+ }()
573+ }
574+ }
575+ }
576+ if len (engine .dependsCtrls ) > 0 {
577+ rInterval := RefreshInterval / 2 * 1000 / len (engine .dependsCtrls )
578+ index := 0
579+ for _ , depCtrl := range engine .dependsCtrls {
580+ interval := index * rInterval
581+ _depCtrl := depCtrl
582+ index ++
583+ go func () {
584+ log .Infof ("%s will be refreshed after %d seconds" , _depCtrl , interval / 1000 )
585+ time .Sleep (time .Duration (RefreshInterval / 2 * 1000 + interval ) * time .Millisecond )
586+ engine .opsChan <- orcOperDependsRefresh {_depCtrl }
587+ }()
588+ }
589+ }
590+ engine .RUnlock ()
591+ }
592+
557593// This will be running inside the go routine
558594func (engine * OrcEngine ) initOperationWorker () {
559595 tick := time .Tick (time .Duration (RefreshInterval ) * time .Second )
@@ -562,39 +598,10 @@ func (engine *OrcEngine) initOperationWorker() {
562598 select {
563599 case op := <- engine .opsChan :
564600 op .Do (engine )
601+ case <- engine .refreshAllChan :
602+ engine .refreshAllPodGroups ()
565603 case <- tick :
566- engine .RLock ()
567- if len (engine .pgCtrls ) > 0 {
568- rInterval := RefreshInterval / 2 * 1000 / len (engine .pgCtrls )
569- index := 0
570- for _ , pgCtrl := range engine .pgCtrls {
571- if atomic .LoadInt32 (& (pgCtrl .refreshable )) == 1 {
572- interval := index * rInterval
573- _pgCtrl := pgCtrl
574- index ++
575- go func () {
576- log .Infof ("%s will be refreshed after %d seconds" , _pgCtrl , interval / 1000 )
577- time .Sleep (time .Duration (interval ) * time .Millisecond )
578- engine .opsChan <- orcOperRefresh {_pgCtrl , false }
579- }()
580- }
581- }
582- }
583- if len (engine .dependsCtrls ) > 0 {
584- rInterval := RefreshInterval / 2 * 1000 / len (engine .dependsCtrls )
585- index := 0
586- for _ , depCtrl := range engine .dependsCtrls {
587- interval := index * rInterval
588- _depCtrl := depCtrl
589- index ++
590- go func () {
591- log .Infof ("%s will be refreshed after %d seconds" , _depCtrl , interval / 1000 )
592- time .Sleep (time .Duration (RefreshInterval / 2 * 1000 + interval ) * time .Millisecond )
593- engine .opsChan <- orcOperDependsRefresh {_depCtrl }
594- }()
595- }
596- }
597- engine .RUnlock ()
604+ engine .refreshAllPodGroups ()
598605 case <- portsTick :
599606 RefreshPorts (engine .pgCtrls )
600607 case <- engine .stop :
@@ -687,7 +694,7 @@ func pgOpOver(store storage.Store, pgname string) {
687694// kLainPgOpingKey
688695
689696func (engine * OrcEngine ) onClusterNodeLost (nodeName string , downCount int ) {
690- log .Warnf ("Cluster node is down, [%q], %s nodes down in all, will check if need stop the engine" , nodeName , downCount )
697+ log .Warnf ("Cluster node is down, [%q], %d nodes down in all, will check if need stop the engine" , nodeName , downCount )
691698 if downCount >= maxDownNode {
692699 log .Warnf ("Too many cluster nodes stoped in a short period, need stop the engine" )
693700 engine .Stop ()
@@ -696,11 +703,10 @@ func (engine *OrcEngine) onClusterNodeLost(nodeName string, downCount int) {
696703
697704func (engine * OrcEngine ) startClusterMonitor () {
698705 restart := make (chan bool )
699- downTime := time .Now ()
700- downCount := 0
706+ downNodes := make (map [string ]time.Time )
701707 for {
702- successed := SyncEventsDataFromStorage (engine )
703- if ! successed {
708+ succeed := SyncEventsDataFromStorage (engine )
709+ if ! succeed {
704710 time .Sleep (1 * time .Hour )
705711 } else {
706712 break
@@ -718,13 +724,21 @@ func (engine *OrcEngine) startClusterMonitor() {
718724 switch event .Status {
719725 case "engine_disconnect" :
720726 now := time .Now ()
721- if downTime .Add (downNodeResetPeriod ).Before (now ) {
722- downCount = 1
723- downTime = time .Now ()
724- } else {
725- downCount += 1
727+ log .Warnf ("got engine disconnect event from %s, downTime: %v" , event .Node .Name , now )
728+ downNodes [event .Node .Name ] = now
729+ downCount := 0
730+ for _ , v := range downNodes {
731+ if v .Add (downNodeResetPeriod ).After (now ) {
732+ downCount ++
733+ }
726734 }
727735 engine .onClusterNodeLost (event .Node .Name , downCount )
736+ case "engine_connect" :
737+ log .Infof ("got engine connect event from %s" , event .Node .Name )
738+ if _ , ok := downNodes [event .Node .Name ]; ok {
739+ delete (downNodes , event .Node .Name )
740+ engine .refreshAllChan <- true
741+ }
728742 }
729743 } else {
730744 HandleDockerEvent (engine , & event )
0 commit comments