From 43987bfac474611250fcb7ff54689519d6dbeb2a Mon Sep 17 00:00:00 2001 From: goodliu Date: Sun, 11 May 2025 11:48:48 +0800 Subject: [PATCH 1/3] fix: protect access to lastFailureAt in circuit breaker by atomic operations to avoid race conditions And remove unnecessary nil checks for the receiver. it is the caller's responsibility to ensure that circuit breaker is not nil. --- circuit_breaker.go | 14 +++----------- client.go | 10 +++++++--- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/circuit_breaker.go b/circuit_breaker.go index 8b25251b..b6b42122 100644 --- a/circuit_breaker.go +++ b/circuit_breaker.go @@ -31,7 +31,7 @@ type CircuitBreaker struct { state atomic.Value // circuitBreakerState failureCount atomic.Uint32 successCount atomic.Uint32 - lastFailureAt time.Time + lastFailureAt atomic.Value // time.Time } // NewCircuitBreaker method creates a new [CircuitBreaker] with default settings. @@ -117,10 +117,6 @@ func (cb *CircuitBreaker) getState() circuitBreakerState { } func (cb *CircuitBreaker) allow() error { - if cb == nil { - return nil - } - if cb.getState() == circuitBreakerStateOpen { return ErrCircuitBreakerOpen } @@ -129,10 +125,6 @@ func (cb *CircuitBreaker) allow() error { } func (cb *CircuitBreaker) applyPolicies(resp *http.Response) { - if cb == nil { - return - } - failed := false for _, policy := range cb.policies { if policy(resp) { @@ -142,7 +134,7 @@ func (cb *CircuitBreaker) applyPolicies(resp *http.Response) { } if failed { - if cb.failureCount.Load() > 0 && time.Since(cb.lastFailureAt) > cb.timeout { + if cb.failureCount.Load() > 0 && time.Since(cb.lastFailureAt.Load().(time.Time)) > cb.timeout { cb.failureCount.Store(0) } @@ -152,7 +144,7 @@ func (cb *CircuitBreaker) applyPolicies(resp *http.Response) { if failCount >= cb.failureThreshold { cb.open() } else { - cb.lastFailureAt = time.Now() + cb.lastFailureAt.Store(time.Now()) } case circuitBreakerStateHalfOpen: cb.open() diff --git a/client.go b/client.go index ec6141f8..656252ef 100644 --- a/client.go +++ b/client.go @@ -2240,8 +2240,10 @@ func (c *Client) executeRequestMiddlewares(req *Request) (err error) { // Executes method executes the given `Request` object and returns // response or error. func (c *Client) execute(req *Request) (*Response, error) { - if err := c.circuitBreaker.allow(); err != nil { - return nil, err + if c.circuitBreaker != nil { + if err := c.circuitBreaker.allow(); err != nil { + return nil, err + } } if err := c.executeRequestMiddlewares(req); err != nil { @@ -2268,7 +2270,9 @@ func (c *Client) execute(req *Request) (*Response, error) { } } if resp != nil { - c.circuitBreaker.applyPolicies(resp) + if c.circuitBreaker != nil { + c.circuitBreaker.applyPolicies(resp) + } response.Body = resp.Body if err = response.wrapContentDecompresser(); err != nil { From 5e0efd4391bf7911d47cecbe817ef2e56265b4ac Mon Sep 17 00:00:00 2001 From: goodliu Date: Sun, 11 May 2025 15:53:08 +0800 Subject: [PATCH 2/3] fix: replace lastFailureAt with openStartAt in CircuitBreaker to improve state management --- circuit_breaker.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/circuit_breaker.go b/circuit_breaker.go index b6b42122..235f0edd 100644 --- a/circuit_breaker.go +++ b/circuit_breaker.go @@ -31,7 +31,7 @@ type CircuitBreaker struct { state atomic.Value // circuitBreakerState failureCount atomic.Uint32 successCount atomic.Uint32 - lastFailureAt atomic.Value // time.Time + openStartAt atomic.Value // time.Time } // NewCircuitBreaker method creates a new [CircuitBreaker] with default settings. @@ -134,7 +134,7 @@ func (cb *CircuitBreaker) applyPolicies(resp *http.Response) { } if failed { - if cb.failureCount.Load() > 0 && time.Since(cb.lastFailureAt.Load().(time.Time)) > cb.timeout { + if cb.failureCount.Load() > 0 && time.Since(cb.openStartAt.Load().(time.Time)) > cb.timeout { cb.failureCount.Store(0) } @@ -144,10 +144,14 @@ func (cb *CircuitBreaker) applyPolicies(resp *http.Response) { if failCount >= cb.failureThreshold { cb.open() } else { - cb.lastFailureAt.Store(time.Now()) + cb.openStartAt.Store(time.Now()) } case circuitBreakerStateHalfOpen: cb.open() + case circuitBreakerStateOpen: + if time.Since(cb.openStartAt.Load().(time.Time)) >= cb.timeout { + cb.changeState(circuitBreakerStateHalfOpen) + } } } else { switch cb.getState() { @@ -158,16 +162,17 @@ func (cb *CircuitBreaker) applyPolicies(resp *http.Response) { if successCount >= cb.successThreshold { cb.changeState(circuitBreakerStateClosed) } + case circuitBreakerStateOpen: + if time.Since(cb.openStartAt.Load().(time.Time)) >= cb.timeout { + cb.changeState(circuitBreakerStateHalfOpen) + } } } } func (cb *CircuitBreaker) open() { cb.changeState(circuitBreakerStateOpen) - go func() { - time.Sleep(cb.timeout) - cb.changeState(circuitBreakerStateHalfOpen) - }() + cb.openStartAt.Store(time.Now()) } func (cb *CircuitBreaker) changeState(state circuitBreakerState) { From 90c5a7db1c549b6ef79d7893c2ac0872a736f9e6 Mon Sep 17 00:00:00 2001 From: goodliu Date: Sun, 11 May 2025 15:57:57 +0800 Subject: [PATCH 3/3] Revert "fix: replace lastFailureAt with openStartAt in CircuitBreaker to improve state management" This reverts commit 5e0efd4391bf7911d47cecbe817ef2e56265b4ac. --- circuit_breaker.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/circuit_breaker.go b/circuit_breaker.go index 235f0edd..b6b42122 100644 --- a/circuit_breaker.go +++ b/circuit_breaker.go @@ -31,7 +31,7 @@ type CircuitBreaker struct { state atomic.Value // circuitBreakerState failureCount atomic.Uint32 successCount atomic.Uint32 - openStartAt atomic.Value // time.Time + lastFailureAt atomic.Value // time.Time } // NewCircuitBreaker method creates a new [CircuitBreaker] with default settings. @@ -134,7 +134,7 @@ func (cb *CircuitBreaker) applyPolicies(resp *http.Response) { } if failed { - if cb.failureCount.Load() > 0 && time.Since(cb.openStartAt.Load().(time.Time)) > cb.timeout { + if cb.failureCount.Load() > 0 && time.Since(cb.lastFailureAt.Load().(time.Time)) > cb.timeout { cb.failureCount.Store(0) } @@ -144,14 +144,10 @@ func (cb *CircuitBreaker) applyPolicies(resp *http.Response) { if failCount >= cb.failureThreshold { cb.open() } else { - cb.openStartAt.Store(time.Now()) + cb.lastFailureAt.Store(time.Now()) } case circuitBreakerStateHalfOpen: cb.open() - case circuitBreakerStateOpen: - if time.Since(cb.openStartAt.Load().(time.Time)) >= cb.timeout { - cb.changeState(circuitBreakerStateHalfOpen) - } } } else { switch cb.getState() { @@ -162,17 +158,16 @@ func (cb *CircuitBreaker) applyPolicies(resp *http.Response) { if successCount >= cb.successThreshold { cb.changeState(circuitBreakerStateClosed) } - case circuitBreakerStateOpen: - if time.Since(cb.openStartAt.Load().(time.Time)) >= cb.timeout { - cb.changeState(circuitBreakerStateHalfOpen) - } } } } func (cb *CircuitBreaker) open() { cb.changeState(circuitBreakerStateOpen) - cb.openStartAt.Store(time.Now()) + go func() { + time.Sleep(cb.timeout) + cb.changeState(circuitBreakerStateHalfOpen) + }() } func (cb *CircuitBreaker) changeState(state circuitBreakerState) {