Skip to content

Commit 4030a2e

Browse files
committed
feat: implement circuit breaker interceptor
Add circuit breaker pattern implementation as an interceptor that wraps dispatchers to prevent cascading failures. Key features: - Automatic failure detection and circuit opening - Configurable failure threshold and timeout - Half-open state for gradual recovery - Volume threshold to prevent premature opening - Per-origin circuit state tracking - CircuitBreakerError for open circuit requests The interceptor monitors request failures and opens the circuit when threshold is exceeded, allowing the downstream service time to recover before attempting reconnection. Signed-off-by: Matteo Collina <hello@matteocollina.com>
1 parent c9ed7bd commit 4030a2e

File tree

3 files changed

+354
-2
lines changed

3 files changed

+354
-2
lines changed

index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ module.exports.interceptors = {
4949
dump: require('./lib/interceptor/dump'),
5050
dns: require('./lib/interceptor/dns'),
5151
cache: require('./lib/interceptor/cache'),
52-
decompress: require('./lib/interceptor/decompress')
52+
decompress: require('./lib/interceptor/decompress'),
53+
circuitBreaker: require('./lib/interceptor/circuit-breaker')
5354
}
5455

5556
module.exports.cacheStores = {

lib/core/errors.js

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,26 @@ class MaxOriginsReachedError extends UndiciError {
421421
}
422422
}
423423

424+
const kCircuitBreakerError = Symbol.for('undici.error.UND_ERR_CIRCUIT_BREAKER')

/**
 * Error reported for a request that the circuit breaker interceptor refuses
 * to dispatch.
 */
class CircuitBreakerError extends UndiciError {
  /**
   * @param {string} [message] Falls back to 'Circuit breaker is open' when empty.
   * @param {object} [details] Optional; omitting it leaves `state` and `key` undefined.
   * @param {string} [details.state] Circuit state reported by the caller.
   * @param {string} [details.key] Circuit key the request was mapped to.
   */
  constructor (message, { state, key } = {}) {
    super(message)
    this.name = 'CircuitBreakerError'
    this.message = message || 'Circuit breaker is open'
    this.code = 'UND_ERR_CIRCUIT_BREAKER'
    this.state = state
    this.key = key
  }

  // `instanceof` is decided by the brand getter below, not by the prototype chain.
  static [Symbol.hasInstance] (instance) {
    return instance && instance[kCircuitBreakerError] === true
  }

