Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,16 @@ We can now ask Linux to suspend the process until a result (Completion Queue Ent
let rec wait_with_retry uring =
match Uring.wait uring with
| None -> wait_with_retry uring (* Interrupted *)
| Some { result; data } -> result, data;;
| Unit { result = (); data } -> Either.Left 0, data
| Int { result; data } -> Either.Left result, data
| FD { result; data } -> Either.Right result, data
| Error { result; data } -> failwith ("Unexpected error: " ^ Unix.error_message result);;
```

<!-- $MDX non-deterministic=output -->
```ocaml
# let result, data = wait_with_retry uring;;
val result : int = 8
val result : (int, Unix.file_descr) Either.t = Right <abstr>
val data : _[> `Open_log ] = `Open_log
```

Expand All @@ -76,9 +79,7 @@ The `result` field is the return code,
with the same meaning as the return code from the corresponding system call (`openat2` in this case).

```ocaml
# let fd =
if result < 0 then failwith ("Error: " ^ string_of_int result);
(Obj.magic result : Unix.file_descr);;
# let fd = Either.fold ~left:(fun _ -> assert false) ~right:Fun.id result;;
val fd : Unix.file_descr = <abstr>
```

Expand All @@ -97,11 +98,12 @@ let rec write_all fd = function
|> Option.get (* We know we have enough space here *)
in
assert (Uring.submit uring = 1);
let result, data = wait_with_retry uring in
assert (data = `Write_all); (* There aren't any other requests pending *)
assert (result > 0); (* Check for error return *)
let bufs = Cstruct.shiftv bufs result in
write_all fd bufs
match wait_with_retry uring with
| Either.Left result, `Write_all ->
let bufs = Cstruct.shiftv bufs result in
write_all fd bufs
| _ ->
assert false (* There aren't any other requests pending *)
```

```ocaml
Expand All @@ -124,8 +126,9 @@ Some <abstr>
- : int = 1

# wait_with_retry uring;;
- : int * ([> `Close_log | `Open_log | `Write_all ] as '_weak3) =
(0, `Close_log)
- : (int, Unix.file_descr) Either.t *
([> `Close_log | `Open_log | `Write_all ] as '_weak3)
= (Either.Left 0, `Close_log)
```

The file has now been written:
Expand Down
25 changes: 11 additions & 14 deletions bench/readv.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ let buffer_size = 100 (* Use a small buffer to stress the system more *)
let n_concurrent = 16 (* How many requests to have active at once *)
let n_iters = 1_000_000 (* How many times to accept and resubmit *)

(* [wait t handle] obtains one completion from [t] and passes its [result]
   and attached buffer to [handle].
   A non-blocking fetch is tried first; only when that yields [None] does the
   function fall back to the blocking [Uring.wait]. If the blocking wait also
   yields [None], the whole procedure is retried from the top. *)
let rec wait t handle =
match Uring.get_cqe_nonblocking t with
| Some { result; data = buf } -> handle result buf
| None ->
(* Nothing was ready without blocking: fall back to the blocking wait. *)
match Uring.wait t with
| None -> wait t handle
| Some { result; data = buf } -> handle result buf
(* [process wait t handle cqe] dispatches on a single completion [cqe]:
   - [Int]: the integer result and the attached data are handed to [handle];
   - [Error]: re-raised as [Unix.Unix_error] tagged with "readv";
   - [FD] / [Unit]: not expected from a readv request, so we fail;
   - [None]: nothing was available, so ask [wait t] for another completion
     and dispatch on that instead. *)
let rec process wait t handle cqe =
  match cqe with
  | Uring.None -> process wait t handle (wait t)
  | Uring.Int { result; data } -> handle result data
  | Uring.Error { result; _ } -> raise (Unix.Unix_error (result, "readv", ""))
  | Uring.FD _ | Uring.Unit _ -> failwith "Unexpected return from readv"

(* [wait t handle] polls [t] once without blocking and lets [process] fall
   back to the blocking [Uring.wait] whenever a [None] comes back. *)
let wait t handle =
  let first = Uring.get_cqe_nonblocking t in
  process Uring.wait t handle first

let run_bechmark ~polling_timeout fd =
let got = ref 0 in
Expand All @@ -29,13 +30,9 @@ let run_bechmark ~polling_timeout fd =
let t0 = Unix.gettimeofday () in
for _ = 1 to n_iters do
wait t (fun result bufs ->
if result < 0 then (
raise (Unix.Unix_error (Uring.error_of_errno result, "readv", ""))
) else (
got := !got + result;
let _job : _ Uring.job = Uring.readv t fd bufs ~file_offset:Optint.Int63.zero bufs |> Option.get in
()
)
got := !got + result;
let _job : _ Uring.job = Uring.readv t fd bufs ~file_offset:Optint.Int63.zero bufs |> Option.get in
()
)
done;
(* Get a snapshot of the stats before letting things finish. *)
Expand Down
119 changes: 66 additions & 53 deletions lib/uring/uring.ml
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,6 @@ module Msghdr = struct
create_with_addr ~n_fds ~fds:[] ?addr buffs
end

type 'a job = 'a Heap.entry

(* Clock selector; [timeout] forwards it unchanged to [Uring.submit_timeout]. *)
type clock = Boottime | Realtime

(* Abstract handle consulted by [Uring.opcode_supported]; its representation
   is not visible from this module. *)
type probe
Expand All @@ -316,38 +314,41 @@ module Uring = struct
external opcode_supported : probe -> Op.t -> bool = "ocaml_uring_opcode_supported" [@@noalloc]

type id = Heap.ptr
type job = private int

type offset = Optint.Int63.t
external submit_nop : t -> id -> bool = "ocaml_uring_submit_nop" [@@noalloc]
external submit_timeout : t -> id -> Sketch.ptr -> clock -> bool -> bool = "ocaml_uring_submit_timeout" [@@noalloc]
external submit_poll_add : t -> Unix.file_descr -> id -> Poll_mask.t -> bool = "ocaml_uring_submit_poll_add" [@@noalloc]
external submit_read : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool = "ocaml_uring_submit_read" [@@noalloc]
external submit_write : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool = "ocaml_uring_submit_write" [@@noalloc]
external submit_readv : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_readv" [@@noalloc]
external submit_writev : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_writev" [@@noalloc]
external submit_readv_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc]
external submit_writev_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc]
external submit_close : t -> Unix.file_descr -> id -> bool = "ocaml_uring_submit_close" [@@noalloc]
external submit_statx : t -> id -> Unix.file_descr -> Statx.t -> Sketch.ptr -> int -> int -> bool = "ocaml_uring_submit_statx_byte" "ocaml_uring_submit_statx_native" [@@noalloc]
external submit_splice : t -> id -> Unix.file_descr -> Unix.file_descr -> int -> bool = "ocaml_uring_submit_splice" [@@noalloc]
external submit_bind : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_bind" [@@noalloc]
external submit_listen : t -> id -> Unix.file_descr -> int -> bool = "ocaml_uring_submit_listen" [@@noalloc]
external submit_connect : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_connect" [@@noalloc]
external submit_accept : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_accept" [@@noalloc]
external submit_cancel : t -> id -> id -> bool = "ocaml_uring_submit_cancel" [@@noalloc]
external submit_openat2 : t -> id -> Unix.file_descr -> Open_how.t -> bool = "ocaml_uring_submit_openat2" [@@noalloc]
external submit_linkat : t -> id -> Unix.file_descr -> Sketch.ptr -> Unix.file_descr -> Sketch.ptr -> int -> bool = "ocaml_uring_submit_linkat_byte" "ocaml_uring_submit_linkat_native" [@@noalloc]
external submit_unlinkat : t -> id -> Unix.file_descr -> Sketch.ptr -> bool -> bool = "ocaml_uring_submit_unlinkat" [@@noalloc]
external submit_mkdirat : t -> id -> Unix.file_descr -> Sketch.ptr -> int -> bool = "ocaml_uring_submit_mkdirat" [@@noalloc]
external submit_send_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> bool = "ocaml_uring_submit_send_msg" [@@noalloc]
external submit_recv_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> bool = "ocaml_uring_submit_recv_msg" [@@noalloc]
external submit_fsync : t -> id -> Unix.file_descr -> int64 -> int -> bool = "ocaml_uring_submit_fsync" [@@noalloc]
external submit_fdatasync : t -> id -> Unix.file_descr -> int64 -> int -> bool = "ocaml_uring_submit_fdatasync" [@@noalloc]
external submit_nop : t -> id -> job = "ocaml_uring_submit_nop" [@@noalloc]
external submit_timeout : t -> id -> Sketch.ptr -> clock -> bool -> job = "ocaml_uring_submit_timeout" [@@noalloc]
external submit_poll_add : t -> Unix.file_descr -> id -> Poll_mask.t -> job = "ocaml_uring_submit_poll_add" [@@noalloc]
external submit_read : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> job = "ocaml_uring_submit_read" [@@noalloc]
external submit_write : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> job = "ocaml_uring_submit_write" [@@noalloc]
external submit_readv : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> job = "ocaml_uring_submit_readv" [@@noalloc]
external submit_writev : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> job = "ocaml_uring_submit_writev" [@@noalloc]
external submit_readv_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> job = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc]
external submit_writev_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> job = "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc]
external submit_close : t -> Unix.file_descr -> id -> job = "ocaml_uring_submit_close" [@@noalloc]
external submit_statx : t -> id -> Unix.file_descr option -> Statx.t -> Sketch.ptr -> int -> int -> job = "ocaml_uring_submit_statx_byte" "ocaml_uring_submit_statx_native" [@@noalloc]
external submit_splice : t -> id -> Unix.file_descr -> Unix.file_descr -> int -> job = "ocaml_uring_submit_splice" [@@noalloc]
external submit_bind : t -> id -> Unix.file_descr -> Sockaddr.t -> job = "ocaml_uring_submit_bind" [@@noalloc]
external submit_listen : t -> id -> Unix.file_descr -> int -> job = "ocaml_uring_submit_listen" [@@noalloc]
external submit_connect : t -> id -> Unix.file_descr -> Sockaddr.t -> job = "ocaml_uring_submit_connect" [@@noalloc]
external submit_accept : t -> id -> Unix.file_descr -> Sockaddr.t -> job = "ocaml_uring_submit_accept" [@@noalloc]
external submit_cancel : t -> id -> job -> job = "ocaml_uring_submit_cancel" [@@noalloc]
external submit_openat2 : t -> id -> Unix.file_descr option -> Open_how.t -> job = "ocaml_uring_submit_openat2" [@@noalloc]
external submit_linkat : t -> id -> Unix.file_descr option -> Sketch.ptr -> Unix.file_descr option -> Sketch.ptr -> int -> job = "ocaml_uring_submit_linkat_byte" "ocaml_uring_submit_linkat_native" [@@noalloc]
external submit_unlinkat : t -> id -> Unix.file_descr option -> Sketch.ptr -> bool -> job = "ocaml_uring_submit_unlinkat" [@@noalloc]
external submit_mkdirat : t -> id -> Unix.file_descr option -> Sketch.ptr -> int -> job = "ocaml_uring_submit_mkdirat" [@@noalloc]
external submit_send_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> job = "ocaml_uring_submit_send_msg" [@@noalloc]
external submit_recv_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> job = "ocaml_uring_submit_recv_msg" [@@noalloc]
external submit_fsync : t -> id -> Unix.file_descr -> int64 -> int -> job = "ocaml_uring_submit_fsync" [@@noalloc]
external submit_fdatasync : t -> id -> Unix.file_descr -> int64 -> int -> job = "ocaml_uring_submit_fdatasync" [@@noalloc]

