Skip to content

Commit ecfdb18

Browse files
committed
configurable circuit breaker
1 parent 32ddb96 commit ecfdb18

File tree

6 files changed

+176
-22
lines changed

6 files changed

+176
-22
lines changed

hitless/circuit_breaker.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,16 @@ type CircuitBreaker struct {
5656

5757
// newCircuitBreaker creates a new circuit breaker for an endpoint
5858
func newCircuitBreaker(endpoint string, config *Config) *CircuitBreaker {
59-
// Use sensible defaults if not configured
60-
failureThreshold := 10
61-
resetTimeout := 500 * time.Millisecond
62-
maxRequests := 10
63-
64-
// These could be added to Config in the future without breaking API
65-
// For now, use internal defaults that work well
59+
// Use configuration values with sensible defaults
60+
failureThreshold := 5
61+
resetTimeout := 60 * time.Second
62+
maxRequests := 3
63+
64+
if config != nil {
65+
failureThreshold = config.CircuitBreakerFailureThreshold
66+
resetTimeout = config.CircuitBreakerResetTimeout
67+
maxRequests = config.CircuitBreakerMaxRequests
68+
}
6669

6770
return &CircuitBreaker{
6871
failureThreshold: failureThreshold,

hitless/circuit_breaker_test.go

Lines changed: 76 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import (
1010

1111
func TestCircuitBreaker(t *testing.T) {
1212
config := &Config{
13-
LogLevel: logging.LogLevelError, // Reduce noise in tests
13+
LogLevel: logging.LogLevelError, // Reduce noise in tests
14+
CircuitBreakerFailureThreshold: 5,
15+
CircuitBreakerResetTimeout: 60 * time.Second,
16+
CircuitBreakerMaxRequests: 3,
1417
}
1518

1619
t.Run("InitialState", func(t *testing.T) {
@@ -44,7 +47,7 @@ func TestCircuitBreaker(t *testing.T) {
4447
t.Run("FailureThreshold", func(t *testing.T) {
4548
cb := newCircuitBreaker("test-endpoint:6379", config)
4649
testError := errors.New("test error")
47-
50+
4851
// Fail 4 times (below threshold of 5)
4952
for i := 0; i < 4; i++ {
5053
err := cb.Execute(func() error {
@@ -57,15 +60,15 @@ func TestCircuitBreaker(t *testing.T) {
5760
t.Errorf("Circuit should still be closed after %d failures", i+1)
5861
}
5962
}
60-
63+
6164
// 5th failure should open the circuit
6265
err := cb.Execute(func() error {
6366
return testError
6467
})
6568
if err != testError {
6669
t.Errorf("Expected test error, got %v", err)
6770
}
68-
71+
6972
if cb.GetState() != CircuitBreakerOpen {
7073
t.Errorf("Expected state %v, got %v", CircuitBreakerOpen, cb.GetState())
7174
}
@@ -92,8 +95,13 @@ func TestCircuitBreaker(t *testing.T) {
9295
})
9396

9497
t.Run("HalfOpenTransition", func(t *testing.T) {
95-
cb := newCircuitBreaker("test-endpoint:6379", config)
96-
cb.resetTimeout = 100 * time.Millisecond // Short timeout for testing
98+
testConfig := &Config{
99+
LogLevel: logging.LogLevelError,
100+
CircuitBreakerFailureThreshold: 5,
101+
CircuitBreakerResetTimeout: 100 * time.Millisecond, // Short timeout for testing
102+
CircuitBreakerMaxRequests: 3,
103+
}
104+
cb := newCircuitBreaker("test-endpoint:6379", testConfig)
97105
testError := errors.New("test error")
98106

99107
// Force circuit to open
@@ -125,9 +133,13 @@ func TestCircuitBreaker(t *testing.T) {
125133
})
126134

127135
t.Run("HalfOpenToClosedTransition", func(t *testing.T) {
128-
cb := newCircuitBreaker("test-endpoint:6379", config)
129-
cb.resetTimeout = 50 * time.Millisecond
130-
cb.maxRequests = 3
136+
testConfig := &Config{
137+
LogLevel: logging.LogLevelError,
138+
CircuitBreakerFailureThreshold: 5,
139+
CircuitBreakerResetTimeout: 50 * time.Millisecond,
140+
CircuitBreakerMaxRequests: 3,
141+
}
142+
cb := newCircuitBreaker("test-endpoint:6379", testConfig)
131143
testError := errors.New("test error")
132144

133145
// Force circuit to open
@@ -155,8 +167,13 @@ func TestCircuitBreaker(t *testing.T) {
155167
})
156168

157169
t.Run("HalfOpenToOpenOnFailure", func(t *testing.T) {
158-
cb := newCircuitBreaker("test-endpoint:6379", config)
159-
cb.resetTimeout = 50 * time.Millisecond
170+
testConfig := &Config{
171+
LogLevel: logging.LogLevelError,
172+
CircuitBreakerFailureThreshold: 5,
173+
CircuitBreakerResetTimeout: 50 * time.Millisecond,
174+
CircuitBreakerMaxRequests: 3,
175+
}
176+
cb := newCircuitBreaker("test-endpoint:6379", testConfig)
160177
testError := errors.New("test error")
161178

162179
// Force circuit to open
@@ -216,7 +233,10 @@ func TestCircuitBreaker(t *testing.T) {
216233

217234
func TestCircuitBreakerManager(t *testing.T) {
218235
config := &Config{
219-
LogLevel: logging.LogLevelError,
236+
LogLevel: logging.LogLevelError,
237+
CircuitBreakerFailureThreshold: 5,
238+
CircuitBreakerResetTimeout: 60 * time.Second,
239+
CircuitBreakerMaxRequests: 3,
220240
}
221241

222242
t.Run("GetCircuitBreaker", func(t *testing.T) {
@@ -289,4 +309,48 @@ func TestCircuitBreakerManager(t *testing.T) {
289309
t.Error("Failure count should be reset to 0")
290310
}
291311
})
312+
313+
t.Run("ConfigurableParameters", func(t *testing.T) {
314+
config := &Config{
315+
LogLevel: logging.LogLevelError,
316+
CircuitBreakerFailureThreshold: 10,
317+
CircuitBreakerResetTimeout: 30 * time.Second,
318+
CircuitBreakerMaxRequests: 5,
319+
}
320+
321+
cb := newCircuitBreaker("test-endpoint:6379", config)
322+
323+
// Test that configuration values are used
324+
if cb.failureThreshold != 10 {
325+
t.Errorf("Expected failureThreshold=10, got %d", cb.failureThreshold)
326+
}
327+
if cb.resetTimeout != 30*time.Second {
328+
t.Errorf("Expected resetTimeout=30s, got %v", cb.resetTimeout)
329+
}
330+
if cb.maxRequests != 5 {
331+
t.Errorf("Expected maxRequests=5, got %d", cb.maxRequests)
332+
}
333+
334+
// Test that circuit opens after configured threshold
335+
testError := errors.New("test error")
336+
for i := 0; i < 9; i++ {
337+
err := cb.Execute(func() error { return testError })
338+
if err != testError {
339+
t.Errorf("Expected test error, got %v", err)
340+
}
341+
if cb.GetState() != CircuitBreakerClosed {
342+
t.Errorf("Circuit should still be closed after %d failures", i+1)
343+
}
344+
}
345+
346+
// 10th failure should open the circuit
347+
err := cb.Execute(func() error { return testError })
348+
if err != testError {
349+
t.Errorf("Expected test error, got %v", err)
350+
}
351+
352+
if cb.GetState() != CircuitBreakerOpen {
353+
t.Errorf("Expected state %v, got %v", CircuitBreakerOpen, cb.GetState())
354+
}
355+
})
292356
}

hitless/config.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,19 @@ type Config struct {
116116
// Default: logging.LogLevelError(0)
117117
LogLevel logging.LogLevel
118118

119+
// Circuit breaker configuration for endpoint failure handling
120+
// CircuitBreakerFailureThreshold is the number of failures before opening the circuit.
121+
// Default: 5
122+
CircuitBreakerFailureThreshold int
123+
124+
// CircuitBreakerResetTimeout is how long to wait before testing if the endpoint recovered.
125+
// Default: 60 seconds
126+
CircuitBreakerResetTimeout time.Duration
127+
128+
// CircuitBreakerMaxRequests is the maximum number of requests allowed in half-open state.
129+
// Default: 3
130+
CircuitBreakerMaxRequests int
131+
119132
// MaxHandoffRetries is the maximum number of times to retry a failed handoff.
120133
// After this many retries, the connection will be removed from the pool.
121134
// Default: 3
@@ -138,6 +151,11 @@ func DefaultConfig() *Config {
138151
PostHandoffRelaxedDuration: 0, // Auto-calculated based on relaxed timeout
139152
LogLevel: logging.LogLevelError,
140153

154+
// Circuit breaker configuration
155+
CircuitBreakerFailureThreshold: 5,
156+
CircuitBreakerResetTimeout: 60 * time.Second,
157+
CircuitBreakerMaxRequests: 3,
158+
141159
// Connection Handoff Configuration
142160
MaxHandoffRetries: 3,
143161
}
@@ -167,6 +185,17 @@ func (c *Config) Validate() error {
167185
return ErrInvalidLogLevel
168186
}
169187

188+
// Circuit breaker validation
189+
if c.CircuitBreakerFailureThreshold < 1 {
190+
return ErrInvalidCircuitBreakerFailureThreshold
191+
}
192+
if c.CircuitBreakerResetTimeout < 0 {
193+
return ErrInvalidCircuitBreakerResetTimeout
194+
}
195+
if c.CircuitBreakerMaxRequests < 1 {
196+
return ErrInvalidCircuitBreakerMaxRequests
197+
}
198+
170199
// Validate Mode (maintenance notifications mode)
171200
if !c.Mode.IsValid() {
172201
return ErrInvalidMaintNotifications
@@ -280,6 +309,22 @@ func (c *Config) ApplyDefaultsWithPoolConfig(poolSize int, maxActiveConns int) *
280309
result.MaxHandoffRetries = c.MaxHandoffRetries
281310
}
282311

312+
// Circuit breaker configuration
313+
result.CircuitBreakerFailureThreshold = defaults.CircuitBreakerFailureThreshold
314+
if c.CircuitBreakerFailureThreshold > 0 {
315+
result.CircuitBreakerFailureThreshold = c.CircuitBreakerFailureThreshold
316+
}
317+
318+
result.CircuitBreakerResetTimeout = defaults.CircuitBreakerResetTimeout
319+
if c.CircuitBreakerResetTimeout > 0 {
320+
result.CircuitBreakerResetTimeout = c.CircuitBreakerResetTimeout
321+
}
322+
323+
result.CircuitBreakerMaxRequests = defaults.CircuitBreakerMaxRequests
324+
if c.CircuitBreakerMaxRequests > 0 {
325+
result.CircuitBreakerMaxRequests = c.CircuitBreakerMaxRequests
326+
}
327+
283328
if result.LogLevel.DebugOrAbove() {
284329
internal.Logger.Printf(context.Background(), "hitless: debug logging enabled")
285330
internal.Logger.Printf(context.Background(), "hitless: config: %+v", result)
@@ -303,6 +348,11 @@ func (c *Config) Clone() *Config {
303348
PostHandoffRelaxedDuration: c.PostHandoffRelaxedDuration,
304349
LogLevel: c.LogLevel,
305350

351+
// Circuit breaker configuration
352+
CircuitBreakerFailureThreshold: c.CircuitBreakerFailureThreshold,
353+
CircuitBreakerResetTimeout: c.CircuitBreakerResetTimeout,
354+
CircuitBreakerMaxRequests: c.CircuitBreakerMaxRequests,
355+
306356
// Configuration fields
307357
MaxHandoffRetries: c.MaxHandoffRetries,
308358
}

hitless/config_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,17 @@ func TestConfig(t *testing.T) {
3333
t.Errorf("Expected MaxHandoffRetries to be 3, got %d", config.MaxHandoffRetries)
3434
}
3535

36+
// Circuit breaker defaults
37+
if config.CircuitBreakerFailureThreshold != 5 {
38+
t.Errorf("Expected CircuitBreakerFailureThreshold=5, got %d", config.CircuitBreakerFailureThreshold)
39+
}
40+
if config.CircuitBreakerResetTimeout != 60*time.Second {
41+
t.Errorf("Expected CircuitBreakerResetTimeout=60s, got %v", config.CircuitBreakerResetTimeout)
42+
}
43+
if config.CircuitBreakerMaxRequests != 3 {
44+
t.Errorf("Expected CircuitBreakerMaxRequests=3, got %d", config.CircuitBreakerMaxRequests)
45+
}
46+
3647
if config.HandoffTimeout != 15*time.Second {
3748
t.Errorf("Expected HandoffTimeout to be 15s, got %v", config.HandoffTimeout)
3849
}
@@ -377,6 +388,25 @@ func TestEnhancedConfigValidation(t *testing.T) {
377388
}
378389
config.MaxHandoffRetries = 3 // Reset to valid value
379390

391+
// Test circuit breaker validation
392+
config.CircuitBreakerFailureThreshold = 0
393+
if err := config.Validate(); err != ErrInvalidCircuitBreakerFailureThreshold {
394+
t.Errorf("Expected ErrInvalidCircuitBreakerFailureThreshold, got %v", err)
395+
}
396+
config.CircuitBreakerFailureThreshold = 5 // Reset to valid value
397+
398+
config.CircuitBreakerResetTimeout = -1 * time.Second
399+
if err := config.Validate(); err != ErrInvalidCircuitBreakerResetTimeout {
400+
t.Errorf("Expected ErrInvalidCircuitBreakerResetTimeout, got %v", err)
401+
}
402+
config.CircuitBreakerResetTimeout = 60 * time.Second // Reset to valid value
403+
404+
config.CircuitBreakerMaxRequests = 0
405+
if err := config.Validate(); err != ErrInvalidCircuitBreakerMaxRequests {
406+
t.Errorf("Expected ErrInvalidCircuitBreakerMaxRequests, got %v", err)
407+
}
408+
config.CircuitBreakerMaxRequests = 3 // Reset to valid value
409+
380410
// Should pass validation again
381411
if err := config.Validate(); err != nil {
382412
t.Errorf("Config should be valid after reset, got error: %v", err)

hitless/errors.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,10 @@ var (
5353
var (
5454
ErrCircuitBreakerOpen = errors.New("hitless: circuit breaker is open, failing fast")
5555
)
56+
57+
// circuit breaker configuration errors
58+
var (
59+
ErrInvalidCircuitBreakerFailureThreshold = errors.New("hitless: circuit breaker failure threshold must be >= 1")
60+
ErrInvalidCircuitBreakerResetTimeout = errors.New("hitless: circuit breaker reset timeout must be >= 0")
61+
ErrInvalidCircuitBreakerMaxRequests = errors.New("hitless: circuit breaker max requests must be >= 1")
62+
)

hitless/handoff_worker.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func newHandoffWorkerManager(config *Config, poolHook *PoolHook) *handoffWorkerM
4444
handoffQueue: make(chan HandoffRequest, config.HandoffQueueSize),
4545
shutdown: make(chan struct{}),
4646
maxWorkers: config.MaxWorkers,
47-
activeWorkers: atomic.Int32{}, // Start with no workers - create on demand
47+
activeWorkers: atomic.Int32{}, // Start with no workers - create on demand
4848
workerTimeout: 15 * time.Second, // Workers exit after 15s of inactivity
4949
config: config,
5050
poolHook: poolHook,
@@ -423,14 +423,14 @@ func (hwm *handoffWorkerManager) closeConnFromRequest(ctx context.Context, reque
423423
pooler.Remove(ctx, conn, err)
424424
if hwm.config != nil && hwm.config.LogLevel.WarnOrAbove() { // Warning level
425425
internal.Logger.Printf(ctx,
426-
"hitless: removed conn[%d] from pool due to max handoff retries reached: %v",
426+
"hitless: removed conn[%d] from pool due: %v",
427427
conn.GetID(), err)
428428
}
429429
} else {
430430
conn.Close()
431431
if hwm.config != nil && hwm.config.LogLevel.WarnOrAbove() { // Warning level
432432
internal.Logger.Printf(ctx,
433-
"hitless: no pool provided for conn[%d], cannot remove due to handoff initialization failure: %v",
433+
"hitless: no pool provided for conn[%d], cannot remove due to: %v",
434434
conn.GetID(), err)
435435
}
436436
}

0 commit comments

Comments
 (0)