  get [kCircuitBreakerError] () {
    return true
  }
}
443+
424444
module.exports = {
425445
AbortError,
426446
HTTPParserError,
@@ -444,5 +464,6 @@ module.exports = {
444464
RequestRetryError,
445465
ResponseError,
446466
SecureProxyConnectionError,
447-
MaxOriginsReachedError
467+
MaxOriginsReachedError,
468+
CircuitBreakerError
448469
}

lib/interceptor/circuit-breaker.js

Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
'use strict'
2+
3+
const { InvalidArgumentError, CircuitBreakerError } = require('../core/errors')
4+
const DecoratorHandler = require('../handler/decorator-handler')
5+
6+
// Numeric identifiers for the three circuit states
const STATE_CLOSED = 0
const STATE_OPEN = 1
const STATE_HALF_OPEN = 2

// Error codes counted as failures unless the caller supplies `errorCodes`:
// the UND_ERR_* entries first, then system/DNS style codes.
const DEFAULT_ERROR_CODES = new Set([
  'UND_ERR_CONNECT_TIMEOUT',
  'UND_ERR_HEADERS_TIMEOUT',
  'UND_ERR_BODY_TIMEOUT',
  'UND_ERR_SOCKET',
  'ECONNREFUSED',
  'ECONNRESET',
  'ETIMEDOUT',
  'EPIPE',
  'ENOTFOUND',
  'ENETUNREACH',
  'EHOSTUNREACH',
  'EAI_AGAIN'
])

// Response status codes counted as failures unless the caller supplies `statusCodes`
const DEFAULT_STATUS_CODES = new Set([500, 502, 503, 504])
29+
30+
/**
 * Mutable bookkeeping for a single circuit (one instance per key).
 * Holds the current state, plain failure/success counters, the timestamp of
 * the last recorded failure and the number of in-flight half-open requests.
 * No time window is kept here: the counters are simple integers.
 */
class CircuitState {
  constructor () {
    // A new circuit is exactly a reset one; keep a single source of truth.
    this.reset()
  }

  /**
   * Returns the circuit to the closed state with every counter zeroed.
   */
  reset () {
    this.state = STATE_CLOSED
    this.failureCount = 0
    this.successCount = 0
    this.lastFailureTime = 0
    this.halfOpenRequests = 0
  }
}
51+
52+
/**
 * In-memory store of per-key circuit state with a size cap (least recently
 * used entry is evicted first) and an optional periodic cleanup pass.
 */
class CircuitBreakerStorage {
  #circuits = new Map()
  #maxSize
  #cleanupInterval
  #staleTimeout
  #cleanupTimer = null

  /**
   * @param {object} [opts]
   * @param {number} [opts.maxSize=1000] Number of circuits kept before the least recently used one is evicted
   * @param {number} [opts.cleanupInterval=60000] Milliseconds between cleanup passes; 0 or less disables the timer
   * @param {number} [opts.staleTimeout=300000] Milliseconds since the last failure after which an open circuit is dropped by cleanup
   */
  constructor (opts = {}) {
    this.#maxSize = opts.maxSize ?? 1000
    this.#cleanupInterval = opts.cleanupInterval ?? 60000
    this.#staleTimeout = opts.staleTimeout ?? 300000

    if (this.#cleanupInterval > 0) {
      // unref() so that this timer alone does not keep the process alive
      this.#cleanupTimer = setInterval(() => this.#cleanup(), this.#cleanupInterval).unref()
    }
  }

  /**
   * Returns the circuit for `key`, creating it when missing.
   *
   * @param {*} key
   * @returns {CircuitState}
   */
  get (key) {
    const existing = this.#circuits.get(key)
    if (existing) {
      // A Map iterates in insertion order, so re-inserting on every hit keeps
      // the first key the least recently used one.
      this.#circuits.delete(key)
      this.#circuits.set(key, existing)
      return existing
    }

    if (this.#circuits.size >= this.#maxSize) {
      const oldestKey = this.#circuits.keys().next().value
      this.#circuits.delete(oldestKey)
    }

    const circuit = new CircuitState()
    this.#circuits.set(key, circuit)
    return circuit
  }

  /**
   * Forgets the circuit stored under `key`, if any.
   */
  delete (key) {
    this.#circuits.delete(key)
  }

  #cleanup () {
    const now = Date.now()
    for (const [key, circuit] of this.#circuits) {
      // A closed circuit without failures is indistinguishable from a fresh one
      const isPristine = circuit.state === STATE_CLOSED && circuit.failureCount === 0
      // An open circuit whose last failure is older than the stale timeout
      const isStaleOpen = circuit.state === STATE_OPEN &&
        circuit.lastFailureTime > 0 &&
        now - circuit.lastFailureTime > this.#staleTimeout

      if (isPristine || isStaleOpen) {
        this.#circuits.delete(key)
      }
    }
  }

  /**
   * Stops the cleanup timer and drops every stored circuit.
   */
  destroy () {
    if (this.#cleanupTimer) {
      clearInterval(this.#cleanupTimer)
      this.#cleanupTimer = null
    }
    this.#circuits.clear()
  }

  /** Number of circuits currently stored. */
  get size () {
    return this.#circuits.size
  }
}
117+
118+
/**
 * Handler decorator that records the outcome of one request on its circuit
 * before forwarding each event to the wrapped handler.
 */
class CircuitBreakerHandler extends DecoratorHandler {
  #circuit
  #opts
  #statusCodes
  #errorCodes
  #key
  #holdsHalfOpenSlot

  /**
   * @param {object} opts Resolved options; `threshold`, `successThreshold`, `statusCodes` and `errorCodes` are read
   * @param {CircuitState} circuit Circuit this request is accounted against
   * @param {*} key Circuit key of the request
   * @param {object} handler Wrapped handler
   */
  constructor (opts, circuit, key, handler) {
    super(handler)
    this.#opts = opts
    this.#circuit = circuit
    this.#statusCodes = opts.statusCodes
    this.#errorCodes = opts.errorCodes
    this.#key = key
    // The interceptor increments halfOpenRequests immediately before creating
    // the handler of a half-open request, so the state seen here tells whether
    // this request occupies one of the half-open slots.
    this.#holdsHalfOpenSlot = circuit.state === STATE_HALF_OPEN
  }

  onResponseStart (controller, statusCode, headers, statusMessage) {
    if (this.#statusCodes.has(statusCode)) {
      this.#recordFailure()
    } else {
      this.#recordSuccess()
    }
    return super.onResponseStart(controller, statusCode, headers, statusMessage)
  }

  onResponseError (controller, err) {
    const code = err?.code
    if (code && this.#errorCodes.has(code)) {
      this.#recordFailure()
    } else {
      // An untracked error says nothing about the origin's health, but the
      // request is over: without giving the slot back, the circuit would stay
      // half-open with no free slot and reject every later request.
      this.#releaseHalfOpenSlot()
    }
    return super.onResponseError(controller, err)
  }

  // Gives back the half-open slot taken for this request, at most once.
  #releaseHalfOpenSlot () {
    if (!this.#holdsHalfOpenSlot) {
      return
    }
    this.#holdsHalfOpenSlot = false

    const circuit = this.#circuit
    // Leaving half-open already zeroes the counter, so only adjust it while
    // the circuit is still half-open.
    if (circuit.state === STATE_HALF_OPEN) {
      circuit.halfOpenRequests = Math.max(0, circuit.halfOpenRequests - 1)
    }
  }

  #recordFailure () {
    const circuit = this.#circuit
    circuit.failureCount++
    circuit.lastFailureTime = Date.now()
    circuit.successCount = 0

    if (circuit.state === STATE_HALF_OPEN) {
      // Any failure in half-open immediately opens the circuit
      circuit.state = STATE_OPEN
      circuit.halfOpenRequests = 0
    } else if (circuit.state === STATE_CLOSED && circuit.failureCount >= this.#opts.threshold) {
      circuit.state = STATE_OPEN
    }

    // Either the counter was just zeroed or the circuit is no longer
    // half-open; in both cases there is no slot left to give back.
    this.#holdsHalfOpenSlot = false
  }

  #recordSuccess () {
    const circuit = this.#circuit

    if (circuit.state === STATE_HALF_OPEN) {
      circuit.successCount++
      this.#releaseHalfOpenSlot()

      if (circuit.successCount >= this.#opts.successThreshold) {
        circuit.reset()
      }
    } else if (circuit.state === STATE_CLOSED) {
      // In closed state a success clears the failure count
      circuit.failureCount = 0
    }

    this.#holdsHalfOpenSlot = false
  }
}
188+
189+
/**
 * Default key generator: one circuit per origin.
 * Override with the `getKey` option for route-level granularity.
 *
 * @param {object} [opts] Request options; `origin` may be a string or an
 *   object exposing an `origin` property (such as a URL)
 * @returns {string} The origin, or 'unknown' when it is missing or empty
 */