type cqe_option = private
| Cqe_none
| Cqe_some of { user_data_id : id; res: int }
[@@ocaml.warning "-37" (* Avoids "Unused constructor" warning on OCaml <= 4.09. *)]
| Cqe_unit of { user_data_id : id; result: unit }
| Cqe_int of { user_data_id : id; result: int }
| Cqe_fd of { user_data_id : id; result: Unix.file_descr }
| Cqe_error of { user_data_id : id; result: Unix.error }

external wait_cqe : t -> cqe_option = "ocaml_uring_wait_cqe"
external wait_cqe_timeout : float -> t -> cqe_option = "ocaml_uring_wait_cqe_timeout"
Expand All @@ -357,6 +358,8 @@ module Uring = struct
external register_eventfd : t -> Unix.file_descr -> unit = "ocaml_uring_register_eventfd"
end

(* A submitted request: the heap entry that owns the caller's user data,
   paired with the [Uring.job] value returned by the submit call.
   [with_id_full] builds these pairs and [cancel] takes one apart. *)
type 'a job = 'a Heap.entry * Uring.job

type 'a t = {
id : < >;
uring: Uring.t;
Expand Down Expand Up @@ -439,19 +442,19 @@ let exit t =
Uring.exit t.uring;
unregister_gc_root t

