Skip to content

Commit 75c27bf

Browse files
authored
Merge pull request #567 from talex5/single-waiter
Move Waiters out of Eio_core
2 parents dcf8624 + d063513 commit 75c27bf

File tree

8 files changed

+53
-75
lines changed

8 files changed

+53
-75
lines changed

lib_eio/core/eio__core.ml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ module Private = struct
77
module Suspend = Suspend
88
module Cells = Cells
99
module Broadcast = Broadcast
10-
module Waiters = Waiters
1110
module Ctf = Ctf
1211
module Fiber_context = Cancel.Fiber_context
1312
module Debug = Debug

lib_eio/core/eio__core.mli

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -706,51 +706,6 @@ module Private : sig
706706
because you need to unlock a mutex if cancelled). *)
707707
end
708708

709-
(** A queue of fibers waiting for an event. *)
710-
module Waiters : sig
711-
type 'a t
712-
(* A queue of fibers waiting for something.
713-
Note: an [_ t] is not thread-safe itself.
714-
To use share it between domains, the user is responsible for wrapping it in a mutex. *)
715-
716-
val create : unit -> 'a t
717-
718-
val wake_all : 'a t -> 'a -> unit
719-
(** [wake_all t] calls (and removes) all the functions waiting on [t].
720-
If [t] is shared between domains, the caller must hold the mutex while calling this. *)
721-
722-
val wake_one : 'a t -> 'a -> [`Ok | `Queue_empty]
723-
(** [wake_one t] is like {!wake_all}, but only calls (and removes) the first waiter in the queue.
724-
If [t] is shared between domains, the caller must hold the mutex while calling this. *)
725-
726-
val is_empty : 'a t -> bool
727-
(** [is_empty t] checks whether there are any functions waiting on [t].
728-
If [t] is shared between domains, the caller must hold the mutex while calling this,
729-
and the result is valid until the mutex is released. *)
730-
731-
val await :
732-
mutex:Mutex.t option ->
733-
'a t -> Ctf.id -> 'a
734-
(** [await ~mutex t id] suspends the current fiber and adds its continuation to [t].
735-
When the waiter is woken, the fiber is resumed and returns the result.
736-
If [t] can be used from multiple domains:
737-
- [mutex] must be set to the mutex to use to unlock it.
738-
- [mutex] must be already held when calling this function, which will unlock it before blocking.
739-
When [await] returns, [mutex] will have been unlocked.
740-
@raise Cancel.Cancelled if the fiber's context is cancelled *)
741-
742-
val await_internal :
743-
mutex:Mutex.t option ->
744-
'a t -> Ctf.id -> Fiber_context.t ->
745-
(('a, exn) result -> unit) -> unit
746-
(** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber.
747-
This also allows wrapping the [enqueue] function.
748-
Calls [enqueue (Error (Cancelled _))] if cancelled.
749-
Note: [enqueue] is called from the triggering domain,
750-
which is currently calling {!wake_one} or {!wake_all}
751-
and must therefore be holding [mutex]. *)
752-
end
753-
754709
module Debug : sig
755710
val traceln :
756711
?__POS__:string * int * int * int ->

lib_eio/core/single_waiter.ml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
(* A simplified version of [Waiters] that can only handle one waiter and is not thread-safe. *)
1+
(* Allows a single fiber to wait to be notified by another fiber in the same domain.
2+
If multiple fibers need to wait at once, or the notification comes from another domain,
3+
this can't be used. *)
24

35
type 'a t = {
46
mutable wake : ('a, exn) result -> unit;

lib_eio/core/switch.ml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ type t = {
44
mutable daemon_fibers : int;
55
mutable exs : (exn * Printexc.raw_backtrace) option;
66
on_release : (unit -> unit) Lwt_dllist.t;
7-
waiter : unit Waiters.t; (* The main [top]/[sub] function may wait here for fibers to finish. *)
7+
waiter : unit Single_waiter.t; (* The main [top]/[sub] function may wait here for fibers to finish. *)
88
cancel : Cancel.t;
99
}
1010

@@ -68,7 +68,7 @@ let dec_fibers t =
6868
if t.daemon_fibers > 0 && t.fibers = t.daemon_fibers then
6969
Cancel.cancel t.cancel Exit;
7070
if t.fibers = 0 then
71-
Waiters.wake_all t.waiter ()
71+
Single_waiter.wake t.waiter (Ok ())
7272

7373
let with_op t fn =
7474
inc_fibers t;
@@ -92,7 +92,7 @@ let rec await_idle t =
9292
(* Wait for fibers to finish: *)
9393
while t.fibers > 0 do
9494
Ctf.note_try_read t.id;
95-
Waiters.await ~mutex:None t.waiter t.id
95+
Single_waiter.await t.waiter t.id
9696
done;
9797
(* Call on_release handlers: *)
9898
let queue = Lwt_dllist.create () in
@@ -125,7 +125,7 @@ let create cancel =
125125
fibers = 1; (* The main function counts as a fiber *)
126126
daemon_fibers = 0;
127127
exs = None;
128-
waiter = Waiters.create ();
128+
waiter = Single_waiter.create ();
129129
on_release = Lwt_dllist.create ();
130130
cancel;
131131
}

lib_eio/core/waiters.mli

Lines changed: 0 additions & 20 deletions
This file was deleted.
File renamed without changes.

lib_eio/core/waiters.ml renamed to lib_eio/waiters.ml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,16 @@ let rec wake_one t v =
3838

3939
let is_empty = Lwt_dllist.is_empty
4040

41-
let await_internal ~mutex (t:'a t) id (ctx:Cancel.fiber_context) enqueue =
42-
match Cancel.Fiber_context.get_error ctx with
41+
let await_internal ~mutex (t:'a t) id ctx enqueue =
42+
match Fiber_context.get_error ctx with
4343
| Some ex ->
4444
Option.iter Mutex.unlock mutex;
4545
enqueue (Error ex)
4646
| None ->
4747
let resolved_waiter = ref Hook.null in
4848
let finished = Atomic.make false in
4949
let enqueue x =
50-
Ctf.note_read ~reader:id ctx.tid;
50+
Ctf.note_read ~reader:id (Fiber_context.tid ctx);
5151
enqueue x
5252
in
5353
let cancel ex =
@@ -56,7 +56,7 @@ let await_internal ~mutex (t:'a t) id (ctx:Cancel.fiber_context) enqueue =
5656
enqueue (Error ex)
5757
)
5858
in
59-
Cancel.Fiber_context.set_cancel_fn ctx cancel;
59+
Fiber_context.set_cancel_fn ctx cancel;
6060
let waiter = { enqueue; finished } in
6161
match mutex with
6262
| None ->

lib_eio/waiters.mli

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
(** A queue of fibers waiting for an event. *)
2+
type 'a t
3+
(* A queue of fibers waiting for something.
4+
Note: an [_ t] is not thread-safe itself.
5+
To use share it between domains, the user is responsible for wrapping it in a mutex. *)
6+
7+
val create : unit -> 'a t
8+
9+
val wake_all : 'a t -> 'a -> unit
10+
(** [wake_all t] calls (and removes) all the functions waiting on [t].
11+
If [t] is shared between domains, the caller must hold the mutex while calling this. *)
12+
13+
val wake_one : 'a t -> 'a -> [`Ok | `Queue_empty]
14+
(** [wake_one t] is like {!wake_all}, but only calls (and removes) the first waiter in the queue.
15+
If [t] is shared between domains, the caller must hold the mutex while calling this. *)
16+
17+
val is_empty : 'a t -> bool
18+
(** [is_empty t] checks whether there are any functions waiting on [t].
19+
If [t] is shared between domains, the caller must hold the mutex while calling this,
20+
and the result is valid until the mutex is released. *)
21+
22+
val await :
23+
mutex:Mutex.t option ->
24+
'a t -> Ctf.id -> 'a
25+
(** [await ~mutex t id] suspends the current fiber and adds its continuation to [t].
26+
When the waiter is woken, the fiber is resumed and returns the result.
27+
If [t] can be used from multiple domains:
28+
- [mutex] must be set to the mutex to use to unlock it.
29+
- [mutex] must be already held when calling this function, which will unlock it before blocking.
30+
When [await] returns, [mutex] will have been unlocked.
31+
@raise Cancel.Cancelled if the fiber's context is cancelled *)
32+
33+
val await_internal :
34+
mutex:Mutex.t option ->
35+
'a t -> Ctf.id -> Fiber_context.t ->
36+
(('a, exn) result -> unit) -> unit
37+
(** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber.
38+
This also allows wrapping the [enqueue] function.
39+
Calls [enqueue (Error (Cancelled _))] if cancelled.
40+
Note: [enqueue] is called from the triggering domain,
41+
which is currently calling {!wake_one} or {!wake_all}
42+
and must therefore be holding [mutex]. *)

0 commit comments

Comments
 (0)