From 49bfbe01e4ca77eb5e14ed021c73069249789123 Mon Sep 17 00:00:00 2001 From: David Allsopp Date: Wed, 1 Oct 2025 06:58:36 +0100 Subject: [PATCH 1/2] Not entirely satisfactory approach --- README.md | 24 ++++----- bench/readv.ml | 6 ++- lib/uring/heap.ml | 10 ++-- lib/uring/heap.mli | 6 ++- lib/uring/uring.ml | 112 ++++++++++++++++++++++------------------ lib/uring/uring.mli | 9 +++- lib/uring/uring_stubs.c | 44 ++++++++++++++-- tests/heap.md | 18 +++---- tests/main.md | 59 +++++++++++---------- tests/poll_add.ml | 3 +- tests/sketch.md | 3 +- tests/socket_ops.ml | 17 ++++-- tests/urcat.ml | 3 +- tests/urcp_fixed_lib.ml | 12 +++-- tests/urcp_lib.ml | 12 +++-- tests/urstat.ml | 4 +- 16 files changed, 209 insertions(+), 133 deletions(-) diff --git a/README.md b/README.md index 2888173c..4f7250d4 100644 --- a/README.md +++ b/README.md @@ -59,13 +59,16 @@ We can now ask Linux to suspend the process until a result (Completion Queue Ent let rec wait_with_retry uring = match Uring.wait uring with | None -> wait_with_retry uring (* Interrupted *) - | Some { result; data } -> result, data;; + | Some { result; kind = Uring.Int; data } -> None, (result : int), data + | Some { result; kind = Uring.FD; data } -> Some (result : Unix.file_descr), 0, data + | Some { result; kind = Uring.Error; data } -> failwith (Unix.error_message result) ``` ```ocaml -# let result, data = wait_with_retry uring;; -val result : int = 8 +# let fd, result, data = wait_with_retry uring;; +val fd : Unix.file_descr option = Some +val result : int = 0 val data : _[> `Open_log ] = `Open_log ``` @@ -75,16 +78,10 @@ The `data` field is the data we passed in when submitting the request, allowing The `result` field is the return code, with the same meaning as the return code from the corresponding system call (`openat2` in this case). -```ocaml -# let fd = - if result < 0 then failwith ("Error: " ^ string_of_int result); - (Obj.magic result : Unix.file_descr);; -val fd : Unix.file_descr = -``` - We can now submit further requests. e.g. ```ocaml +let fd = Option.get fd let rec write_all fd = function | [] -> () | bufs -> @@ -97,7 +94,7 @@ let rec write_all fd = function |> Option.get (* We know we have enough space here *) in assert (Uring.submit uring = 1); - let result, data = wait_with_retry uring in + let _, result, data = wait_with_retry uring in assert (data = `Write_all); (* There aren't any other requests pending *) assert (result > 0); (* Check for error return *) let bufs = Cstruct.shiftv bufs result in @@ -124,8 +121,9 @@ Some - : int = 1 # wait_with_retry uring;; -- : int * ([> `Close_log | `Open_log | `Write_all ] as '_weak3) = -(0, `Close_log) +- : Unix.file_descr option * int * + ([> `Close_log | `Open_log | `Write_all ] as '_weak3) += (None, 0, `Close_log) ``` The file has now been written: diff --git a/bench/readv.ml b/bench/readv.ml index 8f977c7d..0529b4df 100644 --- a/bench/readv.ml +++ b/bench/readv.ml @@ -7,11 +7,13 @@ let n_iters = 1_000_000 (* How many times to accept and resubmit *) let rec wait t handle = match Uring.get_cqe_nonblocking t with - | Some { result; data = buf } -> handle result buf + | Some { result; kind = Uring.Int; data = buf } -> handle (result : int) buf + | Some _ -> assert false | None -> match Uring.wait t with | None -> wait t handle - | Some { result; data = buf } -> handle result buf + | Some { result; kind = Uring.Int; data = buf } -> handle (result : int) buf + | Some _ -> assert false let run_bechmark ~polling_timeout fd = let got = ref 0 in diff --git a/lib/uring/heap.ml b/lib/uring/heap.ml index 1f4fd6fc..c4296f6b 100644 --- a/lib/uring/heap.ml +++ b/lib/uring/heap.ml @@ -21,10 +21,12 @@ type ptr = int let slot_taken = -1 let free_list_nil = -2 +type result_kind = Kind_FD | Kind_Int + (* [extra_data] is for keeping pointers passed to C alive. *) type 'a entry = | Empty : 'a entry - | Entry : { data : 'a; extra_data : 'b; mutable ptr : int } -> 'a entry + | Entry : { data : 'a; kind: result_kind; extra_data : 'b; mutable ptr : int } -> 'a entry (* Free-list allocator *) type 'a t = @@ -113,10 +115,10 @@ let grow t = t.free_head <- new_free_head; t.data <- new_data -let alloc t data ~extra_data = +let alloc t kind data ~extra_data = if t.free_head = free_list_nil then grow t; let ptr = t.free_head in - let entry = Entry { data; extra_data; ptr } in + let entry = Entry { data; kind; extra_data; ptr } in t.data.(ptr) <- entry; (* Drop [ptr] from the free list. *) let tail = t.free_tail_relation.(ptr) in @@ -137,7 +139,7 @@ let free t ptr = | Empty -> assert false | Entry p -> p.ptr <- -1; - p.data + p.data, p.kind in (* Cons [ptr] to the free-list. *) diff --git a/lib/uring/heap.mli b/lib/uring/heap.mli index 93f77da8..c0b9f0c3 100644 --- a/lib/uring/heap.mli +++ b/lib/uring/heap.mli @@ -17,6 +17,8 @@ type 'a t (** A bounded heap of values of type ['a]. *) +type result_kind = Kind_FD | Kind_Int + val create : int -> _ t (** [create n] is a heap that holds at most [n] elements. *) @@ -30,13 +32,13 @@ val ptr : 'a entry -> ptr (** [ptr e] is the index of [e]. @raise Invalid_argument if [e] has already been freed. *) -val alloc : 'a t -> 'a -> extra_data:'b -> 'a entry +val alloc : 'a t -> result_kind -> 'a -> extra_data:'b -> 'a entry (** [alloc t a ~extra_data] adds the value [a] to [t] and returns a pointer to that value, or raises [Invalid_argument] if no extra space can be created for [t], or [t] has already been [release]d. @param extra_data Prevent this from being GC'd until [free] is called. *) -val free : 'a t -> ptr -> 'a +val free : 'a t -> ptr -> 'a * result_kind (** [free t p] returns the element referenced by [p] and removes it from the heap. Has undefined behaviour if [p] has already been freed. *) diff --git a/lib/uring/uring.ml b/lib/uring/uring.ml index ae5124dc..5864e437 100644 --- a/lib/uring/uring.ml +++ b/lib/uring/uring.ml @@ -301,6 +301,15 @@ type clock = Boottime | Realtime type probe +type _ return_kind = + | FD : Unix.file_descr return_kind + | Error : Unix.error return_kind + | Int : int return_kind + +type 'a completion_option = + | None : 'a completion_option + | Some : { result: 'b; kind: 'b return_kind; data: 'a } -> 'a completion_option + module Uring = struct type t @@ -328,17 +337,17 @@ module Uring = struct external submit_readv_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc] external submit_writev_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc] external submit_close : t -> Unix.file_descr -> id -> bool = "ocaml_uring_submit_close" [@@noalloc] - external submit_statx : t -> id -> Unix.file_descr -> Statx.t -> Sketch.ptr -> int -> int -> bool = "ocaml_uring_submit_statx_byte" "ocaml_uring_submit_statx_native" [@@noalloc] + external submit_statx : t -> id -> Unix.file_descr option -> Statx.t -> Sketch.ptr -> int -> int -> bool = "ocaml_uring_submit_statx_byte" "ocaml_uring_submit_statx_native" [@@noalloc] external submit_splice : t -> id -> Unix.file_descr -> Unix.file_descr -> int -> bool = "ocaml_uring_submit_splice" [@@noalloc] external submit_bind : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_bind" [@@noalloc] external submit_listen : t -> id -> Unix.file_descr -> int -> bool = "ocaml_uring_submit_listen" [@@noalloc] external submit_connect : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_connect" [@@noalloc] external submit_accept : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_accept" [@@noalloc] external submit_cancel : t -> id -> id -> bool = "ocaml_uring_submit_cancel" [@@noalloc] - external submit_openat2 : t -> id -> Unix.file_descr -> Open_how.t -> bool = "ocaml_uring_submit_openat2" [@@noalloc] - external submit_linkat : t -> id -> Unix.file_descr -> Sketch.ptr -> Unix.file_descr -> Sketch.ptr -> int -> bool = "ocaml_uring_submit_linkat_byte" "ocaml_uring_submit_linkat_native" [@@noalloc] - external submit_unlinkat : t -> id -> Unix.file_descr -> Sketch.ptr -> bool -> bool = "ocaml_uring_submit_unlinkat" [@@noalloc] - external submit_mkdirat : t -> id -> Unix.file_descr -> Sketch.ptr -> int -> bool = "ocaml_uring_submit_mkdirat" [@@noalloc] + external submit_openat2 : t -> id -> Unix.file_descr option -> Open_how.t -> bool = "ocaml_uring_submit_openat2" [@@noalloc] + external submit_linkat : t -> id -> Unix.file_descr option -> Sketch.ptr -> Unix.file_descr option -> Sketch.ptr -> int -> bool = "ocaml_uring_submit_linkat_byte" "ocaml_uring_submit_linkat_native" [@@noalloc] + external submit_unlinkat : t -> id -> Unix.file_descr option -> Sketch.ptr -> bool -> bool = "ocaml_uring_submit_unlinkat" [@@noalloc] + external submit_mkdirat : t -> id -> Unix.file_descr option -> Sketch.ptr -> int -> bool = "ocaml_uring_submit_mkdirat" [@@noalloc] external submit_send_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> bool = "ocaml_uring_submit_send_msg" [@@noalloc] external submit_recv_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> bool = "ocaml_uring_submit_recv_msg" [@@noalloc] external submit_fsync : t -> id -> Unix.file_descr -> int64 -> int -> bool = "ocaml_uring_submit_fsync" [@@noalloc] @@ -352,6 +361,7 @@ module Uring = struct external wait_cqe : t -> cqe_option = "ocaml_uring_wait_cqe" external wait_cqe_timeout : float -> t -> cqe_option = "ocaml_uring_wait_cqe_timeout" external peek_cqe : t -> cqe_option = "ocaml_uring_peek_cqe" + external completion_of_result : 'a -> int -> 'a completion_option = "ocaml_uring_completion_of_result" external error_of_errno : int -> Unix.error = "ocaml_uring_error_of_errno" external register_eventfd : t -> Unix.file_descr -> unit = "ocaml_uring_register_eventfd" @@ -439,9 +449,9 @@ let exit t = Uring.exit t.uring; unregister_gc_root t -let with_id_full : type a. a t -> (Heap.ptr -> bool) -> a -> extra_data:'b -> a job option = - fun t fn datum ~extra_data -> - match Heap.alloc t.data datum ~extra_data with +let with_id_full : type a. a t -> (Heap.ptr -> bool) -> Heap.result_kind -> a -> extra_data:'b -> a job option = + fun t fn result_kind datum ~extra_data -> + match Heap.alloc t.data result_kind datum ~extra_data with | exception (Invalid_argument _ as ex) -> check t; raise ex | entry -> let ptr = Heap.ptr entry in @@ -449,32 +459,31 @@ let with_id_full : type a. a t -> (Heap.ptr -> bool) -> a -> extra_data:'b -> a if has_space then Some entry else ( - ignore (Heap.free t.data ptr : a); + ignore (Heap.free t.data ptr : a * Heap.result_kind); None ) -let with_id t fn a = with_id_full t fn a ~extra_data:() +let with_id t fn result_kind a = with_id_full t fn result_kind a ~extra_data:() let noop t user_data = - with_id t (fun id -> Uring.submit_nop t.uring id) user_data + with_id t (fun id -> Uring.submit_nop t.uring id) Heap.Kind_Int user_data external set_timespec: Sketch.ptr -> int64 -> unit = "ocaml_uring_set_timespec" [@@noalloc] let timeout ?(absolute = false) t clock timeout_ns user_data = let timespec_ptr = Sketch.alloc t.sketch Config.sizeof_kernel_timespec in set_timespec timespec_ptr timeout_ns; - with_id t (fun id -> Uring.submit_timeout t.uring id timespec_ptr clock absolute) user_data +(* XXX Kind_Error? *) + with_id t (fun id -> Uring.submit_timeout t.uring id timespec_ptr clock absolute) Heap.Kind_Int user_data -let at_fdcwd : Unix.file_descr = Obj.magic Config.at_fdcwd - -let openat2 t ~access ~flags ~perm ~resolve ?(fd=at_fdcwd) path user_data = +let openat2 t ~access ~flags ~perm ~resolve ?fd path user_data = let open_flags = flags lor match access with | `R -> Open_flags.rdonly | `W -> Open_flags.wronly | `RW -> Open_flags.rdwr in let open_how = Open_how.v ~open_flags ~perm ~resolve path in - with_id_full t (fun id -> Uring.submit_openat2 t.uring id fd open_how) user_data ~extra_data:open_how + with_id_full t (fun id -> Uring.submit_openat2 t.uring id fd open_how) Kind_FD user_data ~extra_data:open_how module Linkat_flags = struct include Flags @@ -482,85 +491,88 @@ module Linkat_flags = struct let symlink_follow = Config.At.symlink_follow end -let linkat t ?(old_dir_fd=at_fdcwd) ?(new_dir_fd=at_fdcwd) ~flags ~old_path ~new_path user_data = +let linkat t ?old_dir_fd ?new_dir_fd ~flags ~old_path ~new_path user_data = with_id t (fun id -> let old_path_buf = Sketch.String.alloc t.sketch old_path in let new_path_buf = Sketch.String.alloc t.sketch new_path in Uring.submit_linkat t.uring id old_dir_fd old_path_buf new_dir_fd new_path_buf flags - ) user_data + (* XXX Kind_Error? *) + ) Heap.Kind_Int user_data -let unlink t ~dir ?(fd=at_fdcwd) path user_data = +let unlink t ~dir ?fd path user_data = with_id t (fun id -> let buf = Sketch.String.alloc t.sketch path in Uring.submit_unlinkat t.uring id fd buf dir - ) user_data + (* XXX Kind_Error? *) + ) Heap.Kind_Int user_data -let mkdirat t ~mode ?(fd=at_fdcwd) path user_data = +let mkdirat t ~mode ?fd path user_data = with_id t (fun id -> let buf = Sketch.String.alloc t.sketch path in Uring.submit_mkdirat t.uring id fd buf mode - ) user_data + (* XXX Kind_Error? *) + ) Heap.Kind_Int user_data let read t ~file_offset fd (buf : Cstruct.t) user_data = - with_id_full t (fun id -> Uring.submit_read t.uring fd id buf file_offset) user_data ~extra_data:buf + with_id_full t (fun id -> Uring.submit_read t.uring fd id buf file_offset) Heap.Kind_Int user_data ~extra_data:buf let write t ~file_offset fd (buf : Cstruct.t) user_data = - with_id_full t (fun id -> Uring.submit_write t.uring fd id buf file_offset) user_data ~extra_data:buf + with_id_full t (fun id -> Uring.submit_write t.uring fd id buf file_offset) Heap.Kind_Int user_data ~extra_data:buf let iov_max = Config.iov_max let readv t ~file_offset fd buffers user_data = with_id_full t (fun id -> let iovec = Sketch.Iovec.alloc t.sketch buffers in - Uring.submit_readv t.uring fd id iovec file_offset) user_data ~extra_data:buffers + Uring.submit_readv t.uring fd id iovec file_offset) Heap.Kind_Int user_data ~extra_data:buffers let read_fixed t ~file_offset fd ~off ~len user_data = - with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data + with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) Heap.Kind_Int user_data let read_chunk ?len t ~file_offset fd chunk user_data = let { Cstruct.buffer; off; len } = Region.to_cstruct ?len chunk in if buffer != t.fixed_iobuf then invalid_arg "Chunk does not belong to ring!"; - with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data + with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) Heap.Kind_Int user_data let write_fixed t ~file_offset fd ~off ~len user_data = - with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data + with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) Heap.Kind_Int user_data let write_chunk ?len t ~file_offset fd chunk user_data = let { Cstruct.buffer; off; len } = Region.to_cstruct ?len chunk in if buffer != t.fixed_iobuf then invalid_arg "Chunk does not belong to ring!"; - with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data + with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) Heap.Kind_Int user_data let writev t ~file_offset fd buffers user_data = with_id_full t (fun id -> let iovec = Sketch.Iovec.alloc t.sketch buffers in - Uring.submit_writev t.uring fd id iovec file_offset) user_data ~extra_data:buffers + Uring.submit_writev t.uring fd id iovec file_offset) Heap.Kind_Int user_data ~extra_data:buffers let poll_add t fd poll_mask user_data = - with_id t (fun id -> Uring.submit_poll_add t.uring fd id poll_mask) user_data + with_id t (fun id -> Uring.submit_poll_add t.uring fd id poll_mask) Heap.Kind_Int user_data let close t fd user_data = - with_id t (fun id -> Uring.submit_close t.uring fd id) user_data + with_id t (fun id -> Uring.submit_close t.uring fd id) Heap.Kind_Int user_data -let statx t ?(fd=at_fdcwd) ~mask path statx flags user_data = +let statx t ?fd ~mask path statx flags user_data = let spath = Sketch.String.alloc t.sketch path in - with_id_full t (fun id -> Uring.submit_statx t.uring id fd statx spath flags mask) user_data ~extra_data:statx + with_id_full t (fun id -> Uring.submit_statx t.uring id fd statx spath flags mask) Heap.Kind_Int user_data ~extra_data:statx let splice t ~src ~dst ~len user_data = - with_id t (fun id -> Uring.submit_splice t.uring id src dst len) user_data + with_id t (fun id -> Uring.submit_splice t.uring id src dst len) Heap.Kind_Int user_data let bind t fd addr user_data = let addr = Sockaddr.of_unix addr in - with_id_full t (fun id -> Uring.submit_bind t.uring id fd addr) user_data ~extra_data:addr + with_id_full t (fun id -> Uring.submit_bind t.uring id fd addr) Heap.Kind_Int user_data ~extra_data:addr let listen t fd backlog user_data = - with_id t (fun id -> Uring.submit_listen t.uring id fd backlog) user_data + with_id t (fun id -> Uring.submit_listen t.uring id fd backlog) Heap.Kind_Int user_data let connect t fd addr user_data = let addr = Sockaddr.of_unix addr in - with_id_full t (fun id -> Uring.submit_connect t.uring id fd addr) user_data ~extra_data:addr + with_id_full t (fun id -> Uring.submit_connect t.uring id fd addr) Heap.Kind_Int user_data ~extra_data:addr let accept t fd addr user_data = - with_id_full t (fun id -> Uring.submit_accept t.uring id fd addr) user_data ~extra_data:addr + with_id_full t (fun id -> Uring.submit_accept t.uring id fd addr) Heap.Kind_FD user_data ~extra_data:addr let send_msg ?(fds=[]) ?dst t fd buffers user_data = let addr = Option.map Sockaddr.of_unix dst in @@ -569,24 +581,24 @@ let send_msg ?(fds=[]) ?dst t fd buffers user_data = (* NOTE: `msghdr` references `buffers`, so it's enough for `extra_data` *) with_id_full t (fun id -> let iovec = Sketch.Iovec.alloc t.sketch buffers in - Uring.submit_send_msg t.uring id fd msghdr iovec) user_data ~extra_data:msghdr + Uring.submit_send_msg t.uring id fd msghdr iovec) Heap.Kind_Int user_data ~extra_data:msghdr let recv_msg t fd msghdr user_data = let _, _, buffers = msghdr in (* NOTE: `msghdr` references `buffers`, so it's enough for `extra_data` *) with_id_full t (fun id -> let iovec = Sketch.Iovec.alloc t.sketch buffers in - Uring.submit_recv_msg t.uring id fd msghdr iovec) user_data ~extra_data:msghdr + Uring.submit_recv_msg t.uring id fd msghdr iovec) Heap.Kind_Int user_data ~extra_data:msghdr let fsync t ?(off=0L) ?(len=0) fd user_data = - with_id t (fun id -> Uring.submit_fsync t.uring id fd off len) user_data + with_id t (fun id -> Uring.submit_fsync t.uring id fd off len) Heap.Kind_Int user_data let fdatasync t ?(off=0L) ?(len=0) fd user_data = - with_id t (fun id -> Uring.submit_fdatasync t.uring id fd off len) user_data + with_id t (fun id -> Uring.submit_fdatasync t.uring id fd off len) Heap.Kind_Int user_data let cancel t job user_data = ignore (Heap.ptr job : Uring.id); (* Check it's still valid *) - with_id t (fun id -> Uring.submit_cancel t.uring id (Heap.ptr job)) user_data + with_id t (fun id -> Uring.submit_cancel t.uring id (Heap.ptr job)) Heap.Kind_Int user_data let sqe_ready t = Uring.sq_ready t.uring @@ -611,16 +623,16 @@ let submit t = gc_sketch t; v -type 'a completion_option = - | None - | Some of { result: int; data: 'a } - let fn_on_ring fn t = match fn t.uring with | Uring.Cqe_none -> None | Uring.Cqe_some { user_data_id; res } -> - let data = Heap.free t.data user_data_id in - Some { result = res; data } + let (data, kind) = Heap.free t.data user_data_id in + match kind with + | Heap.Kind_FD -> + Uring.completion_of_result data res + | Heap.Kind_Int -> + Some { result = res; kind = Int; data } let get_cqe_nonblocking t = check t; diff --git a/lib/uring/uring.mli b/lib/uring/uring.mli index 55d431bc..70b792e3 100644 --- a/lib/uring/uring.mli +++ b/lib/uring/uring.mli @@ -840,9 +840,14 @@ val submit : 'a t -> int to the kernel. Their results can subsequently be retrieved using {!wait} or {!peek}. *) +type _ return_kind = + | FD : Unix.file_descr return_kind + | Error : Unix.error return_kind + | Int : int return_kind + type 'a completion_option = - | None - | Some of { result: int; data: 'a } (**) + | None : 'a completion_option + | Some : { result: 'b; kind: 'b return_kind; data: 'a } -> 'a completion_option (**) (** The type of results of calling {!wait} and {!peek}. [None] denotes that either there were no completions in the queue or an interrupt / timeout occurred. [Some] contains both the user data attached to the completed diff --git a/lib/uring/uring_stubs.c b/lib/uring/uring_stubs.c index 3ae351aa..d655c3e0 100644 --- a/lib/uring/uring_stubs.c +++ b/lib/uring/uring_stubs.c @@ -242,6 +242,14 @@ ocaml_uring_make_open_how(value v_flags, value v_mode, value v_resolve, value v_ CAMLreturn(v); } +static int with_at_fdcwd(value v_fd) +{ + if (Is_none(v_fd)) + return AT_FDCWD; + else + return Int_val(Some_val(v_fd)); +} + // Caller must ensure v_open_how is not GC'd until the job is finished. value /* noalloc */ ocaml_uring_submit_openat2(value v_uring, value v_id, value v_fd, value v_open_how) { @@ -249,7 +257,7 @@ ocaml_uring_submit_openat2(value v_uring, value v_id, value v_fd, value v_open_h struct io_uring_sqe *sqe = io_uring_get_sqe(ring); if (!sqe) return (Val_false); struct open_how_data *data = Open_how_val(v_open_how); - io_uring_prep_openat2(sqe, Int_val(v_fd), data->path, &data->how); + io_uring_prep_openat2(sqe, with_at_fdcwd(v_fd), data->path, &data->how); io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); return (Val_true); } @@ -483,7 +491,7 @@ ocaml_uring_submit_statx_native(value v_uring, value v_id, value v_fd, value v_s struct io_uring_sqe *sqe = io_uring_get_sqe(ring); if (!sqe) return (Val_false); char *path = Sketch_ptr_val(v_sketch_ptr); - io_uring_prep_statx(sqe, Int_val(v_fd), path, Int_val(v_flags), Int_val(v_mask), Statx_val(v_statx)); + io_uring_prep_statx(sqe, with_at_fdcwd(v_fd), path, Int_val(v_flags), Int_val(v_mask), Statx_val(v_statx)); io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); return (Val_true); } @@ -873,7 +881,7 @@ ocaml_uring_submit_unlinkat(value v_uring, value v_id, value v_fd, value v_sketc int flags = Bool_val(v_rmdir) ? AT_REMOVEDIR : 0; char *path = Sketch_ptr_val(v_sketch_ptr); if (!sqe) return (Val_false); - io_uring_prep_unlinkat(sqe, Int_val(v_fd), path, flags); + io_uring_prep_unlinkat(sqe, with_at_fdcwd(v_fd), path, flags); io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); return (Val_true); } @@ -884,7 +892,7 @@ ocaml_uring_submit_mkdirat(value v_uring, value v_id, value v_fd, value v_sketch struct io_uring_sqe *sqe = io_uring_get_sqe(ring); char *path = Sketch_ptr_val(v_sketch_ptr); if (!sqe) return (Val_false); - io_uring_prep_mkdirat(sqe, Int_val(v_fd), path, Int_val(v_mode)); + io_uring_prep_mkdirat(sqe, with_at_fdcwd(v_fd), path, Int_val(v_mode)); io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); return (Val_true); } @@ -1031,6 +1039,32 @@ value ocaml_uring_error_of_errno(value v_errno) { return unix_error_of_code(Int_val(v_errno)); } +#ifndef CAML_UNIX_FILE_DESCR_API +#ifdef _WIN32 +#error "Unix-specific treatment of Unix.file_descr" +#else +#define caml_unix_file_descr_of_fd(fd) Val_int(fd) +#endif /* #ifdef _WIN32 */ +#endif /* #ifndef CAML_UNIX_FILE_DESCR_API */ + +value ocaml_uring_completion_of_result(value v_data, value v_result) +{ + CAMLparam0(); + CAMLlocal2(result, val); + if (Int_val(v_result) < 0) { + val = unix_error_of_code(-Int_val(v_result)); + result = caml_alloc_small(3, 0); + Field(result, 1) = Val_int(1); + } else { + val = caml_unix_file_descr_of_fd(Int_val(v_result)); + result = caml_alloc_small(3, 0); + Field(result, 1) = Val_int(0); + } + Field(result, 0) = val; + Field(result, 2) = v_data; + CAMLreturn(result); +} + #define Probe_val(v) (*((struct io_uring_probe **) Data_custom_val(v))) static void finalize_probe(value v) { @@ -1097,7 +1131,7 @@ ocaml_uring_submit_linkat_native(value v_uring, value v_id, if (!sqe) return Val_false; - io_uring_prep_linkat(sqe, Int_val(v_old_dir), old_path, Int_val(v_new_dir), new_path, Int_val(v_flags)); + io_uring_prep_linkat(sqe, with_at_fdcwd(v_old_dir), old_path, with_at_fdcwd(v_new_dir), new_path, Int_val(v_flags)); io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); return Val_true; diff --git a/tests/heap.md b/tests/heap.md index 6cf579ab..01da2885 100644 --- a/tests/heap.md +++ b/tests/heap.md @@ -35,13 +35,13 @@ Test normal usage: match attempt_alloc with | true -> let data = Random.int 5000 in - let ptr = Heap.ptr (Heap.alloc t data) in + let ptr = Heap.ptr (Heap.alloc t Heap.Kind_Int data) in assert (not (Hashtbl.mem reference ptr)); Hashtbl.add reference ptr data; incr currently_allocated | false -> let (k, v) = random_hashtbl_elt reference in - let v' = Heap.free t k in + let v', _ = Heap.free t k in Hashtbl.remove reference k; assert (v = v'); decr currently_allocated @@ -62,7 +62,7 @@ let shuffle_list l = # let t = Heap.create 0 in let add_l = List.init 1024 (fun i -> i) |> shuffle_list in assert (Heap.in_use t = 0); - let free_l = List.map (fun i -> Heap.alloc t i |> Heap.ptr) add_l |> + let free_l = List.map (fun i -> Heap.alloc t Heap.Kind_Int i |> Heap.ptr) add_l |> shuffle_list in assert (Heap.in_use t = 1024); @@ -77,10 +77,10 @@ Double free in an empty heap: ```ocaml # let t : int Heap.t = Heap.create 1;; val t : int Heap.t = -# let p = Heap.ptr @@ Heap.alloc t 1;; +# let p = Heap.ptr @@ Heap.alloc t Heap.Kind_Int 1;; val p : Heap.ptr = 0 # Heap.free t p;; -- : int = 1 +- : int * Heap.result_kind = (1, Heap.Kind_Int) # Heap.free t p;; Exception: Invalid_argument "Heap.free: pointer already freed". # let t : unit = Heap.release t;; @@ -92,16 +92,16 @@ Double free in a non-empty heap: ```ocaml # let t : int Heap.t = Heap.create 2;;; val t : int Heap.t = -# let p1 = Heap.ptr @@ Heap.alloc t 1;;; +# let p1 = Heap.ptr @@ Heap.alloc t Heap.Kind_Int 1;;; val p1 : Heap.ptr = 0 -# let p2 = Heap.ptr @@ Heap.alloc t 2;;; +# let p2 = Heap.ptr @@ Heap.alloc t Heap.Kind_Int 2;;; val p2 : Heap.ptr = 1 # Heap.free t p1;; -- : int = 1 +- : int * Heap.result_kind = (1, Heap.Kind_Int) # Heap.free t p1;; Exception: Invalid_argument "Heap.free: pointer already freed". # Heap.free t p2;; -- : int = 2 +- : int * Heap.result_kind = (2, Heap.Kind_Int) # let t : unit = Heap.release t;; val t : unit = () ``` diff --git a/tests/main.md b/tests/main.md index 51da6ac5..9ac797a4 100644 --- a/tests/main.md +++ b/tests/main.md @@ -16,11 +16,29 @@ module Test_data = struct close_out oc end +let rec consume_exn name path t = + match Uring.wait ~timeout:1. t with + | None -> consume_exn name path t + | Some { data; kind = Uring.Error; result } -> + raise (Unix.Unix_error ((result : Unix.error), name, path)) + | Some { data; kind = Uring.FD; result } -> data, (result : Unix.file_descr) + | Some _ -> assert false + let rec consume t = match Uring.wait ~timeout:1. t with - | Some { data; result } -> (data, result) + | Some { data; kind = Uring.Int; result } -> (data, (result : int)) + | Some _ -> + assert false | None -> consume t +let rec consume_fd t = + match Uring.wait ~timeout:1. t with + | None -> consume_fd t + | Some { data; kind = Uring.FD; result } -> + data, (result : Unix.file_descr) + | Some _ -> + assert false + let traceln fmt = Format.printf (fmt ^^ "@.") ``` @@ -134,10 +152,7 @@ val t : [ `Open ] Uring.t = # Uring.submit t;;; - : int = 1 -# let token, fd = - let token, fd = consume t in - assert (fd >= 0); - token, (Obj.magic fd : Unix.file_descr);; +# let token, fd = consume_fd t;; val token : [ `Open ] = `Open val fd : Unix.file_descr = @@ -170,10 +185,7 @@ val t : [ `Create ] Uring.t = # Uring.submit t;; - : int = 1 -# let token, fd = - let token, fd = consume t in - assert (fd >= 0); - token, (Obj.magic fd : Unix.file_descr);; +# let token, fd = consume_fd t;; val token : [ `Create ] = `Create val fd : Unix.file_descr = @@ -252,10 +264,7 @@ Now using `~fd`: # Uring.submit t;; - : int = 1 -# let token, fd = - let token, fd = consume t in - assert (fd >= 0); - token, (Obj.magic fd : Unix.file_descr);; +# let token, fd = consume_fd t;; val token : [ `Open_path | `Statx ] = `Open_path val fd : Unix.file_descr = @@ -301,14 +310,9 @@ val t : [ `Get_path ] Uring.t = path `Get_path)); traceln "Submitted %d" (Uring.submit t); - let `Get_path, fd = consume t in - if fd >= 0 then ( - let fd : Unix.file_descr = Obj.magic fd in - Unix.close fd; - traceln "Opened %S OK" path - ) else ( - raise (Unix.Unix_error (Uring.error_of_errno fd, "openat2", path)) - );; + let `Get_path, fd = consume_exn "openat2" path t in + Unix.close fd; + traceln "Opened %S OK" path;; val get : resolve:Uring.Resolve.t -> string -> unit = # get ~resolve:Uring.Resolve.empty ".";; @@ -814,10 +818,12 @@ val check : unit -> bool * bool = - : unit Uring.job option = Some # Uring.wait t;; -- : unit Uring.completion_option = Uring.Some {Uring.result = 0; data = ()} +- : unit Uring.completion_option = +Uring.Some {Uring.result = ; kind = Uring.Int; data = ()} # Uring.wait t;; -- : unit Uring.completion_option = Uring.Some {Uring.result = 0; data = ()} +- : unit Uring.completion_option = +Uring.Some {Uring.result = ; kind = Uring.Int; data = ()} # check ();; - : bool * bool = (false, false) @@ -840,7 +846,8 @@ val t : unit Uring.t = # Uring.submit t;; - : int = 1 # Uring.wait t;; -- : unit Uring.completion_option = Uring.Some {Uring.result = 0; data = ()} +- : unit Uring.completion_option = +Uring.Some {Uring.result = ; kind = Uring.Int; data = ()} # (Unix.lstat "new-symlink").st_kind;; - : Unix.file_kind = Unix.S_LNK ``` @@ -876,7 +883,7 @@ val t : [ `Mkdir of int ] Uring.t = - : int = 1 # Uring.wait t;; - : [ `Mkdir of int ] Uring.completion_option = -Uring.Some {Uring.result = 0; data = `Mkdir 0} +Uring.Some {Uring.result = ; kind = Uring.Int; data = `Mkdir 0} # Printf.sprintf "0o%o" ((Unix.stat "mkdir").st_perm land 0o777);; - : string = "0o700" # let v = Uring.mkdirat t ~mode:0o755 "mkdir" (`Mkdir 1);; @@ -885,7 +892,7 @@ val v : [ `Mkdir of int ] Uring.job option = Some - : int = 1 # Uring.wait t;; - : [ `Mkdir of int ] Uring.completion_option = -Uring.Some {Uring.result = -17; data = `Mkdir 1} +Uring.Some {Uring.result = ; kind = Uring.Int; data = `Mkdir 1} # Uring.exit t;; - : unit = () ``` diff --git a/tests/poll_add.ml b/tests/poll_add.ml index 5bceb1c3..030b897c 100644 --- a/tests/poll_add.ml +++ b/tests/poll_add.ml @@ -13,7 +13,8 @@ let () = let rec retry () = match Uring.wait t with | None -> retry () - | Some { result; _ } -> result + | Some { result; kind = Uring.Int; _ } -> (result : int) + | Some _ -> assert false in let res = retry () in Printf.eprintf "poll_add: %x\n%!" res; diff --git a/tests/sketch.md b/tests/sketch.md index 8fcc5a9f..01735337 100644 --- a/tests/sketch.md +++ b/tests/sketch.md @@ -11,7 +11,8 @@ let ldup n x = List.init n (Fun.const x) let rec consume t = match Uring.wait t with - | Some { data; result } -> (data, result) + | Some { data; kind = Uring.Int; result } -> (data, (result : int)) + | Some _ -> assert false | None -> consume t ``` diff --git a/tests/socket_ops.ml b/tests/socket_ops.ml index 36b5c2be..994233e5 100644 --- a/tests/socket_ops.ml +++ b/tests/socket_ops.ml @@ -20,7 +20,8 @@ let () = let _submitted = Uring.submit t in match Uring.wait t with | Uring.None -> failwith "No completion for bind" - | Uring.Some { result; data = _ } -> + | Uring.Some { result; kind = Uring.Int; data = _ } -> + let result = (result : int) in if result < 0 then begin Uring.close t server_sock () |> ignore; Uring.submit t |> ignore; @@ -29,6 +30,7 @@ let () = failwith (sprintf "Bind failed: %s" (Unix.error_message err)) end else result + | Uring.Some _ -> assert false in printf "Bind completed with result: %d\n" bind_result; @@ -41,7 +43,8 @@ let () = let _submitted = Uring.submit t in match Uring.wait t with | Uring.None -> failwith "No completion for listen" - | Uring.Some { result; data = _ } -> + | Uring.Some { result; kind = Uring.Int; data = _ } -> + let result = (result : int) in if result < 0 then begin Uring.close t server_sock () |> ignore; Uring.submit t |> ignore; @@ -50,6 +53,7 @@ let () = failwith (sprintf "Listen failed: %s" (Unix.error_message err)) end else result + | Uring.Some _ -> assert false in printf "Listen completed with result: %d\n" listen_result; @@ -78,7 +82,8 @@ let () = let _submitted = Uring.submit t in match Uring.wait t with | Uring.None -> failwith "No completion for connect" - | Uring.Some { result; data = _ } -> + | Uring.Some { result; kind = Uring.Int; data = _ } -> + let result = (result : int) in (* Connect may return -EINPROGRESS for non-blocking sockets, which is normal *) if result < 0 && result <> (-115) (* -EINPROGRESS *) then begin Uring.close t client_sock () |> ignore; @@ -89,6 +94,7 @@ let () = failwith (sprintf "Connect failed: %s (errno: %d)" (Unix.error_message err) (-result)) end else result + | Uring.Some _ -> assert false in if connect_result = 0 || connect_result = (-115) then @@ -122,12 +128,13 @@ let () = if pending > 0 then match Uring.wait t with | Uring.None -> failwith "No completion for close" - | Uring.Some { result; data = _ } -> + | Uring.Some { result; kind = Uring.Int; data = _ } -> if result < 0 then printf "Close warning: %s\n" (Unix.error_message (Uring.error_of_errno (-result))); wait_closes (pending - 1) + | Uring.Some _ -> assert false in wait_closes 2; Uring.exit t; - printf "Test completed successfully!\n" \ No newline at end of file + printf "Test completed successfully!\n" diff --git a/tests/urcat.ml b/tests/urcat.ml index b114f5ca..58291833 100644 --- a/tests/urcat.ml +++ b/tests/urcat.ml @@ -11,7 +11,8 @@ let get_file_size fd = let get_completion_and_print uring = let iov, len = match Uring.wait uring with - | Some { data; result } -> (data, result) + | Some { data; kind = Uring.Int; result } -> (data, (result : int)) + | Some _ -> assert false | None -> failwith "retry" in let remaining = ref len in diff --git a/tests/urcp_fixed_lib.ml b/tests/urcp_fixed_lib.ml index 83d93c6f..f1cd4fba 100644 --- a/tests/urcp_fixed_lib.ml +++ b/tests/urcp_fixed_lib.ml @@ -137,11 +137,13 @@ let copy_file uring t = if t.write_left > 0 then begin let check_q = if !got_completion then Uring.get_cqe_nonblocking uring else Uring.wait uring in match check_q with - |None -> Logs.debug (fun l -> l "completions: retry so finishing loop") - |Some { data; result } -> - handle_completion uring data result; - got_completion := true; - handle_completions (); + | None -> Logs.debug (fun l -> l "completions: retry so finishing loop") + | Some { data; kind = Int; result } -> + handle_completion uring data result; + got_completion := true; + handle_completions (); + | Some _ -> + assert false end in handle_completions (); diff --git a/tests/urcp_lib.ml b/tests/urcp_lib.ml index b1d194fe..2d76a309 100644 --- a/tests/urcp_lib.ml +++ b/tests/urcp_lib.ml @@ -142,11 +142,13 @@ let copy_file uring t = if t.write_left > 0 then begin let check_q = if !got_completion then Uring.get_cqe_nonblocking uring else Uring.wait uring in match check_q with - |None -> Logs.debug (fun l -> l "completions: retry so finishing loop") - |Some { data; result } -> - handle_completion uring data result; - got_completion := true; - handle_completions (); + | None -> Logs.debug (fun l -> l "completions: retry so finishing loop") + | Some { data; kind = Int; result } -> + handle_completion uring data result; + got_completion := true; + handle_completions () + | Some _ -> + assert false end in handle_completions (); diff --git a/tests/urstat.ml b/tests/urstat.ml index 702c29ec..8e269516 100644 --- a/tests/urstat.ml +++ b/tests/urstat.ml @@ -11,9 +11,9 @@ let pp_time f (sec, nsec) = nsec let get_completion_and_print uring = - let (fname, buf), _ = + let fname, buf = match Uring.wait uring with - | Some { data; result } -> (data, result) + | Some { data; _ } -> data | None -> failwith "retry" in let kind = S.kind buf in From 26b0ea8fcd506a88f9b255aa712aef49349d6f44 Mon Sep 17 00:00:00 2001 From: David Allsopp Date: Wed, 1 Oct 2025 17:46:31 +0100 Subject: [PATCH 2/2] Somewhat more satisfactory approach Steal two bits in the sqe data to allow the C stub to return the appropriate encoding directly. --- README.md | 33 +++--- bench/readv.ml | 27 ++--- lib/uring/heap.ml | 10 +- lib/uring/heap.mli | 6 +- lib/uring/uring.ml | 189 +++++++++++++++--------------- lib/uring/uring.mli | 16 ++- lib/uring/uring_stubs.c | 251 +++++++++++++++++++++------------------- tests/heap.md | 18 +-- tests/main.md | 99 ++++++++-------- tests/poll_add.ml | 4 +- tests/sketch.md | 4 +- tests/socket_ops.ml | 86 +++++++------- tests/urcat.ml | 4 +- tests/urcp_fixed_lib.ml | 45 ++++--- tests/urcp_lib.ml | 44 +++---- tests/urstat.ml | 5 +- 16 files changed, 422 insertions(+), 419 deletions(-) diff --git a/README.md b/README.md index 4f7250d4..03aba97b 100644 --- a/README.md +++ b/README.md @@ -59,16 +59,16 @@ We can now ask Linux to suspend the process until a result (Completion Queue Ent let rec wait_with_retry uring = match Uring.wait uring with | None -> wait_with_retry uring (* Interrupted *) - | Some { result; kind = Uring.Int; data } -> None, (result : int), data - | Some { result; kind = Uring.FD; data } -> Some (result : Unix.file_descr), 0, data - | Some { result; kind = Uring.Error; data } -> failwith (Unix.error_message result) + | Unit { result = (); data } -> Either.Left 0, data + | Int { result; data } -> Either.Left result, data + | FD { result; data } -> Either.Right result, data + | Error { result; data } -> failwith ("Unexpected error: " ^ Unix.error_message result);; ``` ```ocaml -# let fd, result, data = wait_with_retry uring;; -val fd : Unix.file_descr option = Some -val result : int = 0 +# let result, data = wait_with_retry uring;; +val result : (int, Unix.file_descr) Either.t = Right val data : _[> `Open_log ] = `Open_log ``` @@ -78,10 +78,14 @@ The `data` field is the data we passed in when submitting the request, allowing The `result` field is the return code, with the same meaning as the return code from the corresponding system call (`openat2` in this case). +```ocaml +# let fd = Either.fold ~left:(fun _ -> assert false) ~right:Fun.id result +val fd : Unix.file_descr = +``` + We can now submit further requests. e.g. ```ocaml -let fd = Option.get fd let rec write_all fd = function | [] -> () | bufs -> @@ -94,11 +98,12 @@ let rec write_all fd = function |> Option.get (* We know we have enough space here *) in assert (Uring.submit uring = 1); - let _, result, data = wait_with_retry uring in - assert (data = `Write_all); (* There aren't any other requests pending *) - assert (result > 0); (* Check for error return *) - let bufs = Cstruct.shiftv bufs result in - write_all fd bufs + match wait_with_retry uring with + | Either.Left result, `Write_all -> + let bufs = Cstruct.shiftv bufs result in + write_all fd bufs + | _ -> + assert false (* There aren't any other requests pending *) ``` ```ocaml @@ -121,9 +126,9 @@ Some - : int = 1 # wait_with_retry uring;; -- : Unix.file_descr option * int * +- : (int, Unix.file_descr) Either.t * ([> `Close_log | `Open_log | `Write_all ] as '_weak3) -= (None, 0, `Close_log) += (Either.Left 0, `Close_log) ``` The file has now been written: diff --git a/bench/readv.ml b/bench/readv.ml index 0529b4df..7a493ddc 100644 --- a/bench/readv.ml +++ b/bench/readv.ml @@ -5,15 +5,14 @@ let buffer_size = 100 (* Use a small buffer to stress the system more *) let n_concurrent = 16 (* How many requests to have active at once *) let n_iters = 1_000_000 (* How many times to accept and resubmit *) -let rec wait t handle = - match Uring.get_cqe_nonblocking t with - | Some { result; kind = Uring.Int; data = buf } -> handle (result : int) buf - | Some _ -> assert false - | None -> - match Uring.wait t with - | None -> wait t handle - | Some { result; kind = Uring.Int; data = buf } -> handle (result : int) buf - | Some _ -> assert false +let rec process wait t handle = function + | Uring.Int { result; data = buf } -> handle result buf + | Error { result; data = _ } -> + raise (Unix.Unix_error (result, "readv", "")) + | FD _ | Unit _ -> failwith "Unexpected return from readv" + | None -> process wait t handle (wait t) + +let wait t handle = process Uring.wait t handle (Uring.get_cqe_nonblocking t) let run_bechmark ~polling_timeout fd = let got = ref 0 in @@ -31,13 +30,9 @@ let run_bechmark ~polling_timeout fd = let t0 = Unix.gettimeofday () in for _ = 1 to n_iters do wait t (fun result bufs -> - if result < 0 then ( - raise (Unix.Unix_error (Uring.error_of_errno result, "readv", "")) - ) else ( - got := !got + result; - let _job : _ Uring.job = Uring.readv t fd bufs ~file_offset:Optint.Int63.zero bufs |> Option.get in - () - ) + got := !got + result; + let _job : _ Uring.job = Uring.readv t fd bufs ~file_offset:Optint.Int63.zero bufs |> Option.get in + () ) done; (* Get a snapshot of the stats before letting things finish. *) diff --git a/lib/uring/heap.ml b/lib/uring/heap.ml index c4296f6b..1f4fd6fc 100644 --- a/lib/uring/heap.ml +++ b/lib/uring/heap.ml @@ -21,12 +21,10 @@ type ptr = int let slot_taken = -1 let free_list_nil = -2 -type result_kind = Kind_FD | Kind_Int - (* [extra_data] is for keeping pointers passed to C alive. *) type 'a entry = | Empty : 'a entry - | Entry : { data : 'a; kind: result_kind; extra_data : 'b; mutable ptr : int } -> 'a entry + | Entry : { data : 'a; extra_data : 'b; mutable ptr : int } -> 'a entry (* Free-list allocator *) type 'a t = @@ -115,10 +113,10 @@ let grow t = t.free_head <- new_free_head; t.data <- new_data -let alloc t kind data ~extra_data = +let alloc t data ~extra_data = if t.free_head = free_list_nil then grow t; let ptr = t.free_head in - let entry = Entry { data; kind; extra_data; ptr } in + let entry = Entry { data; extra_data; ptr } in t.data.(ptr) <- entry; (* Drop [ptr] from the free list. *) let tail = t.free_tail_relation.(ptr) in @@ -139,7 +137,7 @@ let free t ptr = | Empty -> assert false | Entry p -> p.ptr <- -1; - p.data, p.kind + p.data in (* Cons [ptr] to the free-list. *) diff --git a/lib/uring/heap.mli b/lib/uring/heap.mli index c0b9f0c3..93f77da8 100644 --- a/lib/uring/heap.mli +++ b/lib/uring/heap.mli @@ -17,8 +17,6 @@ type 'a t (** A bounded heap of values of type ['a]. *) -type result_kind = Kind_FD | Kind_Int - val create : int -> _ t (** [create n] is a heap that holds at most [n] elements. *) @@ -32,13 +30,13 @@ val ptr : 'a entry -> ptr (** [ptr e] is the index of [e]. @raise Invalid_argument if [e] has already been freed. *) -val alloc : 'a t -> result_kind -> 'a -> extra_data:'b -> 'a entry +val alloc : 'a t -> 'a -> extra_data:'b -> 'a entry (** [alloc t a ~extra_data] adds the value [a] to [t] and returns a pointer to that value, or raises [Invalid_argument] if no extra space can be created for [t], or [t] has already been [release]d. @param extra_data Prevent this from being GC'd until [free] is called. *) -val free : 'a t -> ptr -> 'a * result_kind +val free : 'a t -> ptr -> 'a (** [free t p] returns the element referenced by [p] and removes it from the heap. Has undefined behaviour if [p] has already been freed. *) diff --git a/lib/uring/uring.ml b/lib/uring/uring.ml index 5864e437..761d58a9 100644 --- a/lib/uring/uring.ml +++ b/lib/uring/uring.ml @@ -295,21 +295,10 @@ module Msghdr = struct create_with_addr ~n_fds ~fds:[] ?addr buffs end -type 'a job = 'a Heap.entry - type clock = Boottime | Realtime type probe -type _ return_kind = - | FD : Unix.file_descr return_kind - | Error : Unix.error return_kind - | Int : int return_kind - -type 'a completion_option = - | None : 'a completion_option - | Some : { result: 'b; kind: 'b return_kind; data: 'a } -> 'a completion_option - module Uring = struct type t @@ -325,48 +314,52 @@ module Uring = struct external opcode_supported : probe -> Op.t -> bool = "ocaml_uring_opcode_supported" [@@noalloc] type id = Heap.ptr + type job = private int type offset = Optint.Int63.t - external submit_nop : t -> id -> bool = "ocaml_uring_submit_nop" [@@noalloc] - external submit_timeout : t -> id -> Sketch.ptr -> clock -> bool -> bool = "ocaml_uring_submit_timeout" [@@noalloc] - external submit_poll_add : t -> Unix.file_descr -> id -> Poll_mask.t -> bool = "ocaml_uring_submit_poll_add" [@@noalloc] - external submit_read : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool = "ocaml_uring_submit_read" [@@noalloc] - external submit_write : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool = "ocaml_uring_submit_write" [@@noalloc] - external submit_readv : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_readv" [@@noalloc] - external submit_writev : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_writev" [@@noalloc] - external submit_readv_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc] - external submit_writev_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc] - external submit_close : t -> Unix.file_descr -> id -> bool = "ocaml_uring_submit_close" [@@noalloc] - external submit_statx : t -> id -> Unix.file_descr option -> Statx.t -> Sketch.ptr -> int -> int -> bool = "ocaml_uring_submit_statx_byte" "ocaml_uring_submit_statx_native" [@@noalloc] - external submit_splice : t -> id -> Unix.file_descr -> Unix.file_descr -> int -> bool = "ocaml_uring_submit_splice" [@@noalloc] - external submit_bind : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_bind" [@@noalloc] - external submit_listen : t -> id -> Unix.file_descr -> int -> bool = "ocaml_uring_submit_listen" [@@noalloc] - external submit_connect : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_connect" [@@noalloc] - external submit_accept : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_accept" [@@noalloc] - external submit_cancel : t -> id -> id -> bool = "ocaml_uring_submit_cancel" [@@noalloc] - external submit_openat2 : t -> id -> Unix.file_descr option -> Open_how.t -> bool = "ocaml_uring_submit_openat2" [@@noalloc] - external submit_linkat : t -> id -> Unix.file_descr option -> Sketch.ptr -> Unix.file_descr option -> Sketch.ptr -> int -> bool = "ocaml_uring_submit_linkat_byte" "ocaml_uring_submit_linkat_native" [@@noalloc] - external submit_unlinkat : t -> id -> Unix.file_descr option -> Sketch.ptr -> bool -> bool = "ocaml_uring_submit_unlinkat" [@@noalloc] - external submit_mkdirat : t -> id -> Unix.file_descr option -> Sketch.ptr -> int -> bool = "ocaml_uring_submit_mkdirat" [@@noalloc] - external submit_send_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> bool = "ocaml_uring_submit_send_msg" [@@noalloc] - external submit_recv_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> bool = "ocaml_uring_submit_recv_msg" [@@noalloc] - external submit_fsync : t -> id -> Unix.file_descr -> int64 -> int -> bool = "ocaml_uring_submit_fsync" [@@noalloc] - external submit_fdatasync : t -> id -> Unix.file_descr -> int64 -> int -> bool = "ocaml_uring_submit_fdatasync" [@@noalloc] + external submit_nop : t -> id -> job = "ocaml_uring_submit_nop" [@@noalloc] + external submit_timeout : t -> id -> Sketch.ptr -> clock -> bool -> job = "ocaml_uring_submit_timeout" [@@noalloc] + external submit_poll_add : t -> Unix.file_descr -> id -> Poll_mask.t -> job = "ocaml_uring_submit_poll_add" [@@noalloc] + external submit_read : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> job = "ocaml_uring_submit_read" [@@noalloc] + external submit_write : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> job = "ocaml_uring_submit_write" [@@noalloc] + external submit_readv : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> job = "ocaml_uring_submit_readv" [@@noalloc] + external submit_writev : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> job = "ocaml_uring_submit_writev" [@@noalloc] + external submit_readv_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> job = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc] + external submit_writev_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> job = "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc] + external submit_close : t -> Unix.file_descr -> id -> job = "ocaml_uring_submit_close" [@@noalloc] + external submit_statx : t -> id -> Unix.file_descr option -> Statx.t -> Sketch.ptr -> int -> int -> job = "ocaml_uring_submit_statx_byte" "ocaml_uring_submit_statx_native" [@@noalloc] + external submit_splice : t -> id -> Unix.file_descr -> Unix.file_descr -> int -> job = "ocaml_uring_submit_splice" [@@noalloc] + external submit_bind : t -> id -> Unix.file_descr -> Sockaddr.t -> job = "ocaml_uring_submit_bind" [@@noalloc] + external submit_listen : t -> id -> Unix.file_descr -> int -> job = "ocaml_uring_submit_listen" [@@noalloc] + external submit_connect : t -> id -> Unix.file_descr -> Sockaddr.t -> job = "ocaml_uring_submit_connect" [@@noalloc] + external submit_accept : t -> id -> Unix.file_descr -> Sockaddr.t -> job = "ocaml_uring_submit_accept" [@@noalloc] + external submit_cancel : t -> id -> job -> job = "ocaml_uring_submit_cancel" [@@noalloc] + external submit_openat2 : t -> id -> Unix.file_descr option -> Open_how.t -> job = "ocaml_uring_submit_openat2" [@@noalloc] + external submit_linkat : t -> id -> Unix.file_descr option -> Sketch.ptr -> Unix.file_descr option -> Sketch.ptr -> int -> job = "ocaml_uring_submit_linkat_byte" "ocaml_uring_submit_linkat_native" [@@noalloc] + external submit_unlinkat : t -> id -> Unix.file_descr option -> Sketch.ptr -> bool -> job = "ocaml_uring_submit_unlinkat" [@@noalloc] + external submit_mkdirat : t -> id -> Unix.file_descr option -> Sketch.ptr -> int -> job = "ocaml_uring_submit_mkdirat" [@@noalloc] + external submit_send_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> job = "ocaml_uring_submit_send_msg" [@@noalloc] + external submit_recv_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> job = "ocaml_uring_submit_recv_msg" [@@noalloc] + external submit_fsync : t -> id -> Unix.file_descr -> int64 -> int -> job = "ocaml_uring_submit_fsync" [@@noalloc] + external submit_fdatasync : t -> id -> Unix.file_descr -> int64 -> int -> job = "ocaml_uring_submit_fdatasync" [@@noalloc] type cqe_option = private | Cqe_none - | Cqe_some of { user_data_id : id; res: int } - [@@ocaml.warning "-37" (* Avoids "Unused constructor" warning on OCaml <= 4.09. *)] + | Cqe_unit of { user_data_id : id; result: unit } + | Cqe_int of { user_data_id : id; result: int } + | Cqe_fd of { user_data_id : id; result: Unix.file_descr } + | Cqe_error of { user_data_id : id; result: Unix.error } external wait_cqe : t -> cqe_option = "ocaml_uring_wait_cqe" external wait_cqe_timeout : float -> t -> cqe_option = "ocaml_uring_wait_cqe_timeout" external peek_cqe : t -> cqe_option = "ocaml_uring_peek_cqe" - external completion_of_result : 'a -> int -> 'a completion_option = "ocaml_uring_completion_of_result" external error_of_errno : int -> Unix.error = "ocaml_uring_error_of_errno" external register_eventfd : t -> Unix.file_descr -> unit = "ocaml_uring_register_eventfd" end +type 'a job = 'a Heap.entry * Uring.job + type 'a t = { id : < >; uring: Uring.t; @@ -449,32 +442,31 @@ let exit t = Uring.exit t.uring; unregister_gc_root t -let with_id_full : type a. a t -> (Heap.ptr -> bool) -> Heap.result_kind -> a -> extra_data:'b -> a job option = - fun t fn result_kind datum ~extra_data -> - match Heap.alloc t.data result_kind datum ~extra_data with - | exception (Invalid_argument _ as ex) -> check t; raise ex - | entry -> - let ptr = Heap.ptr entry in - let has_space = fn ptr in - if has_space then - Some entry - else ( - ignore (Heap.free t.data ptr : a * Heap.result_kind); - None - ) - -let with_id t fn result_kind a = with_id_full t fn result_kind a ~extra_data:() +let with_id_full : type a. a t -> (Heap.ptr -> Uring.job) -> a -> extra_data:'b -> a job option = + fun t fn datum ~extra_data -> + match Heap.alloc t.data datum ~extra_data with + | exception (Invalid_argument _ as ex) -> check t; raise ex + | entry -> + let ptr = Heap.ptr entry in + let job = fn ptr in + if (job :> int) >= 0 then + Some (entry, job) + else ( + ignore (Heap.free t.data ptr : a); + None + ) + +let with_id t fn a = with_id_full t fn a ~extra_data:() let noop t user_data = - with_id t (fun id -> Uring.submit_nop t.uring id) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_nop t.uring id) user_data external set_timespec: Sketch.ptr -> int64 -> unit = "ocaml_uring_set_timespec" [@@noalloc] let timeout ?(absolute = false) t clock timeout_ns user_data = let timespec_ptr = Sketch.alloc t.sketch Config.sizeof_kernel_timespec in set_timespec timespec_ptr timeout_ns; -(* XXX Kind_Error? *) - with_id t (fun id -> Uring.submit_timeout t.uring id timespec_ptr clock absolute) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_timeout t.uring id timespec_ptr clock absolute) user_data let openat2 t ~access ~flags ~perm ~resolve ?fd path user_data = let open_flags = flags lor match access with @@ -483,7 +475,7 @@ let openat2 t ~access ~flags ~perm ~resolve ?fd path user_data = | `RW -> Open_flags.rdwr in let open_how = Open_how.v ~open_flags ~perm ~resolve path in - with_id_full t (fun id -> Uring.submit_openat2 t.uring id fd open_how) Kind_FD user_data ~extra_data:open_how + with_id_full t (fun id -> Uring.submit_openat2 t.uring id fd open_how) user_data ~extra_data:open_how module Linkat_flags = struct include Flags @@ -496,83 +488,80 @@ let linkat t ?old_dir_fd ?new_dir_fd ~flags ~old_path ~new_path user_data = let old_path_buf = Sketch.String.alloc t.sketch old_path in let new_path_buf = Sketch.String.alloc t.sketch new_path in Uring.submit_linkat t.uring id old_dir_fd old_path_buf new_dir_fd new_path_buf flags - (* XXX Kind_Error? *) - ) Heap.Kind_Int user_data + ) user_data let unlink t ~dir ?fd path user_data = with_id t (fun id -> let buf = Sketch.String.alloc t.sketch path in Uring.submit_unlinkat t.uring id fd buf dir - (* XXX Kind_Error? *) - ) Heap.Kind_Int user_data + ) user_data let mkdirat t ~mode ?fd path user_data = with_id t (fun id -> let buf = Sketch.String.alloc t.sketch path in Uring.submit_mkdirat t.uring id fd buf mode - (* XXX Kind_Error? *) - ) Heap.Kind_Int user_data + ) user_data let read t ~file_offset fd (buf : Cstruct.t) user_data = - with_id_full t (fun id -> Uring.submit_read t.uring fd id buf file_offset) Heap.Kind_Int user_data ~extra_data:buf + with_id_full t (fun id -> Uring.submit_read t.uring fd id buf file_offset) user_data ~extra_data:buf let write t ~file_offset fd (buf : Cstruct.t) user_data = - with_id_full t (fun id -> Uring.submit_write t.uring fd id buf file_offset) Heap.Kind_Int user_data ~extra_data:buf + with_id_full t (fun id -> Uring.submit_write t.uring fd id buf file_offset) user_data ~extra_data:buf let iov_max = Config.iov_max let readv t ~file_offset fd buffers user_data = with_id_full t (fun id -> let iovec = Sketch.Iovec.alloc t.sketch buffers in - Uring.submit_readv t.uring fd id iovec file_offset) Heap.Kind_Int user_data ~extra_data:buffers + Uring.submit_readv t.uring fd id iovec file_offset) user_data ~extra_data:buffers let read_fixed t ~file_offset fd ~off ~len user_data = - with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data let read_chunk ?len t ~file_offset fd chunk user_data = let { Cstruct.buffer; off; len } = Region.to_cstruct ?len chunk in if buffer != t.fixed_iobuf then invalid_arg "Chunk does not belong to ring!"; - with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_readv_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data let write_fixed t ~file_offset fd ~off ~len user_data = - with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data let write_chunk ?len t ~file_offset fd chunk user_data = let { Cstruct.buffer; off; len } = Region.to_cstruct ?len chunk in if buffer != t.fixed_iobuf then invalid_arg "Chunk does not belong to ring!"; - with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_writev_fixed t.uring fd id t.fixed_iobuf off len file_offset) user_data let writev t ~file_offset fd buffers user_data = with_id_full t (fun id -> let iovec = Sketch.Iovec.alloc t.sketch buffers in - Uring.submit_writev t.uring fd id iovec file_offset) Heap.Kind_Int user_data ~extra_data:buffers + Uring.submit_writev t.uring fd id iovec file_offset) user_data ~extra_data:buffers let poll_add t fd poll_mask user_data = - with_id t (fun id -> Uring.submit_poll_add t.uring fd id poll_mask) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_poll_add t.uring fd id poll_mask) user_data let close t fd user_data = - with_id t (fun id -> Uring.submit_close t.uring fd id) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_close t.uring fd id) user_data let statx t ?fd ~mask path statx flags user_data = let spath = Sketch.String.alloc t.sketch path in - with_id_full t (fun id -> Uring.submit_statx t.uring id fd statx spath flags mask) Heap.Kind_Int user_data ~extra_data:statx + with_id_full t (fun id -> Uring.submit_statx t.uring id fd statx spath flags mask) user_data ~extra_data:statx let splice t ~src ~dst ~len user_data = - with_id t (fun id -> Uring.submit_splice t.uring id src dst len) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_splice t.uring id src dst len) user_data let bind t fd addr user_data = let addr = Sockaddr.of_unix addr in - with_id_full t (fun id -> Uring.submit_bind t.uring id fd addr) Heap.Kind_Int user_data ~extra_data:addr + with_id_full t (fun id -> Uring.submit_bind t.uring id fd addr) user_data ~extra_data:addr let listen t fd backlog user_data = - with_id t (fun id -> Uring.submit_listen t.uring id fd backlog) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_listen t.uring id fd backlog) user_data let connect t fd addr user_data = let addr = Sockaddr.of_unix addr in - with_id_full t (fun id -> Uring.submit_connect t.uring id fd addr) Heap.Kind_Int user_data ~extra_data:addr + with_id_full t (fun id -> Uring.submit_connect t.uring id fd addr) user_data ~extra_data:addr let accept t fd addr user_data = - with_id_full t (fun id -> Uring.submit_accept t.uring id fd addr) Heap.Kind_FD user_data ~extra_data:addr + with_id_full t (fun id -> Uring.submit_accept t.uring id fd addr) user_data ~extra_data:addr let send_msg ?(fds=[]) ?dst t fd buffers user_data = let addr = Option.map Sockaddr.of_unix dst in @@ -581,24 +570,24 @@ let send_msg ?(fds=[]) ?dst t fd buffers user_data = (* NOTE: `msghdr` references `buffers`, so it's enough for `extra_data` *) with_id_full t (fun id -> let iovec = Sketch.Iovec.alloc t.sketch buffers in - Uring.submit_send_msg t.uring id fd msghdr iovec) Heap.Kind_Int user_data ~extra_data:msghdr + Uring.submit_send_msg t.uring id fd msghdr iovec) user_data ~extra_data:msghdr let recv_msg t fd msghdr user_data = let _, _, buffers = msghdr in (* NOTE: `msghdr` references `buffers`, so it's enough for `extra_data` *) with_id_full t (fun id -> let iovec = Sketch.Iovec.alloc t.sketch buffers in - Uring.submit_recv_msg t.uring id fd msghdr iovec) Heap.Kind_Int user_data ~extra_data:msghdr + Uring.submit_recv_msg t.uring id fd msghdr iovec) user_data ~extra_data:msghdr let fsync t ?(off=0L) ?(len=0) fd user_data = - with_id t (fun id -> Uring.submit_fsync t.uring id fd off len) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_fsync t.uring id fd off len) user_data let fdatasync t ?(off=0L) ?(len=0) fd user_data = - with_id t (fun id -> Uring.submit_fdatasync t.uring id fd off len) Heap.Kind_Int user_data + with_id t (fun id -> Uring.submit_fdatasync t.uring id fd off len) user_data -let cancel t job user_data = - ignore (Heap.ptr job : Uring.id); (* Check it's still valid *) - with_id t (fun id -> Uring.submit_cancel t.uring id (Heap.ptr job)) Heap.Kind_Int user_data +let cancel t (entry, job) user_data = + ignore (Heap.ptr entry : Uring.id); (* Check it's still valid *) + with_id t (fun id -> Uring.submit_cancel t.uring id job) user_data let sqe_ready t = Uring.sq_ready t.uring @@ -623,16 +612,28 @@ let submit t = gc_sketch t; v +type 'a completion_option = + | None + | Unit of { result: unit; data: 'a } + | Int of { result: int; data: 'a } + | FD of { result: Unix.file_descr; data: 'a } + | Error of { result: Unix.error; data: 'a } + let fn_on_ring fn t = match fn t.uring with | Uring.Cqe_none -> None - | Uring.Cqe_some { user_data_id; res } -> - let (data, kind) = Heap.free t.data user_data_id in - match kind with - | Heap.Kind_FD -> - Uring.completion_of_result data res - | Heap.Kind_Int -> - Some { result = res; kind = Int; data } + | Uring.Cqe_unit { user_data_id; result } -> + let data = Heap.free t.data user_data_id in + Unit { result; data } + | Uring.Cqe_int { user_data_id; result } -> + let data = Heap.free t.data user_data_id in + Int { result; data } + | Uring.Cqe_fd { user_data_id; result } -> + let data = Heap.free t.data user_data_id in + FD { result; data } + | Uring.Cqe_error { user_data_id; result } -> + let data = Heap.free t.data user_data_id in + Error { result; data } let get_cqe_nonblocking t = check t; diff --git a/lib/uring/uring.mli b/lib/uring/uring.mli index 70b792e3..8a963f1a 100644 --- a/lib/uring/uring.mli +++ b/lib/uring/uring.mli @@ -840,18 +840,16 @@ val submit : 'a t -> int to the kernel. Their results can subsequently be retrieved using {!wait} or {!peek}. *) -type _ return_kind = - | FD : Unix.file_descr return_kind - | Error : Unix.error return_kind - | Int : int return_kind - type 'a completion_option = - | None : 'a completion_option - | Some : { result: 'b; kind: 'b return_kind; data: 'a } -> 'a completion_option (**) + | None + | Unit of { result: unit; data: 'a } + | Int of { result: int; data: 'a } + | FD of { result: Unix.file_descr; data: 'a } + | Error of { result: Unix.error; data: 'a } (**) (** The type of results of calling {!wait} and {!peek}. [None] denotes that either there were no completions in the queue or an interrupt / timeout - occurred. [Some] contains both the user data attached to the completed - request and the integer syscall result. *) + occurred. The other constructors contain both the user data attached to the + completed request and syscall result. *) val wait : ?timeout:float -> 'a t -> 'a completion_option (** [wait ?timeout t] will block indefinitely (the default) or for [timeout] diff --git a/lib/uring/uring_stubs.c b/lib/uring/uring_stubs.c index d655c3e0..baa7d7f9 100644 --- a/lib/uring/uring_stubs.c +++ b/lib/uring/uring_stubs.c @@ -145,14 +145,21 @@ value ocaml_uring_exit(value v_uring) { CAMLreturn(Val_unit); } +#define Data_val(v_id, kind) ((void *)(Long_val(v_id) | ((intnat)kind << (8 * sizeof(value) - 4)))) +#define DATA_MASK (~((intnat)((intnat)3 << (8 * sizeof(value) - 4)))) +#define RES_UNIT 0 +#define RES_INT 1 +#define RES_FD 2 + value /* noalloc */ ocaml_uring_submit_nop(value v_uring, value v_id) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_nop(sqe); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -193,10 +200,11 @@ ocaml_uring_submit_timeout(value v_uring, value v_id, value v_sketch_ptr, value flags |= IORING_TIMEOUT_ABS; sqe = io_uring_get_sqe(ring); - if (!sqe) return Val_false; + if (!sqe) return Val_long(-1); io_uring_prep_timeout(sqe, t, 0, flags); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return Val_true; + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return Val_long((intnat)id); } struct open_how_data { @@ -255,22 +263,24 @@ value /* noalloc */ ocaml_uring_submit_openat2(value v_uring, value v_id, value v_fd, value v_open_how) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); struct open_how_data *data = Open_how_val(v_open_how); io_uring_prep_openat2(sqe, with_at_fdcwd(v_fd), data->path, &data->how); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_FD); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ ocaml_uring_submit_close(value v_uring, value v_fd, value v_id) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); dprintf("submit_close: fd:%d\n", Int_val(v_fd)); io_uring_prep_close(sqe, Int_val(v_fd)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -278,11 +288,12 @@ ocaml_uring_submit_poll_add(value v_uring, value v_fd, value v_id, value v_poll_ int poll_mask = Int_val(v_poll_mask); struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); dprintf("submit_poll_add: fd:%d mask:%x\n", Int_val(v_fd), poll_mask); io_uring_prep_poll_add(sqe, Int_val(v_fd), poll_mask); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -314,11 +325,12 @@ ocaml_uring_submit_readv(value v_uring, value v_fd, value v_id, value v_sketch_p size_t len = Sketch_ptr_len_val(v_sketch_ptr) / sizeof(*iovs); if (sqe == NULL) - return (Val_false); + return (Val_long(-1)); dprintf("submit_readv: %d ents len[0] %lu off %d\n", len, iovs[0].iov_len, Int63_val(v_fileoff)); io_uring_prep_readv(sqe, Int_val(v_fd), iovs, len, Int63_val(v_fileoff)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // Caller must ensure the buffers pointed to by v_sketch_ptr are not GC'd until the job is finished. @@ -330,11 +342,12 @@ ocaml_uring_submit_writev(value v_uring, value v_fd, value v_id, value v_sketch_ size_t len = Sketch_ptr_len_val(v_sketch_ptr) / sizeof(*iovs); if (sqe == NULL) - return (Val_false); + return (Val_long(-1)); dprintf("submit_writev: %d ents len[0] %lu off %d\n", len, iovs[0].iov_len, Int63_val(v_fileoff)); io_uring_prep_writev(sqe, Int_val(v_fd), iovs, len, Int63_val(v_fileoff)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // Caller must ensure the buffers are not released until this job completes. @@ -343,11 +356,12 @@ ocaml_uring_submit_readv_fixed_native(value v_uring, value v_fd, value v_id, val struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); dprintf("submit_readv_fixed: buf %p off %d len %d fileoff %d", buf, Int_val(v_off), Int_val(v_len), Int63_val(v_fileoff)); io_uring_prep_read_fixed(sqe, Int_val(v_fd), buf, Int_val(v_len), Int63_val(v_fileoff), 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value @@ -369,11 +383,12 @@ ocaml_uring_submit_writev_fixed_native(value v_uring, value v_fd, value v_id, va struct io_uring_sqe *sqe = io_uring_get_sqe(ring); void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); if (!sqe) - return (Val_false); + return (Val_long(-1)); dprintf("submit_writev_fixed: buf %p off %d len %d fileoff %d", buf, Int_val(v_off), Int_val(v_len), Int63_val(v_fileoff)); io_uring_prep_write_fixed(sqe, Int_val(v_fd), buf, Int_val(v_len), Int63_val(v_fileoff), 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value @@ -396,12 +411,13 @@ ocaml_uring_submit_read(value v_uring, value v_fd, value v_id, value v_cstruct, value v_off = Field(v_cstruct, 1); value v_len = Field(v_cstruct, 2); void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); dprintf("submit_read: fd %d buff %p len %zd fileoff %d\n", Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); io_uring_prep_read(sqe, Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -412,25 +428,27 @@ ocaml_uring_submit_write(value v_uring, value v_fd, value v_id, value v_cstruct, value v_off = Field(v_cstruct, 1); value v_len = Field(v_cstruct, 2); void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); dprintf("submit_write: fd %d buff %p len %zd fileoff %d\n", Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); io_uring_prep_write(sqe, Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ ocaml_uring_submit_splice(value v_uring, value v_id, value v_fd_in, value v_fd_out, value v_nbytes) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_splice(sqe, Int_val(v_fd_in), (int64_t) -1, Int_val(v_fd_out), (int64_t) -1, Int_val(v_nbytes), 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } #define Statx_val(v) (*((struct statx **) Data_custom_val(v))) @@ -489,11 +507,12 @@ value ocaml_uring_submit_statx_native(value v_uring, value v_id, value v_fd, value v_statx, value v_sketch_ptr, value v_flags, value v_mask) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); char *path = Sketch_ptr_val(v_sketch_ptr); io_uring_prep_statx(sqe, with_at_fdcwd(v_fd), path, Int_val(v_flags), Int_val(v_mask), Statx_val(v_statx)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value @@ -791,10 +810,11 @@ ocaml_uring_submit_bind(value v_uring, value v_id, value v_fd, value v_sockaddr) struct io_uring_sqe *sqe; struct sock_addr_data *addr = Sock_addr_val(v_sockaddr); sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_bind(sqe, Int_val(v_fd), &(addr->sock_addr_addr.s_gen), addr->sock_addr_len); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -802,10 +822,11 @@ ocaml_uring_submit_listen(value v_uring, value v_id, value v_fd, value v_backlog struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe; sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_listen(sqe, Int_val(v_fd), Int_val(v_backlog)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // v_sockaddr must not be GC'd while the call is in progress @@ -815,10 +836,11 @@ ocaml_uring_submit_connect(value v_uring, value v_id, value v_fd, value v_sockad struct io_uring_sqe *sqe; struct sock_addr_data *addr = Sock_addr_val(v_sockaddr); sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_connect(sqe, Int_val(v_fd), &(addr->sock_addr_addr.s_gen), addr->sock_addr_len); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // v_msghdr must not be GC'd while the call is in progress @@ -827,13 +849,14 @@ ocaml_uring_submit_send_msg(value v_uring, value v_id, value v_fd, value v_msghd struct io_uring *ring = Ring_val(v_uring); struct msghdr *msg = Msghdr_val(Field(v_msghdr, 0)); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (sqe == NULL) return (Val_false); + if (sqe == NULL) return (Val_long(-1)); msg->msg_iov = Sketch_ptr_val(v_sketch_ptr); msg->msg_iovlen = Sketch_ptr_len_val(v_sketch_ptr) / sizeof(struct iovec); dprintf("submit_sendmsg\n"); io_uring_prep_sendmsg(sqe, Int_val(v_fd), msg, 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // v_msghdr must not be GC'd while the call is in progress @@ -842,13 +865,14 @@ ocaml_uring_submit_recv_msg(value v_uring, value v_id, value v_fd, value v_msghd struct io_uring *ring = Ring_val(v_uring); struct msghdr *msg = Msghdr_val(Field(v_msghdr, 0)); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (sqe == NULL) return (Val_false); + if (sqe == NULL) return (Val_long(-1)); msg->msg_iov = Sketch_ptr_val(v_sketch_ptr); msg->msg_iovlen = Sketch_ptr_len_val(v_sketch_ptr) / sizeof(struct iovec); dprintf("submit_recvmsg:msghdr %p: registering iobuf base %p len %lu\n", msg, msg->msg_iov[0].iov_base, msg->msg_iov[0].iov_len); io_uring_prep_recvmsg(sqe, Int_val(v_fd), msg, 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // v_sockaddr must not be GC'd while the call is in progress @@ -859,10 +883,11 @@ ocaml_uring_submit_accept(value v_uring, value v_id, value v_fd, value v_sockadd struct sock_addr_data *addr = Sock_addr_val(v_sockaddr); addr->sock_addr_len = sizeof(union sock_addr_union); sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_accept(sqe, Int_val(v_fd), &(addr->sock_addr_addr.s_gen), &addr->sock_addr_len, SOCK_CLOEXEC); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_FD); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -880,10 +905,11 @@ ocaml_uring_submit_unlinkat(value v_uring, value v_id, value v_fd, value v_sketc struct io_uring_sqe *sqe = io_uring_get_sqe(ring); int flags = Bool_val(v_rmdir) ? AT_REMOVEDIR : 0; char *path = Sketch_ptr_val(v_sketch_ptr); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_unlinkat(sqe, with_at_fdcwd(v_fd), path, flags); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -891,10 +917,11 @@ ocaml_uring_submit_mkdirat(value v_uring, value v_id, value v_fd, value v_sketch struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); char *path = Sketch_ptr_val(v_sketch_ptr); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_mkdirat(sqe, with_at_fdcwd(v_fd), path, Int_val(v_mode)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -902,12 +929,13 @@ ocaml_uring_submit_fsync(value v_uring, value v_id, value v_fd, value v_off, val { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_fsync(sqe, Int_val(v_fd), 0); sqe->off = Int64_val(v_off); sqe->len = Int_val(v_len); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -915,12 +943,13 @@ ocaml_uring_submit_fdatasync(value v_uring, value v_id, value v_fd, value v_off, { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_fsync(sqe, Int_val(v_fd), IORING_FSYNC_DATASYNC); sqe->off = Int64_val(v_off); sqe->len = Int_val(v_len); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -928,10 +957,12 @@ ocaml_uring_submit_cancel(value v_uring, value v_id, value v_target) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe; sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_cancel(sqe, (void *)Long_val(v_target), 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + /* RES_UNIT, since flags == 0 */ + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value ocaml_uring_submit(value v_uring) @@ -946,21 +977,34 @@ value ocaml_uring_submit(value v_uring) #define Val_cqe_none Val_int(0) -static value Val_cqe_some(value id, value res) { - CAMLparam2(id, res); - CAMLlocal1(some); - some = caml_alloc(2, 0); - Store_field(some, 0, id); - Store_field(some, 1, res); +static value cqe_error(intnat id, int res) { + CAMLparam0(); + CAMLlocal1(err); + err = unix_error_of_code(-res); + value some = caml_alloc_small(2, 3); + Field(some, 0) = Val_long(id & DATA_MASK); + Field(some, 1) = err; CAMLreturn(some); } +static value Val_cqe_some(intnat id, int res) { + value some; + if (res < 0) { + some = cqe_error(id, res); + } else { + some = caml_alloc_small(2, (id >> (8 * sizeof(value) - 4))); + Field(some, 1) = Val_int(res); + } + Field(some, 0) = Val_long(id & DATA_MASK); + return some; +} + value ocaml_uring_wait_cqe_timeout(value v_timeout, value v_uring) { CAMLparam2(v_uring, v_timeout); double timeout = Double_val(v_timeout); struct __kernel_timespec t; - long id; + intnat id; struct io_uring *ring = Ring_val(v_uring); struct io_uring_cqe *cqe; int res; @@ -979,17 +1023,17 @@ value ocaml_uring_wait_cqe_timeout(value v_timeout, value v_uring) } else { if (!cqe) CAMLreturn(Val_cqe_none); - id = (long)io_uring_cqe_get_data(cqe); + id = (intnat)io_uring_cqe_get_data(cqe); int cqe_res = cqe->res; io_uring_cqe_seen(ring, cqe); - CAMLreturn(Val_cqe_some(Val_int(id), Val_int(cqe_res))); + CAMLreturn(Val_cqe_some(id, cqe_res)); } } value ocaml_uring_wait_cqe(value v_uring) { CAMLparam1(v_uring); - long id; + intnat id; struct io_uring *ring = Ring_val(v_uring); struct io_uring_cqe *cqe; int res; @@ -1004,17 +1048,17 @@ value ocaml_uring_wait_cqe(value v_uring) unix_error(-res, "io_uring_wait_cqe", Nothing); } } else { - id = (long)io_uring_cqe_get_data(cqe); + id = (intnat)io_uring_cqe_get_data(cqe); int cqe_res = cqe->res; io_uring_cqe_seen(ring, cqe); - CAMLreturn(Val_cqe_some(Val_int(id), Val_int(cqe_res))); + CAMLreturn(Val_cqe_some(id, cqe_res)); } } value ocaml_uring_peek_cqe(value v_uring) { CAMLparam1(v_uring); - long id; + intnat id; struct io_uring *ring = Ring_val(v_uring); struct io_uring_cqe *cqe; int res; @@ -1027,10 +1071,10 @@ value ocaml_uring_peek_cqe(value v_uring) unix_error(-res, "io_uring_peek_cqe", Nothing); } } else { - id = (long)io_uring_cqe_get_data(cqe); + id = (intnat)io_uring_cqe_get_data(cqe); int cqe_res = cqe->res; io_uring_cqe_seen(ring, cqe); - CAMLreturn(Val_cqe_some(Val_int(id), Val_int(cqe_res))); + CAMLreturn(Val_cqe_some(id, cqe_res)); } } @@ -1039,32 +1083,6 @@ value ocaml_uring_error_of_errno(value v_errno) { return unix_error_of_code(Int_val(v_errno)); } -#ifndef CAML_UNIX_FILE_DESCR_API -#ifdef _WIN32 -#error "Unix-specific treatment of Unix.file_descr" -#else -#define caml_unix_file_descr_of_fd(fd) Val_int(fd) -#endif /* #ifdef _WIN32 */ -#endif /* #ifndef CAML_UNIX_FILE_DESCR_API */ - -value ocaml_uring_completion_of_result(value v_data, value v_result) -{ - CAMLparam0(); - CAMLlocal2(result, val); - if (Int_val(v_result) < 0) { - val = unix_error_of_code(-Int_val(v_result)); - result = caml_alloc_small(3, 0); - Field(result, 1) = Val_int(1); - } else { - val = caml_unix_file_descr_of_fd(Int_val(v_result)); - result = caml_alloc_small(3, 0); - Field(result, 1) = Val_int(0); - } - Field(result, 0) = val; - Field(result, 2) = v_data; - CAMLreturn(result); -} - #define Probe_val(v) (*((struct io_uring_probe **) Data_custom_val(v))) static void finalize_probe(value v) { @@ -1129,12 +1147,13 @@ ocaml_uring_submit_linkat_native(value v_uring, value v_id, struct io_uring_sqe *sqe = io_uring_get_sqe(ring); if (!sqe) - return Val_false; + return Val_long(-1); io_uring_prep_linkat(sqe, with_at_fdcwd(v_old_dir), old_path, with_at_fdcwd(v_new_dir), new_path, Int_val(v_flags)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); - return Val_true; + return Val_long((intnat)id); } value diff --git a/tests/heap.md b/tests/heap.md index 01da2885..6cf579ab 100644 --- a/tests/heap.md +++ b/tests/heap.md @@ -35,13 +35,13 @@ Test normal usage: match attempt_alloc with | true -> let data = Random.int 5000 in - let ptr = Heap.ptr (Heap.alloc t Heap.Kind_Int data) in + let ptr = Heap.ptr (Heap.alloc t data) in assert (not (Hashtbl.mem reference ptr)); Hashtbl.add reference ptr data; incr currently_allocated | false -> let (k, v) = random_hashtbl_elt reference in - let v', _ = Heap.free t k in + let v' = Heap.free t k in Hashtbl.remove reference k; assert (v = v'); decr currently_allocated @@ -62,7 +62,7 @@ let shuffle_list l = # let t = Heap.create 0 in let add_l = List.init 1024 (fun i -> i) |> shuffle_list in assert (Heap.in_use t = 0); - let free_l = List.map (fun i -> Heap.alloc t Heap.Kind_Int i |> Heap.ptr) add_l |> + let free_l = List.map (fun i -> Heap.alloc t i |> Heap.ptr) add_l |> shuffle_list in assert (Heap.in_use t = 1024); @@ -77,10 +77,10 @@ Double free in an empty heap: ```ocaml # let t : int Heap.t = Heap.create 1;; val t : int Heap.t = -# let p = Heap.ptr @@ Heap.alloc t Heap.Kind_Int 1;; +# let p = Heap.ptr @@ Heap.alloc t 1;; val p : Heap.ptr = 0 # Heap.free t p;; -- : int * Heap.result_kind = (1, Heap.Kind_Int) +- : int = 1 # Heap.free t p;; Exception: Invalid_argument "Heap.free: pointer already freed". # let t : unit = Heap.release t;; @@ -92,16 +92,16 @@ Double free in a non-empty heap: ```ocaml # let t : int Heap.t = Heap.create 2;;; val t : int Heap.t = -# let p1 = Heap.ptr @@ Heap.alloc t Heap.Kind_Int 1;;; +# let p1 = Heap.ptr @@ Heap.alloc t 1;;; val p1 : Heap.ptr = 0 -# let p2 = Heap.ptr @@ Heap.alloc t Heap.Kind_Int 2;;; +# let p2 = Heap.ptr @@ Heap.alloc t 2;;; val p2 : Heap.ptr = 1 # Heap.free t p1;; -- : int * Heap.result_kind = (1, Heap.Kind_Int) +- : int = 1 # Heap.free t p1;; Exception: Invalid_argument "Heap.free: pointer already freed". # Heap.free t p2;; -- : int * Heap.result_kind = (2, Heap.Kind_Int) +- : int = 2 # let t : unit = Heap.release t;; val t : unit = () ``` diff --git a/tests/main.md b/tests/main.md index 9ac797a4..c165e019 100644 --- a/tests/main.md +++ b/tests/main.md @@ -16,28 +16,27 @@ module Test_data = struct close_out oc end -let rec consume_exn name path t = - match Uring.wait ~timeout:1. t with - | None -> consume_exn name path t - | Some { data; kind = Uring.Error; result } -> - raise (Unix.Unix_error ((result : Unix.error), name, path)) - | Some { data; kind = Uring.FD; result } -> data, (result : Unix.file_descr) - | Some _ -> assert false - let rec consume t = match Uring.wait ~timeout:1. t with - | Some { data; kind = Uring.Int; result } -> (data, (result : int)) - | Some _ -> - assert false + | Int { data; result } -> (data, result) + | Unit { data; result = _ } -> (data, 0) + | FD _ | Error _ -> failwith "Unexpected return from syscall" | None -> consume t -let rec consume_fd t = +let rec consume_result t = + match Uring.wait ~timeout:1. t with + | Error { data; result } -> (data, Error result) + | Unit { data; result = () } -> (data, Ok 0) + | Int { data; result } -> (data, Ok result) + | FD _ -> failwith "Unexpected return from syscall" + | None -> consume_result t + +let rec consume_fd path t = match Uring.wait ~timeout:1. t with - | None -> consume_fd t - | Some { data; kind = Uring.FD; result } -> - data, (result : Unix.file_descr) - | Some _ -> - assert false + | FD { data; result } -> data, result + | Error { data = _; result } -> raise (Unix.Unix_error (result, "openat2", path)) + | Int _ | Unit _ -> failwith "Unexpected return from syscall" + | None -> consume_fd path t let traceln fmt = Format.printf (fmt ^^ "@.") @@ -152,7 +151,7 @@ val t : [ `Open ] Uring.t = # Uring.submit t;;; - : int = 1 -# let token, fd = consume_fd t;; +# let token, fd = consume_fd "/dev/null" t;; val token : [ `Open ] = `Open val fd : Unix.file_descr = @@ -185,7 +184,7 @@ val t : [ `Create ] Uring.t = # Uring.submit t;; - : int = 1 -# let token, fd = consume_fd t;; +# let token, fd = consume_fd "test-openat" t;; val token : [ `Create ] = `Create val fd : Unix.file_descr = @@ -264,7 +263,7 @@ Now using `~fd`: # Uring.submit t;; - : int = 1 -# let token, fd = consume_fd t;; +# let token, fd = consume_fd "test-openat" t;; val token : [ `Open_path | `Statx ] = `Open_path val fd : Unix.file_descr = @@ -310,7 +309,7 @@ val t : [ `Get_path ] Uring.t = path `Get_path)); traceln "Submitted %d" (Uring.submit t); - let `Get_path, fd = consume_exn "openat2" path t in + let `Get_path, fd = consume_fd path t in Unix.close fd; traceln "Opened %S OK" path;; val get : resolve:Uring.Resolve.t -> string -> unit = @@ -556,8 +555,8 @@ val fd : unit = () Ask to read from a pipe (with no data available), then cancel it. ```ocaml -# exception Multiple of Unix.error list;; -exception Multiple of Unix.error list +# exception Multiple of (int, Unix.error) result list;; +exception Multiple of (int, Unix.error) result list # let t : [ `Cancel | `Read ] Uring.t = Uring.create ~queue_depth:5 ();; val t : [ `Cancel | `Read ] Uring.t = @@ -575,19 +574,19 @@ val read : [ `Cancel | `Read ] Uring.job = - : [ `Cancel | `Read ] Uring.job option = Some # Uring.submit t;; - : int = 2 -# let t1, r1 = consume t in - let t2, r2 = consume t in +# let t1, r1 = consume_result t in + let t2, r2 = consume_result t in let r_read, r_cancel = match t1, t2 with | `Read, `Cancel -> r1, r2 | `Cancel, `Read -> r2, r1 | _ -> assert false in - begin match Uring.error_of_errno r_read, Uring.error_of_errno r_cancel with - | EINTR, EALREADY + begin match r_read, r_cancel with + | Error EINTR, Error EALREADY (* Occasionally, the read is actually busy just as we try to cancel. In that case it gets interrupted and the cancel returns EALREADY. *) - | EUNKNOWNERR 125 (* ECANCELLED *), EUNKNOWNERR 0 -> + | Error (EUNKNOWNERR 125) (* ECANCELLED *), Ok 0 -> (* This is the common case. The read is blocked and can just be removed. *) () | e1, e2 -> raise (Multiple [e1; e2]) @@ -621,21 +620,22 @@ val read : [ `Cancel | `Read ] Uring.job = - : [ `Cancel | `Read ] Uring.job option = Some # Uring.submit t;; - : int = 1 -# let t1, r1 = consume t in - let t2, r2 = consume t in +# let t1, r1 = consume_result t in + let t2, r2 = consume_result t in let r_read, r_cancel = match t1, t2 with | `Read, `Cancel -> r1, r2 | `Cancel, `Read -> r2, r1 | _ -> assert false in - if r_read = 1 then ( - match Uring.error_of_errno r_cancel with - | ENOENT -> () - | e -> raise (Unix.Unix_error (e, "cancel", "")) + if r_read = Ok 1 then ( + match r_cancel with + | Ok _ + | Error ENOENT -> () + | Error e -> raise (Unix.Unix_error (e, "cancel", "")) ) else ( - match Uring.error_of_errno r_read, Uring.error_of_errno r_cancel with - | EUNKNOWNERR 125 (* ECANCELLED *), EUNKNOWNERR 0 -> + match r_read, r_cancel with + | Error (EUNKNOWNERR 125) (* ECANCELLED *), Ok 0 -> (* This isn't the case we want to test, but it can happen sometimes. *) () | e1, e2 -> raise (Multiple [e1; e2]) @@ -818,12 +818,10 @@ val check : unit -> bool * bool = - : unit Uring.job option = Some # Uring.wait t;; -- : unit Uring.completion_option = -Uring.Some {Uring.result = ; kind = Uring.Int; data = ()} +- : unit Uring.completion_option = Uring.Unit {Uring.result = (); data = ()} # Uring.wait t;; -- : unit Uring.completion_option = -Uring.Some {Uring.result = ; kind = Uring.Int; data = ()} +- : unit Uring.completion_option = Uring.Unit {Uring.result = (); data = ()} # check ();; - : bool * bool = (false, false) @@ -846,8 +844,7 @@ val t : unit Uring.t = # Uring.submit t;; - : int = 1 # Uring.wait t;; -- : unit Uring.completion_option = -Uring.Some {Uring.result = ; kind = Uring.Int; data = ()} +- : unit Uring.completion_option = Uring.Unit {Uring.result = (); data = ()} # (Unix.lstat "new-symlink").st_kind;; - : Unix.file_kind = Unix.S_LNK ``` @@ -859,7 +856,7 @@ This currently doesn't work due to https://github.com/axboe/liburing/issues/955: # Uring.submit t;; - : int = 1 # Uring.wait t;; - - : unit Uring.completion_option = Uring.Some {Uring.result = 0; data = ()} + - : unit Uring.completion_option = Uring.Unit {Uring.result = (); data = ()} # (Unix.lstat "new-file").st_kind;; - : Unix.file_kind = Unix.S_REG @@ -883,7 +880,7 @@ val t : [ `Mkdir of int ] Uring.t = - : int = 1 # Uring.wait t;; - : [ `Mkdir of int ] Uring.completion_option = -Uring.Some {Uring.result = ; kind = Uring.Int; data = `Mkdir 0} +Uring.Unit {Uring.result = (); data = `Mkdir 0} # Printf.sprintf "0o%o" ((Unix.stat "mkdir").st_perm land 0o777);; - : string = "0o700" # let v = Uring.mkdirat t ~mode:0o755 "mkdir" (`Mkdir 1);; @@ -892,7 +889,7 @@ val v : [ `Mkdir of int ] Uring.job option = Some - : int = 1 # Uring.wait t;; - : [ `Mkdir of int ] Uring.completion_option = -Uring.Some {Uring.result = ; kind = Uring.Int; data = `Mkdir 1} +Uring.Error {Uring.result = Unix.EEXIST; data = `Mkdir 1} # Uring.exit t;; - : unit = () ``` @@ -912,8 +909,8 @@ val t : [ `Timeout ] Uring.t = # Uring.submit t;; - : int = 1 -# let `Timeout, timeout = consume t;; -val timeout : int = -62 +# let `Timeout, timeout = consume_result t;; +val timeout : (int, Unix.error) result = Error (Unix.EUNKNOWNERR 62) # let ns = ((Unix.gettimeofday () +. 0.01) *. 1e9) @@ -922,15 +919,15 @@ val timeout : int = -62 Uring.(timeout ~absolute:true t Realtime ns `Timeout);; - : [ `Timeout ] Uring.job option = Some -# let `Timeout, timeout = consume t;; -val timeout : int = -62 +# let `Timeout, timeout = consume_result t;; +val timeout : (int, Unix.error) result = Error (Unix.EUNKNOWNERR 62) # let ns1 = Int64.(mul 10L 1_000_000L) in Uring.(timeout ~absolute:true t Boottime ns1 `Timeout);; - : [ `Timeout ] Uring.job option = Some -# let `Timeout, timeout = consume t;; -val timeout : int = -62 +# let `Timeout, timeout = consume_result t;; +val timeout : (int, Unix.error) result = Error (Unix.EUNKNOWNERR 62) # Uring.exit t;; - : unit = () diff --git a/tests/poll_add.ml b/tests/poll_add.ml index 030b897c..b589c880 100644 --- a/tests/poll_add.ml +++ b/tests/poll_add.ml @@ -13,8 +13,8 @@ let () = let rec retry () = match Uring.wait t with | None -> retry () - | Some { result; kind = Uring.Int; _ } -> (result : int) - | Some _ -> assert false + | Int { result; _ } -> result + | FD _ | Error _ | Unit _ -> failwith "Unexpected return from poll" in let res = retry () in Printf.eprintf "poll_add: %x\n%!" res; diff --git a/tests/sketch.md b/tests/sketch.md index 01735337..36312653 100644 --- a/tests/sketch.md +++ b/tests/sketch.md @@ -11,8 +11,8 @@ let ldup n x = List.init n (Fun.const x) let rec consume t = match Uring.wait t with - | Some { data; kind = Uring.Int; result } -> (data, (result : int)) - | Some _ -> assert false + | Int { data; result } -> (data, result) + | FD _ | Unit _ | Error _ -> failwith "Unexpected return from readv" | None -> consume t ``` diff --git a/tests/socket_ops.ml b/tests/socket_ops.ml index 994233e5..a9541aef 100644 --- a/tests/socket_ops.ml +++ b/tests/socket_ops.ml @@ -13,49 +13,43 @@ let () = let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, 0) in (* Use io_uring for bind operation *) - let bind_result = + let () = match Uring.bind t server_sock addr () with | None -> failwith "Failed to submit bind operation" | Some _job -> let _submitted = Uring.submit t in match Uring.wait t with | Uring.None -> failwith "No completion for bind" - | Uring.Some { result; kind = Uring.Int; data = _ } -> - let result = (result : int) in - if result < 0 then begin - Uring.close t server_sock () |> ignore; - Uring.submit t |> ignore; - Uring.exit t; - let err = Uring.error_of_errno (-result) in - failwith (sprintf "Bind failed: %s" (Unix.error_message err)) - end else - result - | Uring.Some _ -> assert false + | Uring.Unit _ -> + print_endline "Bind completed successfully:" + | Uring.Error { result; data = _ } -> + Uring.close t server_sock () |> ignore; + Uring.submit t |> ignore; + Uring.exit t; + failwith (sprintf "Bind failed: %s" (Unix.error_message result)) + | Uring.Int _ | Uring.FD _ -> + failwith "Unexpected return from bind operation" in - printf "Bind completed with result: %d\n" bind_result; (* Use io_uring for listen operation *) let backlog = 10 in - let listen_result = + let () = match Uring.listen t server_sock backlog () with | None -> failwith "Failed to submit listen operation" | Some _job -> let _submitted = Uring.submit t in match Uring.wait t with | Uring.None -> failwith "No completion for listen" - | Uring.Some { result; kind = Uring.Int; data = _ } -> - let result = (result : int) in - if result < 0 then begin - Uring.close t server_sock () |> ignore; - Uring.submit t |> ignore; - Uring.exit t; - let err = Uring.error_of_errno (-result) in - failwith (sprintf "Listen failed: %s" (Unix.error_message err)) - end else - result - | Uring.Some _ -> assert false + | Uring.Unit _ -> + print_endline "Listen completed successfully" + | Uring.Error { result; data = _ } -> + Uring.close t server_sock () |> ignore; + Uring.submit t |> ignore; + Uring.exit t; + failwith (sprintf "Listen failed: %s" (Unix.error_message result)) + | Uring.Int _ | Uring.FD _ -> + failwith "Unexpected return from listen operation" in - printf "Listen completed with result: %d\n" listen_result; (* Get the actual bound port - Unix.getsockname is necessary for socket introspection *) let actual_addr = Unix.getsockname server_sock in @@ -75,32 +69,28 @@ let () = (* Use io_uring for connect operation *) let connect_addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in - let connect_result = + let () = match Uring.connect t client_sock connect_addr () with | None -> failwith "Failed to submit connect operation" | Some _job -> let _submitted = Uring.submit t in match Uring.wait t with | Uring.None -> failwith "No completion for connect" - | Uring.Some { result; kind = Uring.Int; data = _ } -> - let result = (result : int) in + | Uring.Unit _ -> + print_endline "Connect initiated successfully\n" + | Uring.Error { result = Unix.EINPROGRESS; data = _ } -> (* Connect may return -EINPROGRESS for non-blocking sockets, which is normal *) - if result < 0 && result <> (-115) (* -EINPROGRESS *) then begin - Uring.close t client_sock () |> ignore; - Uring.close t server_sock () |> ignore; - Uring.submit t |> ignore; - Uring.exit t; - let err = Uring.error_of_errno (-result) in - failwith (sprintf "Connect failed: %s (errno: %d)" (Unix.error_message err) (-result)) - end else - result - | Uring.Some _ -> assert false + print_endline "Connect initiated successfully (result: EINPROGRESS)" + | Uring.Error { result; data = _ } -> + Uring.close t client_sock () |> ignore; + Uring.close t server_sock () |> ignore; + Uring.submit t |> ignore; + Uring.exit t; + failwith (sprintf "Connect failed: %s" (Unix.error_message result)) + | Uring.Int _ | Uring.FD _ -> + failwith "Unexpected return from connect operation" in - if connect_result = 0 || connect_result = (-115) then - printf "Connect initiated successfully (result: %d)\n" connect_result - else - printf "Connect completed with result: %d\n" connect_result; (* Get the client socket's local port - Unix.getsockname is necessary for socket introspection *) let client_addr = Unix.getsockname client_sock in @@ -128,11 +118,13 @@ let () = if pending > 0 then match Uring.wait t with | Uring.None -> failwith "No completion for close" - | Uring.Some { result; kind = Uring.Int; data = _ } -> - if result < 0 then - printf "Close warning: %s\n" (Unix.error_message (Uring.error_of_errno (-result))); + | Uring.Unit _ -> wait_closes (pending - 1) - | Uring.Some _ -> assert false + | Uring.Error { result; data = _ } -> + printf "Close warning: %s\n" (Unix.error_message result); + wait_closes (pending - 1) + | Uring.Int _ | Uring.FD _ -> + failwith "Unexpected return from close operation" in wait_closes 2; diff --git a/tests/urcat.ml b/tests/urcat.ml index 58291833..1f5e4328 100644 --- a/tests/urcat.ml +++ b/tests/urcat.ml @@ -11,8 +11,8 @@ let get_file_size fd = let get_completion_and_print uring = let iov, len = match Uring.wait uring with - | Some { data; kind = Uring.Int; result } -> (data, (result : int)) - | Some _ -> assert false + | Int { data; result } -> (data, result) + | FD _ | Error _ | Unit _ -> failwith "Unexpected return from read" | None -> failwith "retry" in let remaining = ref len in diff --git a/tests/urcp_fixed_lib.ml b/tests/urcp_fixed_lib.ml index f1cd4fba..a5379504 100644 --- a/tests/urcp_fixed_lib.ml +++ b/tests/urcp_fixed_lib.ml @@ -49,10 +49,6 @@ let queue_read uring t len = t.read_left <- t.read_left - len; t.reads <- t.reads + 1 -(* TODO compile time check *) -let eagain = -11 -let eintr = -4 - (* Check that a read has completely finished, and if not * queue it up for completing the remaining amount *) let handle_read_completion uring req res = @@ -61,13 +57,6 @@ let handle_read_completion uring req res = match res with | 0 -> Logs.debug (fun l -> l "eof %a" pp_req req); - | n when n = eagain || n = eintr -> - (* requeue the request *) - let r = Uring.read_fixed ~file_offset:req.fileoff uring req.t.infd ~off:req.fixed_off ~len:req.len req in - assert(r <> None); - Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req); - | n when n < 0 -> - raise (Failure ("unix errorno " ^ (string_of_int n))) | n when n < bytes_to_read -> (* handle short read *) req.off <- req.off + n; @@ -90,12 +79,6 @@ let handle_write_completion uring req res = let bytes_to_write = req.len - req.off in match res with | 0 -> raise End_of_file - | n when n = eagain || n = eintr -> - (* requeue the request *) - let r = Uring.write_fixed ~file_offset:req.fileoff uring req.t.outfd ~off:req.fixed_off ~len:req.len req in - assert(r <> None); - Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req); - | n when n < 0 -> failwith (Fmt.str "unix error %d" (-n)) | n when n < bytes_to_write -> (* handle short write *) req.off <- req.off + n; @@ -115,6 +98,19 @@ let handle_completion uring req res = |`R -> handle_read_completion uring req res |`W -> handle_write_completion uring req res +let handle_retry uring req = + match req.op with + |`R -> + (* requeue the request *) + let r = Uring.read_fixed ~file_offset:req.fileoff uring req.t.infd ~off:req.fixed_off ~len:req.len req in + assert(r <> None); + Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req) + |`W -> + (* requeue the request *) + let r = Uring.write_fixed ~file_offset:req.fileoff uring req.t.outfd ~off:req.fixed_off ~len:req.len req in + assert(r <> None); + Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req) + let copy_file uring t = (* Create a set of read requests that we will turn into write requests * up until the queue depth *) @@ -138,12 +134,15 @@ let copy_file uring t = let check_q = if !got_completion then Uring.get_cqe_nonblocking uring else Uring.wait uring in match check_q with | None -> Logs.debug (fun l -> l "completions: retry so finishing loop") - | Some { data; kind = Int; result } -> - handle_completion uring data result; - got_completion := true; - handle_completions (); - | Some _ -> - assert false + | Int { data; result } -> + handle_completion uring data result; + got_completion := true; + handle_completions () + | Error { data; result = (Unix.EAGAIN | Unix.EINTR) } -> + handle_retry uring data + | Error { data = _; result } -> + failwith ("Unexpected error: " ^ Unix.error_message result) + | FD _ | Unit _ -> failwith "Unexpected return from syscall" end in handle_completions (); diff --git a/tests/urcp_lib.ml b/tests/urcp_lib.ml index 2d76a309..2b5f2118 100644 --- a/tests/urcp_lib.ml +++ b/tests/urcp_lib.ml @@ -54,10 +54,6 @@ let queue_read uring t len = t.read_left <- t.read_left - len; t.reads <- t.reads + 1 -(* TODO compile time check *) -let eagain = -11 -let eintr = -4 - (* Check that a read has completely finished, and if not * queue it up for completing the remaining amount *) let handle_read_completion uring req res = @@ -66,13 +62,6 @@ let handle_read_completion uring req res = match res with | 0 -> Logs.debug (fun l -> l "eof %a" pp_req req); - | n when n = eagain || n = eintr -> - (* requeue the request *) - let r = Uring.readv ~file_offset:req.fileoff uring req.t.infd req.iov.next req in - assert(r <> None); - Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req); - | n when n < 0 -> - raise (Failure ("unix errorno " ^ (string_of_int n))) | n when n < bytes_to_read -> (* handle short read so new iovec and resubmit *) req.iov.next <- Cstruct.shiftv req.iov.next n; @@ -97,11 +86,6 @@ let handle_write_completion uring req res = let bytes_to_write = req.len - req.off in match res with | 0 -> raise End_of_file - | n when n = eagain || n = eintr -> - (* requeue the request *) - let r = Uring.writev ~file_offset:req.fileoff uring req.t.infd req.iov.next req in - assert(r <> None); - Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req); | n when n < bytes_to_write -> (* handle short write so new iovec and resubmit *) req.iov.next <- Cstruct.shiftv req.iov.next n; @@ -120,6 +104,19 @@ let handle_completion uring req res = |`R -> handle_read_completion uring req res |`W -> handle_write_completion uring req res +let handle_retry uring req = + match req.op with + |`R -> + (* requeue the request *) + let r = Uring.writev ~file_offset:req.fileoff uring req.t.infd req.iov.next req in + assert(r <> None); + Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req) + |`W -> + (* requeue the request *) + let r = Uring.readv ~file_offset:req.fileoff uring req.t.infd req.iov.next req in + assert(r <> None); + Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req) + let copy_file uring t = (* Create a set of read requests that we will turn into write requests * up until the queue depth *) @@ -143,12 +140,15 @@ let copy_file uring t = let check_q = if !got_completion then Uring.get_cqe_nonblocking uring else Uring.wait uring in match check_q with | None -> Logs.debug (fun l -> l "completions: retry so finishing loop") - | Some { data; kind = Int; result } -> - handle_completion uring data result; - got_completion := true; - handle_completions () - | Some _ -> - assert false + | Int { data; result } -> + handle_completion uring data result; + got_completion := true; + handle_completions () + | Error { data; result = (Unix.EAGAIN | Unix.EINTR) } -> + handle_retry uring data + | Error { data = _; result } -> + failwith ("Unexpected error: " ^ Unix.error_message result) + | FD _ | Unit _ -> failwith "Unexpected return from syscall" end in handle_completions (); diff --git a/tests/urstat.ml b/tests/urstat.ml index 8e269516..bff67fff 100644 --- a/tests/urstat.ml +++ b/tests/urstat.ml @@ -11,9 +11,10 @@ let pp_time f (sec, nsec) = nsec let get_completion_and_print uring = - let fname, buf = + let (fname, buf), _ = match Uring.wait uring with - | Some { data; _ } -> data + | Unit { data; result } -> (data, result) + | Int _ | FD _ | Error _ -> failwith "Unexpected return from statx" | None -> failwith "retry" in let kind = S.kind buf in