Skip to content

Commit b256b4c

Browse files
feat(async/unstable): support sync functions in CircuitBreaker (#6933)
1 parent 37e5ec1 commit b256b4c

File tree

2 files changed

+146
-111
lines changed

2 files changed

+146
-111
lines changed

async/unstable_circuit_breaker.ts

Lines changed: 34 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,13 @@ export interface CircuitBreakerOptions<T> {
103103
/** Statistics returned by {@linkcode CircuitBreaker.getStats}. */
104104
export interface CircuitBreakerStats {
105105
/** Current state of the circuit breaker. */
106-
state: CircuitState;
106+
readonly state: CircuitState;
107107
/** Number of failures in the current window. */
108-
failureCount: number;
108+
readonly failureCount: number;
109109
/** Number of consecutive successes (relevant in half-open state). */
110-
consecutiveSuccesses: number;
110+
readonly consecutiveSuccesses: number;
111111
/** Whether the circuit is currently allowing requests. */
112-
isAvailable: boolean;
112+
readonly isAvailable: boolean;
113113
}
114114

115115
/**
@@ -311,29 +311,29 @@ export class CircuitBreaker<T = unknown> {
311311
onClose,
312312
} = options;
313313

314-
if (failureThreshold < 1) {
314+
if (!Number.isFinite(failureThreshold) || failureThreshold < 1) {
315315
throw new TypeError(
316-
`Cannot create circuit breaker as 'failureThreshold' must be at least 1: current value is ${failureThreshold}`,
316+
`Cannot create circuit breaker as 'failureThreshold' must be a finite number >= 1: received ${failureThreshold}`,
317317
);
318318
}
319-
if (cooldownMs < 0) {
319+
if (!Number.isFinite(cooldownMs) || cooldownMs < 0) {
320320
throw new TypeError(
321-
`Cannot create circuit breaker as 'cooldownMs' must be non-negative: current value is ${cooldownMs}`,
321+
`Cannot create circuit breaker as 'cooldownMs' must be a finite non-negative number: received ${cooldownMs}`,
322322
);
323323
}
324-
if (successThreshold < 1) {
324+
if (!Number.isFinite(successThreshold) || successThreshold < 1) {
325325
throw new TypeError(
326-
`Cannot create circuit breaker as 'successThreshold' must be at least 1: current value is ${successThreshold}`,
326+
`Cannot create circuit breaker as 'successThreshold' must be a finite number >= 1: received ${successThreshold}`,
327327
);
328328
}
329-
if (halfOpenMaxConcurrent < 1) {
329+
if (!Number.isFinite(halfOpenMaxConcurrent) || halfOpenMaxConcurrent < 1) {
330330
throw new TypeError(
331-
`Cannot create circuit breaker as 'halfOpenMaxConcurrent' must be at least 1: current value is ${halfOpenMaxConcurrent}`,
331+
`Cannot create circuit breaker as 'halfOpenMaxConcurrent' must be a finite number >= 1: received ${halfOpenMaxConcurrent}`,
332332
);
333333
}
334-
if (failureWindowMs < 0) {
334+
if (!Number.isFinite(failureWindowMs) || failureWindowMs < 0) {
335335
throw new TypeError(
336-
`Cannot create circuit breaker as 'failureWindowMs' must be non-negative: current value is ${failureWindowMs}`,
336+
`Cannot create circuit breaker as 'failureWindowMs' must be a finite non-negative number: received ${failureWindowMs}`,
337337
);
338338
}
339339

@@ -343,7 +343,7 @@ export class CircuitBreaker<T = unknown> {
343343
this.#halfOpenMaxConcurrent = halfOpenMaxConcurrent;
344344
this.#failureWindowMs = failureWindowMs;
345345
this.#isFailure = isFailure;
346-
this.#isResultFailure = isResultFailure as (result: T) => boolean;
346+
this.#isResultFailure = isResultFailure;
347347
this.#onStateChange = onStateChange;
348348
this.#onFailure = onFailure;
349349
this.#onOpen = onOpen;
@@ -423,9 +423,12 @@ export class CircuitBreaker<T = unknown> {
423423
}
424424

425425
/**
426-
* Executes an async operation through the circuit breaker.
426+
* Executes a function through the circuit breaker.
427427
*
428-
* @example Usage
428+
* The function can be synchronous or asynchronous. The result is always
429+
* returned as a promise.
430+
*
431+
* @example Usage with async function
429432
* ```ts
430433
* import { CircuitBreaker } from "@std/async/unstable-circuit-breaker";
431434
* import { assertEquals } from "@std/assert";
@@ -435,36 +438,24 @@ export class CircuitBreaker<T = unknown> {
435438
* assertEquals(result, "success");
436439
* ```
437440
*
438-
* @typeParam R The return type of the function, must extend T.
439-
* @param fn The async operation to execute.
440-
* @returns The result of the operation.
441-
* @throws {CircuitBreakerOpenError} If circuit is open.
442-
*/
443-
/*
444-
* NOTE: Known race condition in half-open state concurrent tracking.
445-
*
446-
* The `halfOpenInFlight` counter uses a read-modify-write pattern that is
447-
* not atomic. Under high concurrency, more requests than `halfOpenMaxConcurrent`
448-
* may slip through.
449-
*
450-
* Future fix: Once `@std/async/unstable-semaphore` stabilizes, use it to
451-
* guard state transitions:
452-
*
441+
* @example Usage with sync function
453442
* ```ts
454-
* import { Semaphore } from "@std/async/semaphore";
455-
*
456-
* #stateMutex = new Semaphore(1);
443+
* import { CircuitBreaker } from "@std/async/unstable-circuit-breaker";
444+
* import { assertEquals } from "@std/assert";
457445
*
458-
* async execute<R extends T>(fn: () => Promise<R>): Promise<R> {
459-
* {
460-
* using _lock = await this.#stateMutex.acquire();
461-
* // Check state and acquire half-open slot atomically
462-
* }
463-
* // Execute fn() outside the lock
464-
* }
446+
* const breaker = new CircuitBreaker({ failureThreshold: 5 });
447+
* const result = await breaker.execute(() => "sync result");
448+
* assertEquals(result, "sync result");
465449
* ```
450+
*
451+
* @typeParam R The return type of the function, must extend T.
452+
* @param fn The function to execute (sync or async).
453+
* @returns A promise that resolves to the result of the operation.
454+
* @throws {CircuitBreakerOpenError} If circuit is open.
466455
*/
467-
async execute<R extends T>(fn: () => Promise<R>): Promise<R> {
456+
async execute<R extends T>(
457+
fn: (() => Promise<R>) | (() => R),
458+
): Promise<R> {
468459
const currentTime = Date.now();
469460
const currentState = this.#resolveCurrentState();
470461

0 commit comments

Comments
 (0)