let with_id_full : type a. a t -> (Heap.ptr -> bool) -> a -> extra_data:'b -> a job option =
let with_id_full : type a. a t -> (Heap.ptr -> Uring.job) -> a -> extra_data:'b -> a job option =
fun t fn datum ~extra_data ->
match Heap.alloc t.data datum ~extra_data with
| exception (Invalid_argument _ as ex) -> check t; raise ex
| entry ->
let ptr = Heap.ptr entry in
let has_space = fn ptr in
if has_space then
Some entry
else (
ignore (Heap.free t.data ptr : a);
None
)
| exception (Invalid_argument _ as ex) -> check t; raise ex
| entry ->
let ptr = Heap.ptr entry in
let job = fn ptr in
if (job :> int) >= 0 then
Some (entry, job)
else (
ignore (Heap.free t.data ptr : a);
None
)

(* [with_id t fn user_data] is [with_id_full] for requests that carry no
   extra data alongside the user data: [()] is stored as the extra datum. *)
let with_id t fn user_data =
  with_id_full ~extra_data:() t fn user_data

Expand All @@ -465,9 +468,7 @@ let timeout ?(absolute = false) t clock timeout_ns user_data =
set_timespec timespec_ptr timeout_ns;
with_id t (fun id -> Uring.submit_timeout t.uring id timespec_ptr clock absolute) user_data

