Skip to content

Commit e2dc1d7

Browse files
authored
Merge pull request #753 from talex5/cancel-alloc-fixed
eio_linux: allow alloc_fixed_or_wait to be cancelled
2 parents 0f6b65d + 0c41432 commit e2dc1d7

File tree

7 files changed

+104
-34
lines changed

7 files changed

+104
-34
lines changed

lib_eio/core/fiber.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ module List = struct
226226

227227
let release t =
228228
t.free_fibers <- t.free_fibers + 1;
229-
if t.free_fibers = 1 then Single_waiter.wake t.cond (Ok ())
229+
if t.free_fibers = 1 then Single_waiter.wake_if_sleeping t.cond
230230

231231
let use t fn x =
232232
await_free t;

lib_eio/core/single_waiter.ml

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,32 @@
1-
(* Allows a single fiber to wait to be notified by another fiber in the same domain.
2-
If multiple fibers need to wait at once, or the notification comes from another domain,
3-
this can't be used. *)
1+
type 'a state =
2+
| Running
3+
| Sleeping of (('a, exn) result -> unit)
44

5-
type 'a t = {
6-
mutable wake : ('a, exn) result -> unit;
7-
}
5+
type 'a t = 'a state ref
86

9-
let create () = { wake = ignore }
7+
let create () = ref Running
108

11-
let wake t v = t.wake v
9+
let wake t v =
10+
match !t with
11+
| Running -> false
12+
| Sleeping fn ->
13+
t := Running;
14+
fn v;
15+
true
16+
17+
let wake_if_sleeping t =
18+
ignore (wake t (Ok ()) : bool)
1219

1320
let await t op id =
1421
let x =
1522
Suspend.enter op @@ fun ctx enqueue ->
1623
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
17-
t.wake <- ignore;
24+
t := Running;
1825
enqueue (Error ex)
1926
);
20-
t.wake <- (fun x ->
27+
t := Sleeping (fun x ->
2128
Cancel.Fiber_context.clear_cancel_fn ctx;
22-
t.wake <- ignore;
29+
t := Running;
2330
enqueue x
2431
)
2532
in
@@ -29,7 +36,7 @@ let await t op id =
2936
let await_protect t op id =
3037
let x =
3138
Suspend.enter_unchecked op @@ fun _ctx enqueue ->
32-
t.wake <- (fun x -> t.wake <- ignore; enqueue x)
39+
t := Sleeping (fun x -> t := Running; enqueue x)
3340
in
3441
Trace.get id;
3542
x

lib_eio/core/single_waiter.mli

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
(** Allows a single fiber to wait to be notified by another fiber in the same domain.
2+
If multiple fibers need to wait at once, or the notification comes from another domain,
3+
this can't be used. *)
4+
5+
type 'a t
6+
(** A handle representing a fiber that might be sleeping.
7+
It is either in the Running or Sleeping state. *)
8+
9+
val create : unit -> 'a t
10+
(** [create ()] is a new waiter, initially in the Running state. *)
11+
12+
val wake : 'a t -> ('a, exn) result -> bool
13+
(** [wake t v] resumes [t]'s fiber with value [v] and returns [true] if it was sleeping.
14+
If [t] is Running then this just returns [false]. *)
15+
16+
val wake_if_sleeping : unit t -> unit
17+
(** [wake_if_sleeping] is [ignore (wake t (Ok ()))]. *)
18+
19+
val await : 'a t -> string -> Trace.id -> 'a
20+
(** [await t op id] suspends the calling fiber, changing [t]'s state to Sleeping.
21+
If the fiber is cancelled, a cancel exception is raised.
22+
[op] and [id] are used for tracing. *)
23+
24+
val await_protect : 'a t -> string -> Trace.id -> 'a
25+
(** [await_protect] is like {!await}, but the sleep cannot be cancelled. *)

lib_eio/core/switch.ml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ let dec_fibers t =
7272
if t.daemon_fibers > 0 && t.fibers = t.daemon_fibers then
7373
Cancel.cancel t.cancel Exit;
7474
if t.fibers = 0 then
75-
Single_waiter.wake t.waiter (Ok ())
75+
Single_waiter.wake_if_sleeping t.waiter
7676

7777
let with_op t fn =
7878
inc_fibers t;

lib_eio_linux/low_level.ml

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -226,15 +226,20 @@ let alloc_fixed_or_wait () =
226226
| exception Uring.Region.No_space ->
227227
let id = Eio.Private.Trace.mint_id () in
228228
let trigger = Eio.Private.Single_waiter.create () in
229-
Queue.push trigger s.mem_q;
230-
(* todo: remove protect; but needs to remove from queue on cancel *)
231-
Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id
232-
233-
let free_fixed buf =
229+
let node = Lwt_dllist.add_r trigger s.mem_q in
230+
try
231+
Eio.Private.Single_waiter.await trigger "alloc_fixed_or_wait" id
232+
with ex ->
233+
Lwt_dllist.remove node;
234+
raise ex
235+
236+
let rec free_fixed buf =
234237
let s = Sched.get () in
235-
match Queue.take_opt s.mem_q with
238+
match Lwt_dllist.take_opt_l s.mem_q with
236239
| None -> Uring.Region.free buf
237-
| Some k -> Eio.Private.Single_waiter.wake k (Ok buf)
240+
| Some k ->
241+
if not (Eio.Private.Single_waiter.wake k (Ok buf)) then
242+
free_fixed buf (* [k] was already cancelled, but not yet removed from the queue *)
238243

239244
let splice src ~dst ~len =
240245
Fd.use_exn "splice-src" src @@ fun src ->

lib_eio_linux/sched.ml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type t = {
5050
uring: io_job Uring.t;
5151
mem: Uring.Region.t option;
5252
io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *)
53-
mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t;
53+
mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Lwt_dllist.t;
5454

5555
(* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *)
5656
run_q : runnable Lf_queue.t;
@@ -247,7 +247,7 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
247247
) else if timeout = None && Uring.active_ops uring = 0 then (
248248
(* Nothing further can happen at this point.
249249
If there are no events in progress but also still no memory available, something has gone wrong! *)
250-
assert (Queue.length mem_q = 0);
250+
assert (Lwt_dllist.length mem_q = 0);
251251
Lf_queue.close st.run_q; (* Just to catch bugs if something tries to enqueue later *)
252252
`Exit_scheduler
253253
) else (
@@ -536,7 +536,7 @@ let with_sched ?(fallback=no_fallback) config fn =
536536
Lf_queue.push run_q IO;
537537
let sleep_q = Zzz.create () in
538538
let io_q = Queue.create () in
539-
let mem_q = Queue.create () in
539+
let mem_q = Lwt_dllist.create () in
540540
with_eventfd @@ fun eventfd ->
541541
let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in
542542
fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool }

lib_eio_linux/tests/test.ml

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -211,19 +211,52 @@ let test_signal_race () =
211211
(fun () -> Eio.Condition.await_no_mutex cond)
212212
(fun () -> ignore (Unix.setitimer ITIMER_REAL { it_interval = 0.; it_value = 0.001 } : Unix.interval_timer_status))
213213

214+
let test_alloc_fixed_or_wait () =
215+
Eio_linux.run ~n_blocks:1 @@ fun _env ->
216+
let block = Eio_linux.Low_level.alloc_fixed_or_wait () in
217+
(* We have to wait for the block, but get cancelled while waiting. *)
218+
begin
219+
try
220+
Fiber.both
221+
(fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait () : Uring.Region.chunk))
222+
(fun () -> raise Exit);
223+
with Exit -> ()
224+
end;
225+
(* We have to wait for the block, and get it when the old one is freed. *)
226+
Fiber.both
227+
(fun () ->
228+
let x = Eio_linux.Low_level.alloc_fixed_or_wait () in
229+
Eio_linux.Low_level.free_fixed x
230+
)
231+
(fun () ->
232+
Eio_linux.Low_level.free_fixed block
233+
);
234+
(* The old block is passed to the waiting fiber, but it's cancelled. *)
235+
let block = Eio_linux.Low_level.alloc_fixed_or_wait () in
236+
Fiber.both
237+
(fun () ->
238+
Fiber.first
239+
(fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait ()); assert false)
240+
(fun () -> ())
241+
)
242+
(fun () -> Eio_linux.Low_level.free_fixed block);
243+
let block = Eio_linux.Low_level.alloc_fixed_or_wait () in
244+
Eio_linux.Low_level.free_fixed block
245+
214246
let () =
215247
let open Alcotest in
216248
run "eio_linux" [
217249
"io", [
218-
test_case "copy" `Quick test_copy;
219-
test_case "direct_copy" `Quick test_direct_copy;
220-
test_case "poll_add" `Quick test_poll_add;
221-
test_case "poll_add_busy" `Quick test_poll_add_busy;
222-
test_case "iovec" `Quick test_iovec;
223-
test_case "no_sqe" `Quick test_no_sqe;
224-
test_case "read_exact" `Quick test_read_exact;
225-
test_case "expose_backend" `Quick test_expose_backend;
226-
test_case "statx" `Quick test_statx;
227-
test_case "signal_race" `Quick test_signal_race;
250+
test_case "copy" `Quick test_copy;
251+
test_case "direct_copy" `Quick test_direct_copy;
252+
test_case "poll_add" `Quick test_poll_add;
253+
test_case "poll_add_busy" `Quick test_poll_add_busy;
254+
test_case "iovec" `Quick test_iovec;
255+
test_case "no_sqe" `Quick test_no_sqe;
256+
test_case "read_exact" `Quick test_read_exact;
257+
test_case "expose_backend" `Quick test_expose_backend;
258+
test_case "statx" `Quick test_statx;
259+
test_case "signal_race" `Quick test_signal_race;
260+
test_case "alloc-fixed-or-wait" `Quick test_alloc_fixed_or_wait;
228261
];
229262
]

0 commit comments

Comments
 (0)