Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

(name mcp_protocol)

(version 0.12.4)
(version 0.13.0)

(generate_opam_files true)

Expand Down
4 changes: 2 additions & 2 deletions eio/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(library
(name mcp_protocol_eio)
(public_name mcp_protocol_eio)
(libraries mcp_protocol eio yojson)
(modules stdio_transport handler server client))
(libraries mcp_protocol eio unix yojson)
(modules stdio_transport handler server client time_compat resilience))
224 changes: 224 additions & 0 deletions eio/resilience.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
(** Resilience Module - Circuit Breaker, Retry, and Timeout patterns for MCP servers *)

(* ============================================ *)
(* Types & Logger Interface (DI) *)
(* ============================================ *)

type log_level = Debug | Info | Warn | Err

(** Logger function type - injected by caller *)
type logger = log_level -> string -> unit

(** No-op logger *)
let null_logger _ _ = ()

(* ============================================ *)
(* Retry Policy Configuration *)
(* ============================================ *)

type retry_policy = {
max_attempts: int;
initial_delay_ms: int;
max_delay_ms: int;
backoff_multiplier: float;
jitter: bool;
}

let default_policy = {
max_attempts = 3;
initial_delay_ms = 100;
max_delay_ms = 10000;
backoff_multiplier = 2.0;
jitter = true;
}

(* ============================================ *)
(* Circuit Breaker *)
(* ============================================ *)

type circuit_state =
| Closed (* Normal operation *)
| Open (* Failing, reject requests *)
| HalfOpen (* Testing if service recovered *)

type circuit_breaker = {
name: string;
failure_threshold: int;
success_threshold: int;
timeout_ms: int;
mutable state: circuit_state;
mutable failure_count: int;
mutable success_count: int;
mutable last_failure_time: float;
mutable probe_in_progress: bool;
mutex: Eio.Mutex.t;
logger: logger;
}

let create_circuit_breaker
?(failure_threshold=5)
?(success_threshold=2)
?(timeout_ms=30000)
?(logger=null_logger)
~name
() =
{
name;
failure_threshold;
success_threshold;
timeout_ms;
state = Closed;
failure_count = 0;
success_count = 0;
last_failure_time = 0.0;
probe_in_progress = false;
mutex = Eio.Mutex.create ();
logger;
}

let circuit_allows cb =
Eio.Mutex.use_rw ~protect:true cb.mutex (fun () ->
match cb.state with
| Closed -> true
| Open ->
let now = Time_compat.now () in
let elapsed_ms = (now -. cb.last_failure_time) *. 1000.0 in
if elapsed_ms >= float_of_int cb.timeout_ms then begin
cb.state <- HalfOpen;
cb.success_count <- 0;
cb.probe_in_progress <- true;
cb.logger Debug (Printf.sprintf "Circuit '%s' entering HalfOpen state" cb.name);
true
end else
false
| HalfOpen ->
if cb.probe_in_progress then false
else begin
cb.probe_in_progress <- true;
true
end
)

let circuit_record_success cb =
Eio.Mutex.use_rw ~protect:true cb.mutex (fun () ->
match cb.state with
| Closed ->
cb.failure_count <- 0
| HalfOpen ->
cb.probe_in_progress <- false;
cb.success_count <- cb.success_count + 1;
if cb.success_count >= cb.success_threshold then begin
cb.state <- Closed;
cb.failure_count <- 0;
cb.success_count <- 0;
cb.logger Info (Printf.sprintf "Circuit '%s' closed (recovered)" cb.name)
end
| Open -> ()
)