function defaultGetKey (opts) {
  // Optional chaining so a missing options object maps to 'unknown' instead of throwing
  const target = opts?.origin
  const origin = typeof target === 'string' ? target : target?.origin
  return origin || 'unknown'
}
197+
198+
/**
 * Turns a user supplied list of codes into a Set.
 *
 * @param {Set|Array|null|undefined} codes Value of the option
 * @param {Set} defaults Returned as-is when `codes` is null or undefined
 * @param {string} optionName Option name used in the error message
 * @returns {Set}
 * @throws {InvalidArgumentError} When `codes` is neither an array nor a Set
 */
function normalizeCodeSet (codes, defaults, optionName) {
  if (codes == null) {
    return defaults
  }
  if (Array.isArray(codes)) {
    // Convert arrays to Sets for O(1) lookup
    return new Set(codes)
  }
  if (codes instanceof Set) {
    return codes
  }
  throw new InvalidArgumentError(`${optionName} must be an array or Set`)
}

/**
 * Fails a request without dispatching it. The error is handed to
 * `handler.onResponseError` (when present) from a microtask, i.e. after the
 * interceptor has returned.
 *
 * @returns {true}
 */
function rejectWithCircuitBreakerError (handler, message, state, key) {
  const err = new CircuitBreakerError(message, { state, key })
  queueMicrotask(() => {
    handler.onResponseError?.(null, err)
  })
  return true
}