let at_fdcwd : Unix.file_descr = Obj.magic Config.at_fdcwd

let openat2 t ~access ~flags ~perm ~resolve ?(fd=at_fdcwd) path user_data =
let openat2 t ~access ~flags ~perm ~resolve ?fd path user_data =
let open_flags = flags lor match access with
| `R -> Open_flags.rdonly
| `W -> Open_flags.wronly
Expand All @@ -482,20 +483,20 @@ module Linkat_flags = struct
let symlink_follow = Config.At.symlink_follow
end

let linkat t ?(old_dir_fd=at_fdcwd) ?(new_dir_fd=at_fdcwd) ~flags ~old_path ~new_path user_data =
let linkat t ?old_dir_fd ?new_dir_fd ~flags ~old_path ~new_path user_data =
with_id t (fun id ->
let old_path_buf = Sketch.String.alloc t.sketch old_path in
let new_path_buf = Sketch.String.alloc t.sketch new_path in
Uring.submit_linkat t.uring id old_dir_fd old_path_buf new_dir_fd new_path_buf flags
) user_data

let unlink t ~dir ?(fd=at_fdcwd) path user_data =
let unlink t ~dir ?fd path user_data =
with_id t (fun id ->
let buf = Sketch.String.alloc t.sketch path in
Uring.submit_unlinkat t.uring id fd buf dir
) user_data