let circuit_record_failure cb =
Eio.Mutex.use_rw ~protect:true cb.mutex (fun () ->
cb.last_failure_time <- Time_compat.now ();
match cb.state with
| Closed ->
cb.failure_count <- cb.failure_count + 1;
if cb.failure_count >= cb.failure_threshold then begin
cb.state <- Open;
cb.logger Warn (Printf.sprintf "Circuit '%s' opened after %d failures" cb.name cb.failure_count)
end
| HalfOpen ->
cb.probe_in_progress <- false;
cb.state <- Open;
cb.success_count <- 0;
cb.logger Warn (Printf.sprintf "Circuit '%s' reopened during probe" cb.name)
| Open -> ()
)

(* ============================================ *)
(* Retry Logic (Pure & Eio) *)
(* ============================================ *)

type 'a retry_result =
| Ok of 'a
| Error of string
| CircuitOpen
| TimedOut

(** Retry action classification *)
type retry_action =
| Retry
| Fail of string

let calculate_delay policy attempt =
let base_delay = float_of_int policy.initial_delay_ms in
let multiplied = base_delay *. (policy.backoff_multiplier ** float_of_int (attempt - 1)) in
let capped = min multiplied (float_of_int policy.max_delay_ms) in
if policy.jitter then
let jitter_factor = 0.75 +. (Random.float 0.5) in
capped *. jitter_factor
else
capped