/**
 * Creates a circuit breaker interceptor.
 *
 * @param {Object} opts Configuration options
 * @param {number} [opts.threshold=5] Number of failures before opening circuit
 * @param {number} [opts.timeout=30000] How long circuit stays open (ms)
 * @param {number} [opts.successThreshold=1] Successes needed in half-open to close
 * @param {number} [opts.maxHalfOpenRequests=1] Max concurrent requests in half-open
 * @param {Set|Array} [opts.statusCodes=[500,502,503,504]] Status codes that count as failures
 * @param {Set|Array} [opts.errorCodes] Error codes that count as failures
 * @param {Function} [opts.getKey] Function to extract circuit key from request opts
 * @param {CircuitBreakerStorage} [opts.storage] Custom storage instance; must expose `get(key)`
 * @param {Function} [opts.onStateChange] Called as `(key, 'half-open', 'open')` when this
 *   interceptor moves an open circuit to half-open; this function does not invoke it for
 *   any other transition
 * @returns {Function} Interceptor: takes a `dispatch` function and returns the wrapped one
 * @throws {InvalidArgumentError} When an option is invalid
 */
function createCircuitBreakerInterceptor (opts = {}) {
  const {
    threshold = 5,
    timeout = 30000,
    successThreshold = 1,
    maxHalfOpenRequests = 1,
    getKey = defaultGetKey,
    onStateChange = null
  } = opts

  // Validate options. The checks are written as `!(x >= n)` rather than
  // `x < n` so that NaN, which fails every comparison, is rejected as well.
  if (typeof threshold !== 'number' || !(threshold >= 1)) {
    throw new InvalidArgumentError('threshold must be a positive number')
  }
  if (typeof timeout !== 'number' || !(timeout >= 0)) {
    throw new InvalidArgumentError('timeout must be a non-negative number')
  }
  if (typeof successThreshold !== 'number' || !(successThreshold >= 1)) {
    throw new InvalidArgumentError('successThreshold must be a positive number')
  }
  if (typeof maxHalfOpenRequests !== 'number' || !(maxHalfOpenRequests >= 1)) {
    throw new InvalidArgumentError('maxHalfOpenRequests must be a positive number')
  }
  if (typeof getKey !== 'function') {
    throw new InvalidArgumentError('getKey must be a function')
  }
  if (onStateChange != null && typeof onStateChange !== 'function') {
    throw new InvalidArgumentError('onStateChange must be a function')
  }

  const statusCodes = normalizeCodeSet(opts.statusCodes, DEFAULT_STATUS_CODES, 'statusCodes')
  const errorCodes = normalizeCodeSet(opts.errorCodes, DEFAULT_ERROR_CODES, 'errorCodes')

  // The default storage starts an interval timer, so it is only created once
  // every other option has been accepted.
  const storage = opts.storage ?? new CircuitBreakerStorage()
  if (typeof storage.get !== 'function') {
    throw new InvalidArgumentError('storage must implement get(key)')
  }

  const resolvedOpts = {
    threshold,
    timeout,
    successThreshold,
    maxHalfOpenRequests,
    statusCodes,
    errorCodes
  }

  return dispatch => {
    return function circuitBreakerInterceptor (opts, handler) {
      const key = getKey(opts)
      const circuit = storage.get(key)
      const now = Date.now()

      if (circuit.state === STATE_OPEN) {
        if (now - circuit.lastFailureTime >= timeout) {
          // Open long enough: let a limited number of requests probe the origin
          circuit.state = STATE_HALF_OPEN
          circuit.halfOpenRequests = 0
          circuit.successCount = 0
          if (onStateChange) {
            onStateChange(key, 'half-open', 'open')
          }
        } else {
          // Fast fail - circuit is open
          return rejectWithCircuitBreakerError(handler, 'Circuit breaker is open', 'open', key)
        }
      }

      if (circuit.state === STATE_HALF_OPEN) {
        if (circuit.halfOpenRequests >= maxHalfOpenRequests) {
          return rejectWithCircuitBreakerError(
            handler,
            'Circuit breaker is half-open (max requests reached)',
            'half-open',
            key
          )
        }
        circuit.halfOpenRequests++
      }

      return dispatch(
        opts,
        new CircuitBreakerHandler(resolvedOpts, circuit, key, handler)
      )
    }
  }
}
323+
324+
// Export state constants for testing/debugging
325+
createCircuitBreakerInterceptor.STATE_CLOSED = STATE_CLOSED
326+
createCircuitBreakerInterceptor.STATE_OPEN = STATE_OPEN
327+
createCircuitBreakerInterceptor.STATE_HALF_OPEN = STATE_HALF_OPEN
328+
createCircuitBreakerInterceptor.CircuitBreakerStorage = CircuitBreakerStorage
329+
330+
module.exports = createCircuitBreakerInterceptor

0 commit comments

Comments
 (0)