
Commit 98e4402

Merge pull request #2378 from c9s/dboy/risk/error-circuitbreaker
FEATURE: [circuitbreaker] implement ErrorBreaker
2 parents: cbfa80a + 7056338

File tree

3 files changed: +613 −0 lines
Lines changed: 293 additions & 0 deletions
```go
package circuitbreaker

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/c9s/bbgo/pkg/types"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	log "github.com/sirupsen/logrus"
	"github.com/slack-go/slack"
)

var errorCntMetric = promauto.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "bbgo_error_breaker_consecutive_error_count",
		Help: "Current count of consecutive errors tracked by the error breaker",
	},
	[]string{"strategy", "strategyInstance"},
)

var errorHaltMetric = promauto.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "bbgo_error_breaker_halt",
		Help: "Indicates whether the error breaker is currently in a halted state (1 for halted, 0 for active)",
	},
	[]string{"strategy", "strategyInstance"},
)

// ErrorRecord stores an error along with its timestamp.
type ErrorRecord struct {
	timestamp time.Time
	err       error
}

// ErrorBreaker is a circuit breaker that tracks consecutive errors
// and halts operations if the error count exceeds the threshold.
//
//go:generate callbackgen -type ErrorBreaker
type ErrorBreaker struct {
	mu sync.RWMutex

	// breaker configuration
	MaxConsecutiveErrorCount int            `json:"maxConsecutiveErrorCount"`
	HaltDuration             types.Duration `json:"haltDuration"`

	// breaker state
	errors   []ErrorRecord
	halted   bool
	haltedAt time.Time

	// haltCallbacks are the callbacks that will be called when the breaker halts.
	// They are invoked while the breaker's lock is held, so they must not call
	// any methods that lock the breaker again. Ideally, a callback should use
	// only the passed parameters to perform its work.
	haltCallbacks []func(haltedAt time.Time, records []ErrorRecord)

	// error breaker metrics
	strategyInstance string
	errorCntMetric   prometheus.Gauge
	errorHaltMetric  prometheus.Gauge
}

// NewErrorBreaker creates a new ErrorBreaker with the given parameters.
// maxErrors: maximum number of consecutive errors allowed
// haltDuration: duration for which the breaker will be halted
func NewErrorBreaker(strategy, strategyInstance string, maxErrors int, haltDuration types.Duration) *ErrorBreaker {
	if maxErrors <= 0 {
		log.Warnf("maxErrors must be positive, falling back to 5, got: %d", maxErrors)
		maxErrors = 5
	}
	b := &ErrorBreaker{
		MaxConsecutiveErrorCount: maxErrors,
		HaltDuration:             haltDuration,
		errors:                   make([]ErrorRecord, 0, maxErrors),
	}
	b.SetMetricsInfo(strategy, strategyInstance)
	b.updateMetrics()
	return b
}

func (b *ErrorBreaker) SetMetricsInfo(strategy, strategyInstance string) {
	labels := prometheus.Labels{"strategy": strategy, "strategyInstance": strategyInstance}
	b.strategyInstance = strategyInstance
	b.errorCntMetric = errorCntMetric.With(labels)
	b.errorHaltMetric = errorHaltMetric.With(labels)
	b.updateMetrics()
}

// RecordError records a critical error and updates the circuit breaker state.
// If err is nil, the breaker is reset.
func (b *ErrorBreaker) RecordError(err error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	defer b.updateMetrics()

	b.recordError(time.Now(), err)
}

func (b *ErrorBreaker) recordError(now time.Time, err error) {
	// If no error occurred, reset the breaker
	if err == nil {
		b.reset()
		return
	}

	// Add the new error record
	b.errors = append(b.errors, ErrorRecord{
		timestamp: now,
		err:       err,
	})

	// The breaker is already halted:
	// keep it halted until the duration expires.
	if b.halted {
		if len(b.errors) > b.MaxConsecutiveErrorCount {
			// drop the oldest error record to prevent unbounded growth
			b.errors = b.errors[1:]
		}
		return
	}

	// The breaker is not halted yet:
	// check whether we've reached the max errors threshold.
	if len(b.errors) >= b.MaxConsecutiveErrorCount {
		// trigger halt
		b.EmitHalt(now, b.errors)
		b.halted = true
		b.haltedAt = now
	}
}

// IsHalted returns whether the circuit breaker is in a halted state.
// If the breaker is halted and the halt duration has expired, it automatically resets the breaker.
func (b *ErrorBreaker) IsHalted() bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	isHalted := b.isHalted(time.Now())
	b.updateMetrics()

	return isHalted
}

func (b *ErrorBreaker) isHalted(now time.Time) bool {
	// If not halted, return false immediately
	if !b.halted {
		return false
	}

	// Check if the halt duration has expired
	if !b.haltedAt.IsZero() && now.Sub(b.haltedAt) >= b.HaltDuration.Duration() {
		// Halt duration has expired, reset the breaker
		b.reset()
	}

	return b.halted
}

// Reset resets the circuit breaker, clearing all recorded errors and the halted state.
func (b *ErrorBreaker) Reset() {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.reset()
}

func (b *ErrorBreaker) reset() {
	if b.errors != nil {
		b.errors = b.errors[:0]
	} else {
		b.errors = make([]ErrorRecord, 0, b.MaxConsecutiveErrorCount)
	}
	b.halted = false
	b.haltedAt = time.Time{}
	b.updateMetrics()
}

// ErrorCount returns the current number of errors tracked.
func (b *ErrorBreaker) ErrorCount() int {
	b.mu.RLock()
	defer b.mu.RUnlock()

	return len(b.errors)
}

// Errors returns a copy of all errors currently tracked.
func (b *ErrorBreaker) Errors() []error {
	b.mu.RLock()
	defer b.mu.RUnlock()

	var result []error
	for _, record := range b.errors {
		result = append(result, record.err)
	}

	return result
}

func (b *ErrorBreaker) SlackAttachment() slack.Attachment {
	b.mu.RLock()
	defer b.mu.RUnlock()

	errorCount := len(b.errors)

	// Build error details text
	var errorDetails strings.Builder
	if errorCount > 0 {
		errorDetails.WriteString(fmt.Sprintf("Errors encountered (%s):\n", b.strategyInstance))
		for i, record := range b.errors {
			errorDetails.WriteString(fmt.Sprintf("%d. [%s] %v\n",
				i+1,
				record.timestamp.Format(time.RFC3339),
				record.err,
			))
		}
	}

	status := "ACTIVE"
	title := "✅ Error Circuit Breaker ACTIVE"
	color := "#228B22"
	if b.halted {
		status = "HALTED"
		title = "🛑 Error Circuit Breaker HALTED"
		color = "danger"
	}

	fields := []slack.AttachmentField{
		{Title: "Status", Value: status, Short: true},
		{Title: "Error Count", Value: fmt.Sprintf("%d / %d", errorCount, b.MaxConsecutiveErrorCount), Short: true},
	}

	if len(b.errors) > 0 {
		lastError := b.errors[0]

		for _, record := range b.errors {
			if record.timestamp.After(lastError.timestamp) {
				lastError = record
			}
		}

		fields = append(fields,
			slack.AttachmentField{
				Title: "Last Error At",
				Value: lastError.timestamp.Format(time.RFC3339),
				Short: true,
			},
		)
		if !b.haltedAt.IsZero() {
			fields = append(fields,
				slack.AttachmentField{
					Title: "Halted At",
					Value: b.haltedAt.Format(time.RFC3339),
					Short: true,
				},
			)
		}
	}

	if !b.haltedAt.IsZero() {
		recoveryTime := b.haltedAt.Add(b.HaltDuration.Duration())
		fields = append(fields,
			slack.AttachmentField{
				Title: "Halt Duration",
				Value: b.HaltDuration.Duration().String(),
				Short: true,
			},
			slack.AttachmentField{
				Title: "Recovery At",
				Value: recoveryTime.Format(time.RFC3339),
				Short: true,
			},
		)
	}

	return slack.Attachment{
		Color:  color,
		Title:  title,
		Text:   errorDetails.String(),
		Fields: fields,
	}
}

func (b *ErrorBreaker) updateMetrics() {
	b.errorCntMetric.Set(float64(len(b.errors)))
	if b.halted {
		b.errorHaltMetric.Set(1)
	} else {
		b.errorHaltMetric.Set(0)
	}
}
```
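
For context, wiring the breaker into a strategy loop might look like the sketch below. Only `NewErrorBreaker`, `RecordError`, and `IsHalted` come from the diff above; the `OnHalt` registration is presumed to come from the generated callbacks file (see the sketch after the next file), and `placeOrder` is a hypothetical stand-in for a real exchange call:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/c9s/bbgo/pkg/risk/circuitbreaker"
	"github.com/c9s/bbgo/pkg/types"
)

// placeOrder is a hypothetical operation; it always fails here
// so the example trips the breaker.
func placeOrder() error {
	return errors.New("exchange: insufficient balance")
}

func main() {
	// Halt after 3 consecutive errors; recover automatically after 5 minutes.
	b := circuitbreaker.NewErrorBreaker("mystrategy", "mystrategy:BTCUSDT", 3, types.Duration(5*time.Minute))

	// The callback runs while the breaker's lock is held, so it only uses
	// the passed parameters (see the haltCallbacks doc comment above).
	b.OnHalt(func(haltedAt time.Time, records []circuitbreaker.ErrorRecord) {
		fmt.Printf("halted at %s after %d consecutive errors\n",
			haltedAt.Format(time.RFC3339), len(records))
	})

	for i := 0; i < 5; i++ {
		// IsHalted also drives auto-recovery: once HaltDuration has elapsed,
		// the next call resets the breaker and returns false.
		if b.IsHalted() {
			fmt.Println("breaker halted, skipping order placement")
			continue
		}

		// A non-nil error is counted; a nil error resets the breaker.
		b.RecordError(placeOrder())
	}
}
```

Note that `RecordError(nil)` clears the whole error streak, so the breaker genuinely counts consecutive failures rather than a failure rate.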

pkg/risk/circuitbreaker/errorbreaker_callbacks.go

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default.
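
Given the `//go:generate callbackgen -type ErrorBreaker` directive and the `haltCallbacks` field above, the unrendered file most likely follows callbackgen's usual On/Emit pattern. A sketch of the presumed contents, not the actual generated output:

```go
// Sketch of the presumed generated callbacks file (not the actual output).
package circuitbreaker

import "time"

// OnHalt registers a callback invoked when the breaker halts.
func (b *ErrorBreaker) OnHalt(cb func(haltedAt time.Time, records []ErrorRecord)) {
	b.haltCallbacks = append(b.haltCallbacks, cb)
}

// EmitHalt calls every registered halt callback in registration order.
func (b *ErrorBreaker) EmitHalt(haltedAt time.Time, records []ErrorRecord) {
	for _, cb := range b.haltCallbacks {
		cb(haltedAt, records)
	}
}
```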