(** Eio-based retry with structured error classification.
@param classify Function that maps domain errors ('e) to retry actions *)
let with_retry_eio
~clock
?(policy=default_policy)
?(circuit_breaker=None)
?(logger=null_logger)
~op_name
~classify
f =
let rec attempt n last_error =
let cb_allows = match circuit_breaker with
| None -> true
| Some cb -> circuit_allows cb
in
if not cb_allows then begin
logger Warn (Printf.sprintf "%s: circuit breaker OPEN, rejecting" op_name);
CircuitOpen
end
else if n > policy.max_attempts then begin
Error (match last_error with Some e -> e | None -> "Max attempts reached")
end
else begin
if n > 1 then begin
let delay_ms = calculate_delay policy (n - 1) in
logger Debug (Printf.sprintf "%s: retrying in %.0fms (attempt %d)" op_name delay_ms n);
Eio.Time.sleep clock (delay_ms /. 1000.0)
end;

match f () with
| Ok v ->
(match circuit_breaker with Some cb -> circuit_record_success cb | None -> ());
Ok v
| Error e ->
(match classify e with
| Fail msg ->
(match circuit_breaker with Some cb -> circuit_record_failure cb | None -> ());
Error msg
| Retry ->
(match circuit_breaker with Some cb -> circuit_record_failure cb | None -> ());
attempt (n + 1) (Some (match classify e with Fail m -> m | Retry -> "Retryable error")))
| CircuitOpen -> CircuitOpen
| TimedOut -> TimedOut
Comment on lines +203 to +204

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Record circuit failure on TimedOut/CircuitOpen outcomes

When retry runs with a circuit breaker in HalfOpen, circuit_allows sets probe_in_progress <- true before calling f. If f returns TimedOut or CircuitOpen, this branch exits immediately without circuit_record_failure or circuit_record_success, so the probe flag is never cleared and subsequent circuit_allows checks in HalfOpen keep rejecting requests, effectively wedging the breaker.

Useful? React with 👍 / 👎.

end
in
attempt 1 None

(** Eio-based timeout wrapper using Fiber.first *)
let with_timeout_eio ~clock ~timeout_ms f =
let timeout_sec = float_of_int timeout_ms /. 1000.0 in
let result =
Eio.Fiber.first
(fun () ->
try
Eio.Time.sleep clock timeout_sec;
Error "Timeout"
with Eio.Cancel.Cancelled _ -> Error "Cancelled")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Let cancellation propagate from timeout sentinel fiber

Catching Eio.Cancel.Cancelled here converts cancellation into a normal Error path, and the outer match then maps it to Error "Timeout". Under parent-fiber cancellation (e.g., shutdown/abort), this can mask cooperative cancellation as a timeout result instead of propagating cancellation upstream, which breaks expected Eio cancellation behavior.

Useful? React with 👍 / 👎.

(fun () -> Ok (f ()))
in
match result with
| Ok res -> Ok res
| Error _ -> Error "Timeout"
| _ -> Error "Unknown error"
108 changes: 108 additions & 0 deletions eio/resilience.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
(** Resilience Module - Circuit Breaker, Retry, and Timeout patterns for MCP servers.

Provides three core patterns:
- {b Circuit Breaker}: Prevents cascading failures by short-circuiting
calls to failing services.
- {b Retry with Backoff}: Retries transient failures with exponential
backoff and optional jitter.
- {b Timeout}: Wraps operations with Eio-based timeouts using [Fiber.first].
*)

(** {1 Logger Interface} *)

type log_level = Debug | Info | Warn | Err

type logger = log_level -> string -> unit
(** Logger function type. Injected by caller for dependency inversion. *)

val null_logger : logger
(** No-op logger that discards all messages. *)

(** {1 Retry Policy} *)

type retry_policy = {
max_attempts: int;
initial_delay_ms: int;
max_delay_ms: int;
backoff_multiplier: float;
jitter: bool;
}

val default_policy : retry_policy
(** Default: 3 attempts, 100ms initial delay, 10s max, 2x backoff, jitter on. *)

(** {1 Circuit Breaker} *)

type circuit_state =
| Closed (** Normal operation - requests pass through. *)
| Open (** Failing - requests are rejected immediately. *)
| HalfOpen (** Testing recovery - one probe request allowed. *)

type circuit_breaker
(** Opaque circuit breaker handle. Create with {!create_circuit_breaker}. *)

val create_circuit_breaker :
?failure_threshold:int ->
?success_threshold:int ->
?timeout_ms:int ->
?logger:logger ->
name:string ->
unit ->
circuit_breaker
(** Create a circuit breaker.
@param failure_threshold Failures before opening (default 5).
@param success_threshold Successes in HalfOpen before closing (default 2).
@param timeout_ms Time in Open state before probing (default 30000).
@param name Identifier for logging. *)

val circuit_allows : circuit_breaker -> bool
(** Check if the circuit breaker allows a request. Transitions Open to HalfOpen
when timeout has elapsed. Thread-safe (uses Eio.Mutex). *)

val circuit_record_success : circuit_breaker -> unit
(** Record a successful call. In HalfOpen, may transition to Closed. *)

val circuit_record_failure : circuit_breaker -> unit
(** Record a failed call. In Closed, may transition to Open.
Uses {!Time_compat.now} for timestamps. *)

(** {1 Retry} *)

type 'a retry_result =
| Ok of 'a
| Error of string
| CircuitOpen
| TimedOut

type retry_action =
| Retry (** The error is transient; retry the operation. *)
| Fail of string (** The error is permanent; stop with this message. *)

val calculate_delay : retry_policy -> int -> float
(** [calculate_delay policy attempt] returns delay in milliseconds for the
given attempt number. Applies exponential backoff, cap, and optional jitter. *)

val with_retry_eio :
clock:_ Eio.Time.clock ->
?policy:retry_policy ->
?circuit_breaker:circuit_breaker option ->
?logger:logger ->
op_name:string ->
classify:(string -> retry_action) ->
(unit -> 'a retry_result) ->
'a retry_result
(** Eio-based retry with structured error classification.
@param clock Eio clock for sleep between retries.
@param classify Maps error strings to {!retry_action}.
@param op_name Operation name for log messages. *)

(** {1 Timeout} *)

val with_timeout_eio :
clock:_ Eio.Time.clock ->
timeout_ms:int ->
(unit -> 'a) ->
'a retry_result
(** [with_timeout_eio ~clock ~timeout_ms f] runs [f ()] with a timeout.
Returns [Ok result] on success or [Error "Timeout"] if the deadline passes.
Uses [Eio.Fiber.first] for cooperative cancellation. *)
Loading
Loading