Skip to content

Commit 8bb8d88

Browse files
authored
Merge pull request #2388 from c9s/dboy/risk/error-breaker
2 parents 62c7ef9 + 848cb89 commit 8bb8d88

File tree

2 files changed

+53
-20
lines changed

2 files changed

+53
-20
lines changed

pkg/risk/circuitbreaker/errorbreaker.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,14 @@ type ErrorBreaker struct {
4242
mu sync.RWMutex
4343

4444
// breaker configuration
45-
Enabled bool `json:"enabled"`
46-
MaxConsecutiveErrorCount int `json:"maxConsecutiveErrorCount"`
47-
HaltDuration types.Duration `json:"haltDuration"`
45+
Enabled bool `json:"enabled"`
46+
// MaxConsecutiveErrorCount defines the number of consecutive errors at which the breaker halts.
47+
MaxConsecutiveErrorCount int `json:"maxConsecutiveErrorCount"`
48+
// HaltDuration defines the duration for which the breaker will be halted when triggered.
49+
HaltDuration types.Duration `json:"haltDuration"`
50+
// ErrorWindow defines the maximum time gap between an error and the previously recorded one for them to be considered consecutive (inclusive).
51+
// If set to 0 or a negative value, all errors are considered consecutive regardless of their timestamps.
52+
ErrorWindow types.Duration `json:"errorWindow"`
4853

4954
// breaker state
5055
errors []ErrorRecord
@@ -66,7 +71,8 @@ type ErrorBreaker struct {
6671
// NewErrorBreaker creates a new ErrorBreaker with the given parameters.
6772
// maxErrors: number of consecutive errors at which the breaker halts (non-positive values fall back to 5)
6873
// haltDuration: duration for which the breaker will be halted
69-
func NewErrorBreaker(strategy, strategyInstance string, maxErrors int, haltDuration types.Duration) *ErrorBreaker {
74+
// errorWindow: time window for errors to be considered consecutive (0 to disable)
75+
func NewErrorBreaker(strategy, strategyInstance string, maxErrors int, haltDuration, errorWindow types.Duration) *ErrorBreaker {
7076
if maxErrors <= 0 {
7177
log.Warnf("the maxErrors cannot be negative, fallback to 5: %d", maxErrors)
7278
maxErrors = 5
@@ -75,6 +81,7 @@ func NewErrorBreaker(strategy, strategyInstance string, maxErrors int, haltDurat
7581
Enabled: true,
7682
MaxConsecutiveErrorCount: maxErrors,
7783
HaltDuration: haltDuration,
84+
ErrorWindow: errorWindow,
7885
errors: make([]ErrorRecord, 0, maxErrors),
7986
}
8087
b.SetMetricsInfo(strategy, strategyInstance)
@@ -108,6 +115,14 @@ func (b *ErrorBreaker) recordError(now time.Time, err error) {
108115
return
109116
}
110117

118+
if !b.halted && len(b.errors) > 0 && b.ErrorWindow.Duration() > 0 {
119+
lastRecord := b.errors[len(b.errors)-1]
120+
if now.Sub(lastRecord.timestamp) > b.ErrorWindow.Duration() {
121+
// Clear old errors outside the error window
122+
b.errors = b.errors[:0]
123+
}
124+
}
125+
111126
// Add the new error record
112127
b.errors = append(b.errors, ErrorRecord{
113128
timestamp: now,

pkg/risk/circuitbreaker/errorbreaker_test.go

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
func TestErrorBreaker_RecordError(t *testing.T) {
1414
t.Run("should not halt when errors are below threshold", func(t *testing.T) {
15-
breaker := NewErrorBreaker("test", "test-instance", 3, types.Duration(time.Minute))
15+
breaker := NewErrorBreaker("test", "test-instance", 3, types.Duration(time.Minute), types.Duration(0))
1616
now := time.Now()
1717

1818
breaker.recordError(now, assert.AnError)
@@ -23,7 +23,7 @@ func TestErrorBreaker_RecordError(t *testing.T) {
2323
})
2424

2525
t.Run("should halt when errors reach threshold", func(t *testing.T) {
26-
breaker := NewErrorBreaker("test", "test-instance", 3, types.Duration(time.Minute))
26+
breaker := NewErrorBreaker("test", "test-instance", 3, types.Duration(time.Minute), types.Duration(0))
2727
now := time.Now()
2828

2929
breaker.recordError(now, assert.AnError)
@@ -34,7 +34,7 @@ func TestErrorBreaker_RecordError(t *testing.T) {
3434
})
3535

3636
t.Run("should reset when nil error is recorded", func(t *testing.T) {
37-
breaker := NewErrorBreaker("test", "test-instance", 3, types.Duration(time.Minute))
37+
breaker := NewErrorBreaker("test", "test-instance", 3, types.Duration(time.Minute), types.Duration(0))
3838
now := time.Now()
3939

4040
breaker.recordError(now, assert.AnError)
@@ -48,7 +48,7 @@ func TestErrorBreaker_RecordError(t *testing.T) {
4848
})
4949

5050
t.Run("should auto-reset when halt duration expires", func(t *testing.T) {
51-
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(100*time.Millisecond))
51+
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(100*time.Millisecond), types.Duration(0))
5252
now := time.Now()
5353

5454
breaker.recordError(now, assert.AnError)
@@ -64,7 +64,7 @@ func TestErrorBreaker_RecordError(t *testing.T) {
6464
})
6565

6666
t.Run("should call halt callbacks only once when max error count is reached", func(t *testing.T) {
67-
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(time.Minute))
67+
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(time.Minute), types.Duration(0))
6868
now := time.Now()
6969

7070
// Track callback invocations
@@ -100,7 +100,7 @@ func TestErrorBreaker_RecordError(t *testing.T) {
100100
}
101101

102102
func TestErrorBreaker_Reset(t *testing.T) {
103-
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(time.Minute))
103+
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(time.Minute), types.Duration(0))
104104
now := time.Now()
105105

106106
breaker.recordError(now, assert.AnError)
@@ -116,7 +116,7 @@ func TestErrorBreaker_Reset(t *testing.T) {
116116
}
117117

118118
func TestErrorBreaker_ErrorCount(t *testing.T) {
119-
breaker := NewErrorBreaker("test", "test-instance", 5, types.Duration(time.Minute))
119+
breaker := NewErrorBreaker("test", "test-instance", 5, types.Duration(time.Minute), types.Duration(0))
120120
now := time.Now()
121121

122122
assert.Equal(t, 0, breaker.ErrorCount())
@@ -134,7 +134,7 @@ func TestErrorBreaker_ErrorCount(t *testing.T) {
134134
}
135135

136136
func TestErrorBreaker_ConcurrentAccess(t *testing.T) {
137-
breaker := NewErrorBreaker("test", "test-instance", 20, types.Duration(time.Minute))
137+
breaker := NewErrorBreaker("test", "test-instance", 20, types.Duration(time.Minute), types.Duration(0))
138138

139139
// Spawn multiple goroutines to record errors concurrently
140140
var wg sync.WaitGroup
@@ -158,14 +158,14 @@ func TestErrorBreaker_ConcurrentAccess(t *testing.T) {
158158

159159
func TestErrorBreaker_EdgeCases(t *testing.T) {
160160
t.Run("maxErrors of 1 should halt immediately", func(t *testing.T) {
161-
breaker := NewErrorBreaker("test", "test-instance", 1, types.Duration(time.Minute))
161+
breaker := NewErrorBreaker("test", "test-instance", 1, types.Duration(time.Minute), types.Duration(0))
162162
now := time.Now()
163163
breaker.recordError(now, assert.AnError)
164164
assert.True(t, breaker.isHalted(now))
165165
})
166166

167167
t.Run("very short halt duration", func(t *testing.T) {
168-
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(time.Nanosecond))
168+
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(time.Nanosecond), types.Duration(0))
169169
now := time.Now()
170170
breaker.recordError(now, assert.AnError)
171171
breaker.recordError(now, assert.AnError)
@@ -175,7 +175,7 @@ func TestErrorBreaker_EdgeCases(t *testing.T) {
175175
})
176176

177177
t.Run("recording errors after halted state", func(t *testing.T) {
178-
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(time.Minute))
178+
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(time.Minute), types.Duration(0))
179179
now := time.Now()
180180
breaker.recordError(now, assert.AnError)
181181
breaker.recordError(now, assert.AnError)
@@ -187,7 +187,7 @@ func TestErrorBreaker_EdgeCases(t *testing.T) {
187187
})
188188

189189
t.Run("nil error should reset breaker", func(t *testing.T) {
190-
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(time.Minute))
190+
breaker := NewErrorBreaker("test", "test-instance", 2, types.Duration(time.Minute), types.Duration(0))
191191
now := time.Now()
192192

193193
breaker.recordError(now, nil)
@@ -205,7 +205,7 @@ func TestErrorBreaker_EdgeCases(t *testing.T) {
205205

206206
func TestErrorBreaker_Errors(t *testing.T) {
207207
t.Run("should return all recorded errors", func(t *testing.T) {
208-
breaker := NewErrorBreaker("test", "test-instance", 5, types.Duration(time.Minute))
208+
breaker := NewErrorBreaker("test", "test-instance", 5, types.Duration(time.Minute), types.Duration(0))
209209
now := time.Now()
210210

211211
err1 := assert.AnError
@@ -221,13 +221,13 @@ func TestErrorBreaker_Errors(t *testing.T) {
221221
})
222222

223223
t.Run("should return empty slice when no errors", func(t *testing.T) {
224-
breaker := NewErrorBreaker("test", "test-instance", 5, types.Duration(time.Minute))
224+
breaker := NewErrorBreaker("test", "test-instance", 5, types.Duration(time.Minute), types.Duration(0))
225225
errors := breaker.Errors()
226226
assert.Empty(t, errors)
227227
})
228228

229229
t.Run("should return empty after reset", func(t *testing.T) {
230-
breaker := NewErrorBreaker("test", "test-instance", 5, types.Duration(time.Minute))
230+
breaker := NewErrorBreaker("test", "test-instance", 5, types.Duration(time.Minute), types.Duration(0))
231231
now := time.Now()
232232

233233
breaker.recordError(now, assert.AnError)
@@ -237,10 +237,28 @@ func TestErrorBreaker_Errors(t *testing.T) {
237237
breaker.recordError(now, nil) // Reset via nil error
238238
assert.Empty(t, breaker.Errors())
239239
})
240+
241+
t.Run("should not halt when error occurs outside error window", func(t *testing.T) {
242+
breaker := NewErrorBreaker("test", "test-instance", 3, types.Duration(time.Minute), types.Duration(5*time.Second))
243+
now := time.Now()
244+
245+
// Record 2 errors in a row
246+
breaker.recordError(now, assert.AnError)
247+
breaker.recordError(now.Add(1*time.Second), assert.AnError)
248+
assert.Equal(t, 2, breaker.ErrorCount())
249+
assert.False(t, breaker.isHalted(now.Add(1*time.Second)))
250+
251+
// Third error comes in outside the error window (6 seconds after the second error)
252+
breaker.recordError(now.Add(7*time.Second), assert.AnError)
253+
254+
// Should have reset and only count the latest error
255+
assert.Equal(t, 1, breaker.ErrorCount())
256+
assert.False(t, breaker.isHalted(now.Add(7*time.Second)))
257+
})
240258
}
241259

242260
func TestErrorBreaker_Marshal(t *testing.T) {
243-
breaker := NewErrorBreaker("test-strategy", "test-instance", 5, types.Duration(2*time.Minute))
261+
breaker := NewErrorBreaker("test-strategy", "test-instance", 5, types.Duration(2*time.Minute), types.Duration(0))
244262

245263
data, err := json.Marshal(breaker)
246264
assert.NoError(t, err)

0 commit comments

Comments
 (0)