let mkdirat t ~mode ?(fd=at_fdcwd) path user_data =
let mkdirat t ~mode ?fd path user_data =
with_id t (fun id ->
let buf = Sketch.String.alloc t.sketch path in
Uring.submit_mkdirat t.uring id fd buf mode
Expand Down Expand Up @@ -541,7 +542,7 @@ let poll_add t fd poll_mask user_data =
(* [close t fd user_data] queues a close request for [fd] on [t], attaching
   [user_data] to it. The result is whatever [with_id] reports for the
   submission. *)
let close t fd user_data =
  let submit id = Uring.submit_close t.uring fd id in
  with_id t submit user_data

let statx t ?(fd=at_fdcwd) ~mask path statx flags user_data =
let statx t ?fd ~mask path statx flags user_data =
let spath = Sketch.String.alloc t.sketch path in
with_id_full t (fun id -> Uring.submit_statx t.uring id fd statx spath flags mask) user_data ~extra_data:statx

Expand Down Expand Up @@ -584,9 +585,9 @@ let fsync t ?(off=0L) ?(len=0) fd user_data =
(* [fdatasync t ?off ?len fd user_data] queues an fdatasync request for [fd],
   attaching [user_data] to it. [off] defaults to [0L] and [len] to [0]; both
   are passed straight through to [Uring.submit_fdatasync]. *)
let fdatasync t ?(off=0L) ?(len=0) fd user_data =
  let submit id = Uring.submit_fdatasync t.uring id fd off len in
  with_id t submit user_data

(* [cancel t job user_data] queues a request to cancel [job], attaching
   [user_data] to the cancellation request itself. *)
let cancel t job user_data =
(* The result is discarded: [Heap.ptr] is called up front only as a validity
   check on [job], before any submission is attempted. *)
ignore (Heap.ptr job : Uring.id); (* Check it's still valid *)
with_id t (fun id -> Uring.submit_cancel t.uring id (Heap.ptr job)) user_data
(* [cancel t (entry, job) user_data] queues a request to cancel the given
   job, attaching [user_data] to the cancellation request itself. *)
let cancel t (entry, job) user_data =
  (* Check it's still valid: [Heap.ptr] is called only for this check and
     its result is discarded. *)
  let (_ : Uring.id) = Heap.ptr entry in
  let submit id = Uring.submit_cancel t.uring id job in
  with_id t submit user_data

(* [sqe_ready t] is [Uring.sq_ready] applied to the ring underlying [t]. *)
let sqe_ready t =
  let ring = t.uring in
  Uring.sq_ready ring

Expand All @@ -613,14 +614,26 @@ let submit t =

type 'a completion_option =
| None
| Some of { result: int; data: 'a }
| Unit of { result: unit; data: 'a }
| Int of { result: int; data: 'a }
| FD of { result: Unix.file_descr; data: 'a }
| Error of { result: Unix.error; data: 'a }

let fn_on_ring fn t =
match fn t.uring with
| Uring.Cqe_none -> None
| Uring.Cqe_some { user_data_id; res } ->
| Uring.Cqe_unit { user_data_id; result } ->
let data = Heap.free t.data user_data_id in
Unit { result; data }
| Uring.Cqe_int { user_data_id; result } ->
let data = Heap.free t.data user_data_id in
Int { result; data }
| Uring.Cqe_fd { user_data_id; result } ->
let data = Heap.free t.data user_data_id in
FD { result; data }
| Uring.Cqe_error { user_data_id; result } ->
let data = Heap.free t.data user_data_id in
Some { result = res; data }
Error { result; data }

let get_cqe_nonblocking t =
check t;
Expand Down
9 changes: 6 additions & 3 deletions lib/uring/uring.mli
Original file line number Diff line number Diff line change
Expand Up @@ -842,11 +842,14 @@ val submit : 'a t -> int

type 'a completion_option =
| None
| Some of { result: int; data: 'a } (**)
| Unit of { result: unit; data: 'a }
| Int of { result: int; data: 'a }
| FD of { result: Unix.file_descr; data: 'a }
| Error of { result: Unix.error; data: 'a } (**)
(** The type of results of calling {!wait} and {!peek}. [None] denotes that
either there were no completions in the queue or an interrupt / timeout
occurred. [Some] contains both the user data attached to the completed
request and the integer syscall result. *)
occurred. The other constructors contain both the user data attached to the
completed request and syscall result. *)

val wait : ?timeout:float -> 'a t -> 'a completion_option
(** [wait ?timeout t] will block indefinitely (the default) or for [timeout]
Expand Down
Loading