55 "fmt"
66 "strings"
77 "sync"
8+ "sync/atomic"
89 "time"
910
1011 "github.com/laincloud/deployd/cluster"
@@ -39,7 +40,7 @@ type podGroupController struct {
3940 podCtrls []* podController
4041 opsChan chan pgOperation
4142
42- refreshable bool
43+ refreshable int32
4344
4445 lastPodSpecKey string
4546 storedKey string
@@ -50,33 +51,32 @@ func (pgCtrl *podGroupController) String() string {
5051 return fmt .Sprintf ("PodGroupCtrl %s" , pgCtrl .spec )
5152}
5253
53- func (pgCtrl * podGroupController ) CanOperate (pgops PGOpState ) PGOpState {
54- pgCtrl .Lock ()
55- defer pgCtrl .Unlock ()
56- if pgCtrl .opState == PGOpStateIdle {
57- pgCtrl .opState = pgops
58- return PGOpStateIdle
54+ func (pgCtrl * podGroupController ) CanOperate (pgops PGOpState ) bool {
55+ if atomic .CompareAndSwapInt32 ((* int32 )(& pgCtrl .opState ), PGOpStateIdle , int32 (pgops )) {
56+ pgCtrl .DisableRefresh ()
57+ return true
58+ } else if atomic .LoadInt32 ((* int32 )(& pgCtrl .opState )) == PGOpStateUpgrading &&
59+ pgops == PGOpStateUpgrading {
60+ // when pg is in upgreading state so flush old opchans and start new op
61+ pgCtrl .DisableRefresh ()
62+ return true
5963 }
60- return pgCtrl . opState
64+ return false
6165}
6266
6367func (pgCtrl * podGroupController ) DisableRefresh () {
64- pgCtrl .Lock ()
65- defer pgCtrl .Unlock ()
66- pgCtrl .refreshable = false
68+ atomic .StoreInt32 (& (pgCtrl .refreshable ), int32 (0 ))
6769}
6870
6971func (pgCtrl * podGroupController ) EnableRefresh () {
70- pgCtrl .Lock ()
71- defer pgCtrl .Unlock ()
72- pgCtrl .refreshable = true
72+ atomic .StoreInt32 (& (pgCtrl .refreshable ), int32 (1 ))
7373}
7474
7575// called by signle goroutine
7676func (pgCtrl * podGroupController ) OperateOver () {
77- pgCtrl .Lock ( )
78- defer pgCtrl . Unlock ( )
79- pgCtrl .opState = PGOpStateIdle
77+ pgCtrl .emitOperationEvent ( OperationOver )
78+ atomic . StoreInt32 (( * int32 )( & pgCtrl . opState ), PGOpStateIdle )
79+ pgCtrl .EnableRefresh ()
8080}
8181
8282func (pgCtrl * podGroupController ) Inspect () PodGroupWithSpec {
@@ -112,6 +112,7 @@ func (pgCtrl *podGroupController) IsPending() bool {
112112}
113113
114114func (pgCtrl * podGroupController ) Deploy () {
115+ pgCtrl .flushAllOps ()
115116 pgCtrl .emitOperationEvent (OperationStart )
116117 defer func () {
117118 pgCtrl .opsChan <- pgOperOver {}
@@ -139,6 +140,7 @@ func (pgCtrl *podGroupController) Deploy() {
139140}
140141
141142func (pgCtrl * podGroupController ) RescheduleInstance (numInstances int , restartPolicy ... RestartPolicy ) {
143+ pgCtrl .flushAllOps ()
142144 pgCtrl .emitOperationEvent (OperationStart )
143145 defer func () {
144146 pgCtrl .opsChan <- pgOperOver {}
@@ -190,17 +192,14 @@ func (pgCtrl *podGroupController) RescheduleInstance(numInstances int, restartPo
190192}
191193
192194func (pgCtrl * podGroupController ) RescheduleSpec (podSpec PodSpec ) {
195+ pgCtrl .flushAllOps ()
193196 pgCtrl .emitOperationEvent (OperationStart )
194197 defer func () {
195198 pgCtrl .opsChan <- pgOperOver {}
196199 }()
197200 pgCtrl .RLock ()
198201 spec := pgCtrl .spec .Clone ()
199202 pgCtrl .RUnlock ()
200- // pgCtrl.group.Pods[0].NodeName()
201- if spec .Pod .Equals (podSpec ) {
202- return
203- }
204203 pgCtrl .emptyError ()
205204 if ok := pgCtrl .updatePodPorts (podSpec ); ! ok {
206205 return
@@ -230,6 +229,7 @@ func (pgCtrl *podGroupController) RescheduleSpec(podSpec PodSpec) {
230229}
231230
232231func (pgCtrl * podGroupController ) RescheduleDrift (fromNode , toNode string , instanceNo int , force bool ) {
232+ pgCtrl .flushAllOps ()
233233 defer func () {
234234 pgCtrl .opsChan <- pgOperOver {}
235235 }()
@@ -254,6 +254,7 @@ func (pgCtrl *podGroupController) RescheduleDrift(fromNode, toNode string, insta
254254}
255255
256256func (pgCtrl * podGroupController ) Remove () {
257+ pgCtrl .flushAllOps ()
257258 pgCtrl .emitOperationEvent (OperationStart )
258259 defer func () {
259260 pgCtrl .opsChan <- pgOperOver {}
@@ -273,6 +274,7 @@ func (pgCtrl *podGroupController) Remove() {
273274}
274275
275276func (pgCtrl * podGroupController ) ChangeState (op string , instance int ) {
277+ pgCtrl .flushAllOps ()
276278 pgCtrl .emitOperationEvent (OperationStart )
277279 defer func () {
278280 pgCtrl .opsChan <- pgOperOver {}
@@ -295,6 +297,10 @@ func (pgCtrl *podGroupController) Refresh(force bool) {
295297 if pgCtrl .IsRemoved () || pgCtrl .IsPending () {
296298 return
297299 }
300+ pgCtrl .DisableRefresh ()
301+ defer func () {
302+ pgCtrl .opsChan <- pgOperOver {}
303+ }()
298304 pgCtrl .RLock ()
299305 spec := pgCtrl .spec .Clone ()
300306 pgCtrl .RUnlock ()
@@ -328,11 +334,22 @@ func (pgCtrl *podGroupController) Activate(c cluster.Cluster, store storage.Stor
328334 }()
329335}
330336
337+ func (pgCtrl * podGroupController ) LastSpec () * PodGroupSpec {
338+ log .Infof ("Fetch LastPodSpec !" )
339+ var lastSpec PodGroupSpec
340+ if err := pgCtrl .engine .store .Get (pgCtrl .lastPodSpecKey , & lastSpec ); err != nil {
341+ log .Infof ("Fetch LastPodSpec with err:%v" , err )
342+ return nil
343+ }
344+ log .Infof ("Fetch LastPodSpec :%v" , lastSpec )
345+ return & lastSpec
346+ }
347+
331348/*
332349 * clean all ops in chan synchronously
333350 *
334351 */
335- func (pgCtrl * podGroupController ) FlushAllOps () {
352+ func (pgCtrl * podGroupController ) flushAllOps () {
336353 for {
337354 if len (pgCtrl .opsChan ) == 0 {
338355 return
@@ -345,17 +362,6 @@ func (pgCtrl *podGroupController) FlushAllOps() {
345362 }
346363}
347364
348- func (pgCtrl * podGroupController ) LastSpec () * PodGroupSpec {
349- log .Infof ("Fetch LastPodSpec !" )
350- var lastSpec PodGroupSpec
351- if err := pgCtrl .engine .store .Get (pgCtrl .lastPodSpecKey , & lastSpec ); err != nil {
352- log .Infof ("Fetch LastPodSpec with err:%v" , err )
353- return nil
354- }
355- log .Infof ("Fetch LastPodSpec :%v" , lastSpec )
356- return & lastSpec
357- }
358-
359365/*
360366 * To clean corrupted containers which do not used by cluster app any more
361367 * Should be called just after refrehsed podgroups or clean will works terrible
@@ -492,9 +498,6 @@ func (pgCtrl *podGroupController) checkPodPorts() bool {
492498
493499func (pgCtrl * podGroupController ) updatePodPorts (podSpec PodSpec ) bool {
494500 spec := pgCtrl .spec
495- if spec .Pod .Equals (podSpec ) {
496- return true
497- }
498501 var oldsps , sps StreamPorts
499502 if err := json .Unmarshal ([]byte (spec .Pod .Annotation ), & oldsps ); err != nil {
500503 log .Errorf ("annotation unmarshal error:%v\n " , err )
@@ -573,7 +576,7 @@ func (pgCtrl *podGroupController) rollBack() bool {
573576 // 1. disable refresh(so no others can produce operation) and flush ops chan
574577 log .Infof ("Start Rollback!" )
575578 pgCtrl .DisableRefresh ()
576- pgCtrl .FlushAllOps ()
579+ pgCtrl .flushAllOps ()
577580 // 2. rollback podgroup podspec info
578581 pgCtrl .Lock ()
579582 spec := pgCtrl .spec .Clone ()
@@ -692,7 +695,7 @@ func newPodGroupController(spec PodGroupSpec, states []PodPrevState, pg PodGroup
692695 podCtrls : podCtrls ,
693696 opsChan : make (chan pgOperation , 500 ),
694697
695- refreshable : true ,
698+ refreshable : 1 ,
696699
697700 lastPodSpecKey : strings .Join ([]string {kLainDeploydRootKey , kLainLastPodSpecKey , spec .Namespace , spec .Name }, "/" ),
698701 storedKey : strings .Join ([]string {kLainDeploydRootKey , kLainPodGroupKey , spec .Namespace , spec .Name }, "/" ),
0 commit comments