Skip to content

Commit 0f6b65d

Browse files
authored
Merge pull request #752 from talex5/linux-get-sched
eio_linux: refactor fixed buffer code
2 parents d47b5e2 + cc2cd3d commit 0f6b65d

File tree

4 files changed

+36
-41
lines changed

4 files changed

+36
-41
lines changed

lib_eio/core/eio__core.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ module Private = struct
77
module Suspend = Suspend
88
module Cells = Cells
99
module Broadcast = Broadcast
10+
module Single_waiter = Single_waiter
1011
module Trace = Trace
1112
module Fiber_context = Cancel.Fiber_context
1213
module Debug = Debug

lib_eio/core/eio__core.mli

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,7 @@ module Private : sig
606606

607607
module Cells = Cells
608608
module Broadcast = Broadcast
609+
module Single_waiter = Single_waiter
609610

610611
(** Every fiber has an associated context. *)
611612
module Fiber_context : sig

lib_eio_linux/low_level.ml

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,34 @@ let write ?file_offset:off fd buf len =
207207
raise @@ Err.wrap (Uring.error_of_errno res) "write" ""
208208
)
209209

210-
let alloc_fixed () = Effect.perform Sched.Alloc
211-
212-
let alloc_fixed_or_wait () = Effect.perform Sched.Alloc_or_wait
213-
214-
let free_fixed buf = Effect.perform (Sched.Free buf)
210+
let alloc_fixed () =
211+
let s = Sched.get () in
212+
match s.mem with
213+
| None -> None
214+
| Some mem ->
215+
match Uring.Region.alloc mem with
216+
| buf -> Some buf
217+
| exception Uring.Region.No_space -> None
218+
219+
let alloc_fixed_or_wait () =
220+
let s = Sched.get () in
221+
match s.mem with
222+
| None -> failwith "No fixed buffer available"
223+
| Some mem ->
224+
match Uring.Region.alloc mem with
225+
| buf -> buf
226+
| exception Uring.Region.No_space ->
227+
let id = Eio.Private.Trace.mint_id () in
228+
let trigger = Eio.Private.Single_waiter.create () in
229+
Queue.push trigger s.mem_q;
230+
(* todo: remove protect; but needs to remove from queue on cancel *)
231+
Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id
232+
233+
let free_fixed buf =
234+
let s = Sched.get () in
235+
match Queue.take_opt s.mem_q with
236+
| None -> Uring.Region.free buf
237+
| Some k -> Eio.Private.Single_waiter.wake k (Ok buf)
215238

216239
let splice src ~dst ~len =
217240
Fd.use_exn "splice-src" src @@ fun src ->

lib_eio_linux/sched.ml

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type t = {
5050
uring: io_job Uring.t;
5151
mem: Uring.Region.t option;
5252
io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *)
53-
mem_q : Uring.Region.chunk Suspended.t Queue.t;
53+
mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t;
5454

5555
(* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *)
5656
run_q : runnable Lf_queue.t;
@@ -74,9 +74,9 @@ type t = {
7474
type _ Effect.t +=
7575
| Enter : (t -> 'a Suspended.t -> unit) -> 'a Effect.t
7676
| Cancel : io_job Uring.job -> unit Effect.t
77-
| Alloc : Uring.Region.chunk option Effect.t
78-
| Alloc_or_wait : Uring.Region.chunk Effect.t
79-
| Free : Uring.Region.chunk -> unit Effect.t
77+
| Get : t Effect.t
78+
79+
let get () = Effect.perform Get
8080

8181
let wake_buffer =
8282
let b = Bytes.create 8 in
@@ -339,21 +339,6 @@ and complete_rw_req st ({len; cur_off; action; _} as req) res =
339339
| _, Exactly len -> Suspended.continue action len
340340
| n, Upto _ -> Suspended.continue action n
341341

342-
let alloc_buf_or_wait st k =
343-
match st.mem with
344-
| None -> Suspended.discontinue k (Failure "No fixed buffer available")
345-
| Some mem ->
346-
match Uring.Region.alloc mem with
347-
| buf -> Suspended.continue k buf
348-
| exception Uring.Region.No_space ->
349-
Queue.push k st.mem_q;
350-
schedule st
351-
352-
let free_buf st buf =
353-
match Queue.take_opt st.mem_q with
354-
| None -> Uring.Region.free buf
355-
| Some k -> enqueue_thread st k buf
356-
357342
let rec enqueue_poll_add fd poll_mask st action =
358343
Trace.log "poll_add";
359344
let retry = with_cancel_hook ~action st (fun () ->
@@ -411,8 +396,9 @@ let run ~extra_effects st main arg =
411396
Fiber_context.destroy fiber;
412397
Printexc.raise_with_backtrace ex (Printexc.get_raw_backtrace ())
413398
);
414-
effc = fun (type a) (e : a Effect.t) ->
399+
effc = fun (type a) (e : a Effect.t) : ((a, _) continuation -> _) option ->
415400
match e with
401+
| Get -> Some (fun k -> continue k st)
416402
| Enter fn -> Some (fun k ->
417403
match Fiber_context.get_error fiber with
418404
| Some e -> discontinue k e
@@ -467,22 +453,6 @@ let run ~extra_effects st main arg =
467453
Eio_unix.Private.Thread_pool.submit st.thread_pool ~ctx:fiber ~enqueue fn;
468454
schedule st
469455
)
470-
| Alloc -> Some (fun k ->
471-
match st.mem with
472-
| None -> continue k None
473-
| Some mem ->
474-
match Uring.Region.alloc mem with
475-
| buf -> continue k (Some buf)
476-
| exception Uring.Region.No_space -> continue k None
477-
)
478-
| Alloc_or_wait -> Some (fun k ->
479-
let k = { Suspended.k; fiber } in
480-
alloc_buf_or_wait st k
481-
)
482-
| Free buf -> Some (fun k ->
483-
free_buf st buf;
484-
continue k ()
485-
)
486456
| e -> extra_effects.effc e
487457
}
488458
in

0 commit comments

Comments
 (0)