@@ -43,18 +43,43 @@ import (
4343type GreetFun func (ctx context.Context , req * greet.GreetRequest , opts ... client.CallOption ) (* greet.GreetResponse , error )
4444
4545type stateChangeTestListener struct {
46+ openCh chan struct {}
47+ halfOpenCh chan struct {}
48+ closedCh chan struct {}
4649}
4750
4851func (s * stateChangeTestListener ) OnTransformToClosed (prev circuitbreaker.State , rule circuitbreaker.Rule ) {
4952 logger .Infof ("rule.steategy: %+v, From %s to Closed, time: %d\n " , rule .Strategy , prev .String (), util .CurrentTimeMillis ())
53+ s .notify (s .closedCh )
5054}
5155
5256func (s * stateChangeTestListener ) OnTransformToOpen (prev circuitbreaker.State , rule circuitbreaker.Rule , snapshot interface {}) {
5357 logger .Infof ("rule.steategy: %+v, From %s to Open, snapshot: %.2f, time: %d\n " , rule .Strategy , prev .String (), snapshot , util .CurrentTimeMillis ())
58+ s .notify (s .openCh )
5459}
5560
5661func (s * stateChangeTestListener ) OnTransformToHalfOpen (prev circuitbreaker.State , rule circuitbreaker.Rule ) {
5762 logger .Infof ("rule.steategy: %+v, From %s to Half-Open, time: %d\n " , rule .Strategy , prev .String (), util .CurrentTimeMillis ())
63+ s .notify (s .halfOpenCh )
64+ }
65+
66+ func (s * stateChangeTestListener ) notify (ch chan struct {}) {
67+ if ch == nil {
68+ return
69+ }
70+ select {
71+ case ch <- struct {}{}:
72+ default :
73+ }
74+ }
75+
76+ func waitForState (ch <- chan struct {}, timeout time.Duration ) bool {
77+ select {
78+ case <- ch :
79+ return true
80+ case <- time .After (timeout ):
81+ return false
82+ }
5883}
5984
6085func main () {
@@ -69,8 +94,13 @@ func main() {
6994 if err != nil {
7095 panic (err )
7196 }
97+ listener := & stateChangeTestListener {
98+ openCh : make (chan struct {}, 1 ),
99+ halfOpenCh : make (chan struct {}, 1 ),
100+ closedCh : make (chan struct {}, 1 ),
101+ }
72102 // Register a state change listener so that we could observe the state change of the internal circuit breaker.
73- circuitbreaker .RegisterStateChangeListeners (& stateChangeTestListener {} )
103+ circuitbreaker .RegisterStateChangeListeners (listener )
74104 _ , err = circuitbreaker .LoadRules ([]* circuitbreaker.Rule {
75105 // Statistic time span=1s, recoveryTimeout=1s, maxErrorRatio=40%
76106 {
@@ -86,6 +116,7 @@ func main() {
86116 if err != nil {
87117 panic (err )
88118 }
119+ retryTimeout := 2 * time .Second
89120
90121 _ , err = isolation .LoadRules ([]* isolation.Rule {
91122 {
@@ -106,10 +137,19 @@ func main() {
106137
107138 logger .Info ("call svc.GreetWithChanceOfError triggers circuit breaker open" )
108139 CallGreetFunConcurrently (svc .GreetWithChanceOfError , "error" , 1 , 300 )
109- logger .Info ("wait circuit breaker HalfOpen" )
110- time .Sleep (3 * time .Second )
140+ if ! waitForState (listener .openCh , 5 * time .Second ) {
141+ logger .Warn ("wait circuit breaker Open timeout" )
142+ }
143+ logger .Info ("wait circuit breaker HalfOpen window" )
144+ timer := time .NewTimer (retryTimeout + 200 * time .Millisecond )
145+ <- timer .C
111146 CallGreetFunConcurrently (svc .GreetWithChanceOfError , "hello world" , 1 , 300 )
112- time .Sleep (10 * time .Second )
147+ if ! waitForState (listener .halfOpenCh , 5 * time .Second ) {
148+ logger .Warn ("wait circuit breaker HalfOpen timeout" )
149+ }
150+ if ! waitForState (listener .closedCh , 5 * time .Second ) {
151+ logger .Warn ("wait circuit breaker Closed timeout" )
152+ }
113153}
114154
115155func CallGreetFunConcurrently (f GreetFun , req string , numberOfConcurrently , frequency int ) (pass int64 , block int64 ) {
0 commit comments