diff --git a/README.md b/README.md index 2888173c..03aba97b 100644 --- a/README.md +++ b/README.md @@ -59,13 +59,16 @@ We can now ask Linux to suspend the process until a result (Completion Queue Ent let rec wait_with_retry uring = match Uring.wait uring with | None -> wait_with_retry uring (* Interrupted *) - | Some { result; data } -> result, data;; + | Unit { result = (); data } -> Either.Left 0, data + | Int { result; data } -> Either.Left result, data + | FD { result; data } -> Either.Right result, data + | Error { result; data } -> failwith ("Unexpected error: " ^ Unix.error_message result);; ``` ```ocaml # let result, data = wait_with_retry uring;; -val result : int = 8 +val result : (int, Unix.file_descr) Either.t = Right val data : _[> `Open_log ] = `Open_log ``` @@ -76,9 +79,7 @@ The `result` field is the return code, with the same meaning as the return code from the corresponding system call (`openat2` in this case). ```ocaml -# let fd = - if result < 0 then failwith ("Error: " ^ string_of_int result); - (Obj.magic result : Unix.file_descr);; +# let fd = Either.fold ~left:(fun _ -> assert false) ~right:Fun.id result val fd : Unix.file_descr = ``` @@ -97,11 +98,12 @@ let rec write_all fd = function |> Option.get (* We know we have enough space here *) in assert (Uring.submit uring = 1); - let result, data = wait_with_retry uring in - assert (data = `Write_all); (* There aren't any other requests pending *) - assert (result > 0); (* Check for error return *) - let bufs = Cstruct.shiftv bufs result in - write_all fd bufs + match wait_with_retry uring with + | Either.Left result, `Write_all -> + let bufs = Cstruct.shiftv bufs result in + write_all fd bufs + | _ -> + assert false (* There aren't any other requests pending *) ``` ```ocaml @@ -124,8 +126,9 @@ Some - : int = 1 # wait_with_retry uring;; -- : int * ([> `Close_log | `Open_log | `Write_all ] as '_weak3) = -(0, `Close_log) +- : (int, Unix.file_descr) Either.t * + ([> `Close_log | `Open_log | `Write_all ] as '_weak3) += (Either.Left 0, `Close_log) ``` The file has now been written: diff --git a/bench/readv.ml b/bench/readv.ml index 8f977c7d..7a493ddc 100644 --- a/bench/readv.ml +++ b/bench/readv.ml @@ -5,13 +5,14 @@ let buffer_size = 100 (* Use a small buffer to stress the system more *) let n_concurrent = 16 (* How many requests to have active at once *) let n_iters = 1_000_000 (* How many times to accept and resubmit *) -let rec wait t handle = - match Uring.get_cqe_nonblocking t with - | Some { result; data = buf } -> handle result buf - | None -> - match Uring.wait t with - | None -> wait t handle - | Some { result; data = buf } -> handle result buf +let rec process wait t handle = function + | Uring.Int { result; data = buf } -> handle result buf + | Error { result; data = _ } -> + raise (Unix.Unix_error (result, "readv", "")) + | FD _ | Unit _ -> failwith "Unexpected return from readv" + | None -> process wait t handle (wait t) + +let wait t handle = process Uring.wait t handle (Uring.get_cqe_nonblocking t) let run_bechmark ~polling_timeout fd = let got = ref 0 in @@ -29,13 +30,9 @@ let run_bechmark ~polling_timeout fd = let t0 = Unix.gettimeofday () in for _ = 1 to n_iters do wait t (fun result bufs -> - if result < 0 then ( - raise (Unix.Unix_error (Uring.error_of_errno result, "readv", "")) - ) else ( - got := !got + result; - let _job : _ Uring.job = Uring.readv t fd bufs ~file_offset:Optint.Int63.zero bufs |> Option.get in - () - ) + got := !got + result; + let _job : _ Uring.job = Uring.readv t fd bufs ~file_offset:Optint.Int63.zero bufs |> Option.get in + () ) done; (* Get a snapshot of the stats before letting things finish. *) diff --git a/lib/uring/uring.ml b/lib/uring/uring.ml index ae5124dc..761d58a9 100644 --- a/lib/uring/uring.ml +++ b/lib/uring/uring.ml @@ -295,8 +295,6 @@ module Msghdr = struct create_with_addr ~n_fds ~fds:[] ?addr buffs end -type 'a job = 'a Heap.entry - type clock = Boottime | Realtime type probe @@ -316,38 +314,41 @@ module Uring = struct external opcode_supported : probe -> Op.t -> bool = "ocaml_uring_opcode_supported" [@@noalloc] type id = Heap.ptr + type job = private int type offset = Optint.Int63.t - external submit_nop : t -> id -> bool = "ocaml_uring_submit_nop" [@@noalloc] - external submit_timeout : t -> id -> Sketch.ptr -> clock -> bool -> bool = "ocaml_uring_submit_timeout" [@@noalloc] - external submit_poll_add : t -> Unix.file_descr -> id -> Poll_mask.t -> bool = "ocaml_uring_submit_poll_add" [@@noalloc] - external submit_read : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool = "ocaml_uring_submit_read" [@@noalloc] - external submit_write : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> bool = "ocaml_uring_submit_write" [@@noalloc] - external submit_readv : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_readv" [@@noalloc] - external submit_writev : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> bool = "ocaml_uring_submit_writev" [@@noalloc] - external submit_readv_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc] - external submit_writev_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> bool = "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc] - external submit_close : t -> Unix.file_descr -> id -> bool = "ocaml_uring_submit_close" [@@noalloc] - external submit_statx : t -> id -> Unix.file_descr -> Statx.t -> Sketch.ptr -> int -> int -> bool = "ocaml_uring_submit_statx_byte" "ocaml_uring_submit_statx_native" [@@noalloc] - external submit_splice : t -> id -> Unix.file_descr -> Unix.file_descr -> int -> bool = "ocaml_uring_submit_splice" [@@noalloc] - external submit_bind : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_bind" [@@noalloc] - external submit_listen : t -> id -> Unix.file_descr -> int -> bool = "ocaml_uring_submit_listen" [@@noalloc] - external submit_connect : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_connect" [@@noalloc] - external submit_accept : t -> id -> Unix.file_descr -> Sockaddr.t -> bool = "ocaml_uring_submit_accept" [@@noalloc] - external submit_cancel : t -> id -> id -> bool = "ocaml_uring_submit_cancel" [@@noalloc] - external submit_openat2 : t -> id -> Unix.file_descr -> Open_how.t -> bool = "ocaml_uring_submit_openat2" [@@noalloc] - external submit_linkat : t -> id -> Unix.file_descr -> Sketch.ptr -> Unix.file_descr -> Sketch.ptr -> int -> bool = "ocaml_uring_submit_linkat_byte" "ocaml_uring_submit_linkat_native" [@@noalloc] - external submit_unlinkat : t -> id -> Unix.file_descr -> Sketch.ptr -> bool -> bool = "ocaml_uring_submit_unlinkat" [@@noalloc] - external submit_mkdirat : t -> id -> Unix.file_descr -> Sketch.ptr -> int -> bool = "ocaml_uring_submit_mkdirat" [@@noalloc] - external submit_send_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> bool = "ocaml_uring_submit_send_msg" [@@noalloc] - external submit_recv_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> bool = "ocaml_uring_submit_recv_msg" [@@noalloc] - external submit_fsync : t -> id -> Unix.file_descr -> int64 -> int -> bool = "ocaml_uring_submit_fsync" [@@noalloc] - external submit_fdatasync : t -> id -> Unix.file_descr -> int64 -> int -> bool = "ocaml_uring_submit_fdatasync" [@@noalloc] + external submit_nop : t -> id -> job = "ocaml_uring_submit_nop" [@@noalloc] + external submit_timeout : t -> id -> Sketch.ptr -> clock -> bool -> job = "ocaml_uring_submit_timeout" [@@noalloc] + external submit_poll_add : t -> Unix.file_descr -> id -> Poll_mask.t -> job = "ocaml_uring_submit_poll_add" [@@noalloc] + external submit_read : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> job = "ocaml_uring_submit_read" [@@noalloc] + external submit_write : t -> Unix.file_descr -> id -> Cstruct.t -> offset -> job = "ocaml_uring_submit_write" [@@noalloc] + external submit_readv : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> job = "ocaml_uring_submit_readv" [@@noalloc] + external submit_writev : t -> Unix.file_descr -> id -> Sketch.ptr -> offset -> job = "ocaml_uring_submit_writev" [@@noalloc] + external submit_readv_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> job = "ocaml_uring_submit_readv_fixed_byte" "ocaml_uring_submit_readv_fixed_native" [@@noalloc] + external submit_writev_fixed : t -> Unix.file_descr -> id -> Cstruct.buffer -> int -> int -> offset -> job = "ocaml_uring_submit_writev_fixed_byte" "ocaml_uring_submit_writev_fixed_native" [@@noalloc] + external submit_close : t -> Unix.file_descr -> id -> job = "ocaml_uring_submit_close" [@@noalloc] + external submit_statx : t -> id -> Unix.file_descr option -> Statx.t -> Sketch.ptr -> int -> int -> job = "ocaml_uring_submit_statx_byte" "ocaml_uring_submit_statx_native" [@@noalloc] + external submit_splice : t -> id -> Unix.file_descr -> Unix.file_descr -> int -> job = "ocaml_uring_submit_splice" [@@noalloc] + external submit_bind : t -> id -> Unix.file_descr -> Sockaddr.t -> job = "ocaml_uring_submit_bind" [@@noalloc] + external submit_listen : t -> id -> Unix.file_descr -> int -> job = "ocaml_uring_submit_listen" [@@noalloc] + external submit_connect : t -> id -> Unix.file_descr -> Sockaddr.t -> job = "ocaml_uring_submit_connect" [@@noalloc] + external submit_accept : t -> id -> Unix.file_descr -> Sockaddr.t -> job = "ocaml_uring_submit_accept" [@@noalloc] + external submit_cancel : t -> id -> job -> job = "ocaml_uring_submit_cancel" [@@noalloc] + external submit_openat2 : t -> id -> Unix.file_descr option -> Open_how.t -> job = "ocaml_uring_submit_openat2" [@@noalloc] + external submit_linkat : t -> id -> Unix.file_descr option -> Sketch.ptr -> Unix.file_descr option -> Sketch.ptr -> int -> job = "ocaml_uring_submit_linkat_byte" "ocaml_uring_submit_linkat_native" [@@noalloc] + external submit_unlinkat : t -> id -> Unix.file_descr option -> Sketch.ptr -> bool -> job = "ocaml_uring_submit_unlinkat" [@@noalloc] + external submit_mkdirat : t -> id -> Unix.file_descr option -> Sketch.ptr -> int -> job = "ocaml_uring_submit_mkdirat" [@@noalloc] + external submit_send_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> job = "ocaml_uring_submit_send_msg" [@@noalloc] + external submit_recv_msg : t -> id -> Unix.file_descr -> Msghdr.t -> Sketch.ptr -> job = "ocaml_uring_submit_recv_msg" [@@noalloc] + external submit_fsync : t -> id -> Unix.file_descr -> int64 -> int -> job = "ocaml_uring_submit_fsync" [@@noalloc] + external submit_fdatasync : t -> id -> Unix.file_descr -> int64 -> int -> job = "ocaml_uring_submit_fdatasync" [@@noalloc] type cqe_option = private | Cqe_none - | Cqe_some of { user_data_id : id; res: int } - [@@ocaml.warning "-37" (* Avoids "Unused constructor" warning on OCaml <= 4.09. *)] + | Cqe_unit of { user_data_id : id; result: unit } + | Cqe_int of { user_data_id : id; result: int } + | Cqe_fd of { user_data_id : id; result: Unix.file_descr } + | Cqe_error of { user_data_id : id; result: Unix.error } external wait_cqe : t -> cqe_option = "ocaml_uring_wait_cqe" external wait_cqe_timeout : float -> t -> cqe_option = "ocaml_uring_wait_cqe_timeout" @@ -357,6 +358,8 @@ module Uring = struct external register_eventfd : t -> Unix.file_descr -> unit = "ocaml_uring_register_eventfd" end +type 'a job = 'a Heap.entry * Uring.job + type 'a t = { id : < >; uring: Uring.t; @@ -439,19 +442,19 @@ let exit t = Uring.exit t.uring; unregister_gc_root t -let with_id_full : type a. a t -> (Heap.ptr -> bool) -> a -> extra_data:'b -> a job option = +let with_id_full : type a. a t -> (Heap.ptr -> Uring.job) -> a -> extra_data:'b -> a job option = fun t fn datum ~extra_data -> match Heap.alloc t.data datum ~extra_data with - | exception (Invalid_argument _ as ex) -> check t; raise ex - | entry -> - let ptr = Heap.ptr entry in - let has_space = fn ptr in - if has_space then - Some entry - else ( - ignore (Heap.free t.data ptr : a); - None - ) + | exception (Invalid_argument _ as ex) -> check t; raise ex + | entry -> + let ptr = Heap.ptr entry in + let job = fn ptr in + if (job :> int) >= 0 then + Some (entry, job) + else ( + ignore (Heap.free t.data ptr : a); + None + ) let with_id t fn a = with_id_full t fn a ~extra_data:() @@ -465,9 +468,7 @@ let timeout ?(absolute = false) t clock timeout_ns user_data = set_timespec timespec_ptr timeout_ns; with_id t (fun id -> Uring.submit_timeout t.uring id timespec_ptr clock absolute) user_data -let at_fdcwd : Unix.file_descr = Obj.magic Config.at_fdcwd - -let openat2 t ~access ~flags ~perm ~resolve ?(fd=at_fdcwd) path user_data = +let openat2 t ~access ~flags ~perm ~resolve ?fd path user_data = let open_flags = flags lor match access with | `R -> Open_flags.rdonly | `W -> Open_flags.wronly @@ -482,20 +483,20 @@ module Linkat_flags = struct let symlink_follow = Config.At.symlink_follow end -let linkat t ?(old_dir_fd=at_fdcwd) ?(new_dir_fd=at_fdcwd) ~flags ~old_path ~new_path user_data = +let linkat t ?old_dir_fd ?new_dir_fd ~flags ~old_path ~new_path user_data = with_id t (fun id -> let old_path_buf = Sketch.String.alloc t.sketch old_path in let new_path_buf = Sketch.String.alloc t.sketch new_path in Uring.submit_linkat t.uring id old_dir_fd old_path_buf new_dir_fd new_path_buf flags ) user_data -let unlink t ~dir ?(fd=at_fdcwd) path user_data = +let unlink t ~dir ?fd path user_data = with_id t (fun id -> let buf = Sketch.String.alloc t.sketch path in Uring.submit_unlinkat t.uring id fd buf dir ) user_data -let mkdirat t ~mode ?(fd=at_fdcwd) path user_data = +let mkdirat t ~mode ?fd path user_data = with_id t (fun id -> let buf = Sketch.String.alloc t.sketch path in Uring.submit_mkdirat t.uring id fd buf mode @@ -541,7 +542,7 @@ let poll_add t fd poll_mask user_data = let close t fd user_data = with_id t (fun id -> Uring.submit_close t.uring fd id) user_data -let statx t ?(fd=at_fdcwd) ~mask path statx flags user_data = +let statx t ?fd ~mask path statx flags user_data = let spath = Sketch.String.alloc t.sketch path in with_id_full t (fun id -> Uring.submit_statx t.uring id fd statx spath flags mask) user_data ~extra_data:statx @@ -584,9 +585,9 @@ let fsync t ?(off=0L) ?(len=0) fd user_data = let fdatasync t ?(off=0L) ?(len=0) fd user_data = with_id t (fun id -> Uring.submit_fdatasync t.uring id fd off len) user_data -let cancel t job user_data = - ignore (Heap.ptr job : Uring.id); (* Check it's still valid *) - with_id t (fun id -> Uring.submit_cancel t.uring id (Heap.ptr job)) user_data +let cancel t (entry, job) user_data = + ignore (Heap.ptr entry : Uring.id); (* Check it's still valid *) + with_id t (fun id -> Uring.submit_cancel t.uring id job) user_data let sqe_ready t = Uring.sq_ready t.uring @@ -613,14 +614,26 @@ let submit t = type 'a completion_option = | None - | Some of { result: int; data: 'a } + | Unit of { result: unit; data: 'a } + | Int of { result: int; data: 'a } + | FD of { result: Unix.file_descr; data: 'a } + | Error of { result: Unix.error; data: 'a } let fn_on_ring fn t = match fn t.uring with | Uring.Cqe_none -> None - | Uring.Cqe_some { user_data_id; res } -> + | Uring.Cqe_unit { user_data_id; result } -> + let data = Heap.free t.data user_data_id in + Unit { result; data } + | Uring.Cqe_int { user_data_id; result } -> + let data = Heap.free t.data user_data_id in + Int { result; data } + | Uring.Cqe_fd { user_data_id; result } -> + let data = Heap.free t.data user_data_id in + FD { result; data } + | Uring.Cqe_error { user_data_id; result } -> let data = Heap.free t.data user_data_id in - Some { result = res; data } + Error { result; data } let get_cqe_nonblocking t = check t; diff --git a/lib/uring/uring.mli b/lib/uring/uring.mli index 55d431bc..8a963f1a 100644 --- a/lib/uring/uring.mli +++ b/lib/uring/uring.mli @@ -842,11 +842,14 @@ val submit : 'a t -> int type 'a completion_option = | None - | Some of { result: int; data: 'a } (**) + | Unit of { result: unit; data: 'a } + | Int of { result: int; data: 'a } + | FD of { result: Unix.file_descr; data: 'a } + | Error of { result: Unix.error; data: 'a } (**) (** The type of results of calling {!wait} and {!peek}. [None] denotes that either there were no completions in the queue or an interrupt / timeout - occurred. [Some] contains both the user data attached to the completed - request and the integer syscall result. *) + occurred. The other constructors contain both the user data attached to the + completed request and syscall result. *) val wait : ?timeout:float -> 'a t -> 'a completion_option (** [wait ?timeout t] will block indefinitely (the default) or for [timeout] diff --git a/lib/uring/uring_stubs.c b/lib/uring/uring_stubs.c index 3ae351aa..baa7d7f9 100644 --- a/lib/uring/uring_stubs.c +++ b/lib/uring/uring_stubs.c @@ -145,14 +145,21 @@ value ocaml_uring_exit(value v_uring) { CAMLreturn(Val_unit); } +#define Data_val(v_id, kind) ((void *)(Long_val(v_id) | ((intnat)kind << (8 * sizeof(value) - 4)))) +#define DATA_MASK (~((intnat)((intnat)3 << (8 * sizeof(value) - 4)))) +#define RES_UNIT 0 +#define RES_INT 1 +#define RES_FD 2 + value /* noalloc */ ocaml_uring_submit_nop(value v_uring, value v_id) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_nop(sqe); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -193,10 +200,11 @@ ocaml_uring_submit_timeout(value v_uring, value v_id, value v_sketch_ptr, value flags |= IORING_TIMEOUT_ABS; sqe = io_uring_get_sqe(ring); - if (!sqe) return Val_false; + if (!sqe) return Val_long(-1); io_uring_prep_timeout(sqe, t, 0, flags); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return Val_true; + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return Val_long((intnat)id); } struct open_how_data { @@ -242,27 +250,37 @@ ocaml_uring_make_open_how(value v_flags, value v_mode, value v_resolve, value v_ CAMLreturn(v); } +static int with_at_fdcwd(value v_fd) +{ + if (Is_none(v_fd)) + return AT_FDCWD; + else + return Int_val(Some_val(v_fd)); +} + // Caller must ensure v_open_how is not GC'd until the job is finished. value /* noalloc */ ocaml_uring_submit_openat2(value v_uring, value v_id, value v_fd, value v_open_how) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); struct open_how_data *data = Open_how_val(v_open_how); - io_uring_prep_openat2(sqe, Int_val(v_fd), data->path, &data->how); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + io_uring_prep_openat2(sqe, with_at_fdcwd(v_fd), data->path, &data->how); + void *id = Data_val(v_id, RES_FD); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ ocaml_uring_submit_close(value v_uring, value v_fd, value v_id) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); dprintf("submit_close: fd:%d\n", Int_val(v_fd)); io_uring_prep_close(sqe, Int_val(v_fd)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -270,11 +288,12 @@ ocaml_uring_submit_poll_add(value v_uring, value v_fd, value v_id, value v_poll_ int poll_mask = Int_val(v_poll_mask); struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); dprintf("submit_poll_add: fd:%d mask:%x\n", Int_val(v_fd), poll_mask); io_uring_prep_poll_add(sqe, Int_val(v_fd), poll_mask); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -306,11 +325,12 @@ ocaml_uring_submit_readv(value v_uring, value v_fd, value v_id, value v_sketch_p size_t len = Sketch_ptr_len_val(v_sketch_ptr) / sizeof(*iovs); if (sqe == NULL) - return (Val_false); + return (Val_long(-1)); dprintf("submit_readv: %d ents len[0] %lu off %d\n", len, iovs[0].iov_len, Int63_val(v_fileoff)); io_uring_prep_readv(sqe, Int_val(v_fd), iovs, len, Int63_val(v_fileoff)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // Caller must ensure the buffers pointed to by v_sketch_ptr are not GC'd until the job is finished. @@ -322,11 +342,12 @@ ocaml_uring_submit_writev(value v_uring, value v_fd, value v_id, value v_sketch_ size_t len = Sketch_ptr_len_val(v_sketch_ptr) / sizeof(*iovs); if (sqe == NULL) - return (Val_false); + return (Val_long(-1)); dprintf("submit_writev: %d ents len[0] %lu off %d\n", len, iovs[0].iov_len, Int63_val(v_fileoff)); io_uring_prep_writev(sqe, Int_val(v_fd), iovs, len, Int63_val(v_fileoff)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // Caller must ensure the buffers are not released until this job completes. @@ -335,11 +356,12 @@ ocaml_uring_submit_readv_fixed_native(value v_uring, value v_fd, value v_id, val struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); dprintf("submit_readv_fixed: buf %p off %d len %d fileoff %d", buf, Int_val(v_off), Int_val(v_len), Int63_val(v_fileoff)); io_uring_prep_read_fixed(sqe, Int_val(v_fd), buf, Int_val(v_len), Int63_val(v_fileoff), 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value @@ -361,11 +383,12 @@ ocaml_uring_submit_writev_fixed_native(value v_uring, value v_fd, value v_id, va struct io_uring_sqe *sqe = io_uring_get_sqe(ring); void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); if (!sqe) - return (Val_false); + return (Val_long(-1)); dprintf("submit_writev_fixed: buf %p off %d len %d fileoff %d", buf, Int_val(v_off), Int_val(v_len), Int63_val(v_fileoff)); io_uring_prep_write_fixed(sqe, Int_val(v_fd), buf, Int_val(v_len), Int63_val(v_fileoff), 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value @@ -388,12 +411,13 @@ ocaml_uring_submit_read(value v_uring, value v_fd, value v_id, value v_cstruct, value v_off = Field(v_cstruct, 1); value v_len = Field(v_cstruct, 2); void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); dprintf("submit_read: fd %d buff %p len %zd fileoff %d\n", Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); io_uring_prep_read(sqe, Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -404,25 +428,27 @@ ocaml_uring_submit_write(value v_uring, value v_fd, value v_id, value v_cstruct, value v_off = Field(v_cstruct, 1); value v_len = Field(v_cstruct, 2); void *buf = Caml_ba_data_val(v_ba) + Long_val(v_off); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); dprintf("submit_write: fd %d buff %p len %zd fileoff %d\n", Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); io_uring_prep_write(sqe, Int_val(v_fd), buf, Long_val(v_len), Int63_val(v_fileoff)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ ocaml_uring_submit_splice(value v_uring, value v_id, value v_fd_in, value v_fd_out, value v_nbytes) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_splice(sqe, Int_val(v_fd_in), (int64_t) -1, Int_val(v_fd_out), (int64_t) -1, Int_val(v_nbytes), 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } #define Statx_val(v) (*((struct statx **) Data_custom_val(v))) @@ -481,11 +507,12 @@ value ocaml_uring_submit_statx_native(value v_uring, value v_id, value v_fd, value v_statx, value v_sketch_ptr, value v_flags, value v_mask) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); char *path = Sketch_ptr_val(v_sketch_ptr); - io_uring_prep_statx(sqe, Int_val(v_fd), path, Int_val(v_flags), Int_val(v_mask), Statx_val(v_statx)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + io_uring_prep_statx(sqe, with_at_fdcwd(v_fd), path, Int_val(v_flags), Int_val(v_mask), Statx_val(v_statx)); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value @@ -783,10 +810,11 @@ ocaml_uring_submit_bind(value v_uring, value v_id, value v_fd, value v_sockaddr) struct io_uring_sqe *sqe; struct sock_addr_data *addr = Sock_addr_val(v_sockaddr); sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_bind(sqe, Int_val(v_fd), &(addr->sock_addr_addr.s_gen), addr->sock_addr_len); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -794,10 +822,11 @@ ocaml_uring_submit_listen(value v_uring, value v_id, value v_fd, value v_backlog struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe; sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_listen(sqe, Int_val(v_fd), Int_val(v_backlog)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // v_sockaddr must not be GC'd while the call is in progress @@ -807,10 +836,11 @@ ocaml_uring_submit_connect(value v_uring, value v_id, value v_fd, value v_sockad struct io_uring_sqe *sqe; struct sock_addr_data *addr = Sock_addr_val(v_sockaddr); sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_connect(sqe, Int_val(v_fd), &(addr->sock_addr_addr.s_gen), addr->sock_addr_len); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // v_msghdr must not be GC'd while the call is in progress @@ -819,13 +849,14 @@ ocaml_uring_submit_send_msg(value v_uring, value v_id, value v_fd, value v_msghd struct io_uring *ring = Ring_val(v_uring); struct msghdr *msg = Msghdr_val(Field(v_msghdr, 0)); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (sqe == NULL) return (Val_false); + if (sqe == NULL) return (Val_long(-1)); msg->msg_iov = Sketch_ptr_val(v_sketch_ptr); msg->msg_iovlen = Sketch_ptr_len_val(v_sketch_ptr) / sizeof(struct iovec); dprintf("submit_sendmsg\n"); io_uring_prep_sendmsg(sqe, Int_val(v_fd), msg, 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // v_msghdr must not be GC'd while the call is in progress @@ -834,13 +865,14 @@ ocaml_uring_submit_recv_msg(value v_uring, value v_id, value v_fd, value v_msghd struct io_uring *ring = Ring_val(v_uring); struct msghdr *msg = Msghdr_val(Field(v_msghdr, 0)); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (sqe == NULL) return (Val_false); + if (sqe == NULL) return (Val_long(-1)); msg->msg_iov = Sketch_ptr_val(v_sketch_ptr); msg->msg_iovlen = Sketch_ptr_len_val(v_sketch_ptr) / sizeof(struct iovec); dprintf("submit_recvmsg:msghdr %p: registering iobuf base %p len %lu\n", msg, msg->msg_iov[0].iov_base, msg->msg_iov[0].iov_len); io_uring_prep_recvmsg(sqe, Int_val(v_fd), msg, 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_INT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } // v_sockaddr must not be GC'd while the call is in progress @@ -851,10 +883,11 @@ ocaml_uring_submit_accept(value v_uring, value v_id, value v_fd, value v_sockadd struct sock_addr_data *addr = Sock_addr_val(v_sockaddr); addr->sock_addr_len = sizeof(union sock_addr_union); sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_accept(sqe, Int_val(v_fd), &(addr->sock_addr_addr.s_gen), &addr->sock_addr_len, SOCK_CLOEXEC); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_FD); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -872,10 +905,11 @@ ocaml_uring_submit_unlinkat(value v_uring, value v_id, value v_fd, value v_sketc struct io_uring_sqe *sqe = io_uring_get_sqe(ring); int flags = Bool_val(v_rmdir) ? AT_REMOVEDIR : 0; char *path = Sketch_ptr_val(v_sketch_ptr); - if (!sqe) return (Val_false); - io_uring_prep_unlinkat(sqe, Int_val(v_fd), path, flags); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + if (!sqe) return (Val_long(-1)); + io_uring_prep_unlinkat(sqe, with_at_fdcwd(v_fd), path, flags); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -883,10 +917,11 @@ ocaml_uring_submit_mkdirat(value v_uring, value v_id, value v_fd, value v_sketch struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); char *path = Sketch_ptr_val(v_sketch_ptr); - if (!sqe) return (Val_false); - io_uring_prep_mkdirat(sqe, Int_val(v_fd), path, Int_val(v_mode)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + if (!sqe) return (Val_long(-1)); + io_uring_prep_mkdirat(sqe, with_at_fdcwd(v_fd), path, Int_val(v_mode)); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -894,12 +929,13 @@ ocaml_uring_submit_fsync(value v_uring, value v_id, value v_fd, value v_off, val { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_fsync(sqe, Int_val(v_fd), 0); sqe->off = Int64_val(v_off); sqe->len = Int_val(v_len); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -907,12 +943,13 @@ ocaml_uring_submit_fdatasync(value v_uring, value v_id, value v_fd, value v_off, { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_fsync(sqe, Int_val(v_fd), IORING_FSYNC_DATASYNC); sqe->off = Int64_val(v_off); sqe->len = Int_val(v_len); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value /* noalloc */ @@ -920,10 +957,12 @@ ocaml_uring_submit_cancel(value v_uring, value v_id, value v_target) { struct io_uring *ring = Ring_val(v_uring); struct io_uring_sqe *sqe; sqe = io_uring_get_sqe(ring); - if (!sqe) return (Val_false); + if (!sqe) return (Val_long(-1)); io_uring_prep_cancel(sqe, (void *)Long_val(v_target), 0); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); - return (Val_true); + /* RES_UNIT, since flags == 0 */ + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); + return (Val_long((intnat)id)); } value ocaml_uring_submit(value v_uring) @@ -938,21 +977,34 @@ value ocaml_uring_submit(value v_uring) #define Val_cqe_none Val_int(0) -static value Val_cqe_some(value id, value res) { - CAMLparam2(id, res); - CAMLlocal1(some); - some = caml_alloc(2, 0); - Store_field(some, 0, id); - Store_field(some, 1, res); +static value cqe_error(intnat id, int res) { + CAMLparam0(); + CAMLlocal1(err); + err = unix_error_of_code(-res); + value some = caml_alloc_small(2, 3); + Field(some, 0) = Val_long(id & DATA_MASK); + Field(some, 1) = err; CAMLreturn(some); } +static value Val_cqe_some(intnat id, int res) { + value some; + if (res < 0) { + some = cqe_error(id, res); + } else { + some = caml_alloc_small(2, (id >> (8 * sizeof(value) - 4))); + Field(some, 1) = Val_int(res); + } + Field(some, 0) = Val_long(id & DATA_MASK); + return some; +} + value ocaml_uring_wait_cqe_timeout(value v_timeout, value v_uring) { CAMLparam2(v_uring, v_timeout); double timeout = Double_val(v_timeout); struct __kernel_timespec t; - long id; + intnat id; struct io_uring *ring = Ring_val(v_uring); struct io_uring_cqe *cqe; int res; @@ -971,17 +1023,17 @@ value ocaml_uring_wait_cqe_timeout(value v_timeout, value v_uring) } else { if (!cqe) CAMLreturn(Val_cqe_none); - id = (long)io_uring_cqe_get_data(cqe); + id = (intnat)io_uring_cqe_get_data(cqe); int cqe_res = cqe->res; io_uring_cqe_seen(ring, cqe); - CAMLreturn(Val_cqe_some(Val_int(id), Val_int(cqe_res))); + CAMLreturn(Val_cqe_some(id, cqe_res)); } } value ocaml_uring_wait_cqe(value v_uring) { CAMLparam1(v_uring); - long id; + intnat id; struct io_uring *ring = Ring_val(v_uring); struct io_uring_cqe *cqe; int res; @@ -996,17 +1048,17 @@ value ocaml_uring_wait_cqe(value v_uring) unix_error(-res, "io_uring_wait_cqe", Nothing); } } else { - id = (long)io_uring_cqe_get_data(cqe); + id = (intnat)io_uring_cqe_get_data(cqe); int cqe_res = cqe->res; io_uring_cqe_seen(ring, cqe); - CAMLreturn(Val_cqe_some(Val_int(id), Val_int(cqe_res))); + CAMLreturn(Val_cqe_some(id, cqe_res)); } } value ocaml_uring_peek_cqe(value v_uring) { CAMLparam1(v_uring); - long id; + intnat id; struct io_uring *ring = Ring_val(v_uring); struct io_uring_cqe *cqe; int res; @@ -1019,10 +1071,10 @@ value ocaml_uring_peek_cqe(value v_uring) unix_error(-res, "io_uring_peek_cqe", Nothing); } } else { - id = (long)io_uring_cqe_get_data(cqe); + id = (intnat)io_uring_cqe_get_data(cqe); int cqe_res = cqe->res; io_uring_cqe_seen(ring, cqe); - CAMLreturn(Val_cqe_some(Val_int(id), Val_int(cqe_res))); + CAMLreturn(Val_cqe_some(id, cqe_res)); } } @@ -1095,12 +1147,13 @@ ocaml_uring_submit_linkat_native(value v_uring, value v_id, struct io_uring_sqe *sqe = io_uring_get_sqe(ring); if (!sqe) - return Val_false; + return Val_long(-1); - io_uring_prep_linkat(sqe, Int_val(v_old_dir), old_path, Int_val(v_new_dir), new_path, Int_val(v_flags)); - io_uring_sqe_set_data(sqe, (void *)Long_val(v_id)); + io_uring_prep_linkat(sqe, with_at_fdcwd(v_old_dir), old_path, with_at_fdcwd(v_new_dir), new_path, Int_val(v_flags)); + void *id = Data_val(v_id, RES_UNIT); + io_uring_sqe_set_data(sqe, id); - return Val_true; + return Val_long((intnat)id); } value diff --git a/tests/main.md b/tests/main.md index 51da6ac5..c165e019 100644 --- a/tests/main.md +++ b/tests/main.md @@ -18,9 +18,26 @@ end let rec consume t = match Uring.wait ~timeout:1. t with - | Some { data; result } -> (data, result) + | Int { data; result } -> (data, result) + | Unit { data; result = _ } -> (data, 0) + | FD _ | Error _ -> failwith "Unexpected return from syscall" | None -> consume t +let rec consume_result t = + match Uring.wait ~timeout:1. t with + | Error { data; result } -> (data, Error result) + | Unit { data; result = () } -> (data, Ok 0) + | Int { data; result } -> (data, Ok result) + | FD _ -> failwith "Unexpected return from syscall" + | None -> consume_result t + +let rec consume_fd path t = + match Uring.wait ~timeout:1. t with + | FD { data; result } -> data, result + | Error { data = _; result } -> raise (Unix.Unix_error (result, "openat2", path)) + | Int _ | Unit _ -> failwith "Unexpected return from syscall" + | None -> consume_fd path t + let traceln fmt = Format.printf (fmt ^^ "@.") ``` @@ -134,10 +151,7 @@ val t : [ `Open ] Uring.t = # Uring.submit t;;; - : int = 1 -# let token, fd = - let token, fd = consume t in - assert (fd >= 0); - token, (Obj.magic fd : Unix.file_descr);; +# let token, fd = consume_fd "/dev/null" t;; val token : [ `Open ] = `Open val fd : Unix.file_descr = @@ -170,10 +184,7 @@ val t : [ `Create ] Uring.t = # Uring.submit t;; - : int = 1 -# let token, fd = - let token, fd = consume t in - assert (fd >= 0); - token, (Obj.magic fd : Unix.file_descr);; +# let token, fd = consume_fd "test-openat" t;; val token : [ `Create ] = `Create val fd : Unix.file_descr = @@ -252,10 +263,7 @@ Now using `~fd`: # Uring.submit t;; - : int = 1 -# let token, fd = - let token, fd = consume t in - assert (fd >= 0); - token, (Obj.magic fd : Unix.file_descr);; +# let token, fd = consume_fd "test-openat" t;; val token : [ `Open_path | `Statx ] = `Open_path val fd : Unix.file_descr = @@ -301,14 +309,9 @@ val t : [ `Get_path ] Uring.t = path `Get_path)); traceln "Submitted %d" (Uring.submit t); - let `Get_path, fd = consume t in - if fd >= 0 then ( - let fd : Unix.file_descr = Obj.magic fd in - Unix.close fd; - traceln "Opened %S OK" path - ) else ( - raise (Unix.Unix_error (Uring.error_of_errno fd, "openat2", path)) - );; + let `Get_path, fd = consume_fd path t in + Unix.close fd; + traceln "Opened %S OK" path;; val get : resolve:Uring.Resolve.t -> string -> unit = # get ~resolve:Uring.Resolve.empty ".";; @@ -552,8 +555,8 @@ val fd : unit = () Ask to read from a pipe (with no data available), then cancel it. ```ocaml -# exception Multiple of Unix.error list;; -exception Multiple of Unix.error list +# exception Multiple of (int, Unix.error) result list;; +exception Multiple of (int, Unix.error) result list # let t : [ `Cancel | `Read ] Uring.t = Uring.create ~queue_depth:5 ();; val t : [ `Cancel | `Read ] Uring.t = @@ -571,19 +574,19 @@ val read : [ `Cancel | `Read ] Uring.job = - : [ `Cancel | `Read ] Uring.job option = Some # Uring.submit t;; - : int = 2 -# let t1, r1 = consume t in - let t2, r2 = consume t in +# let t1, r1 = consume_result t in + let t2, r2 = consume_result t in let r_read, r_cancel = match t1, t2 with | `Read, `Cancel -> r1, r2 | `Cancel, `Read -> r2, r1 | _ -> assert false in - begin match Uring.error_of_errno r_read, Uring.error_of_errno r_cancel with - | EINTR, EALREADY + begin match r_read, r_cancel with + | Error EINTR, Error EALREADY (* Occasionally, the read is actually busy just as we try to cancel. In that case it gets interrupted and the cancel returns EALREADY. *) - | EUNKNOWNERR 125 (* ECANCELLED *), EUNKNOWNERR 0 -> + | Error (EUNKNOWNERR 125) (* ECANCELLED *), Ok 0 -> (* This is the common case. The read is blocked and can just be removed. *) () | e1, e2 -> raise (Multiple [e1; e2]) @@ -617,21 +620,22 @@ val read : [ `Cancel | `Read ] Uring.job = - : [ `Cancel | `Read ] Uring.job option = Some # Uring.submit t;; - : int = 1 -# let t1, r1 = consume t in - let t2, r2 = consume t in +# let t1, r1 = consume_result t in + let t2, r2 = consume_result t in let r_read, r_cancel = match t1, t2 with | `Read, `Cancel -> r1, r2 | `Cancel, `Read -> r2, r1 | _ -> assert false in - if r_read = 1 then ( - match Uring.error_of_errno r_cancel with - | ENOENT -> () - | e -> raise (Unix.Unix_error (e, "cancel", "")) + if r_read = Ok 1 then ( + match r_cancel with + | Ok _ + | Error ENOENT -> () + | Error e -> raise (Unix.Unix_error (e, "cancel", "")) ) else ( - match Uring.error_of_errno r_read, Uring.error_of_errno r_cancel with - | EUNKNOWNERR 125 (* ECANCELLED *), EUNKNOWNERR 0 -> + match r_read, r_cancel with + | Error (EUNKNOWNERR 125) (* ECANCELLED *), Ok 0 -> (* This isn't the case we want to test, but it can happen sometimes. *) () | e1, e2 -> raise (Multiple [e1; e2]) @@ -814,10 +818,10 @@ val check : unit -> bool * bool = - : unit Uring.job option = Some # Uring.wait t;; -- : unit Uring.completion_option = Uring.Some {Uring.result = 0; data = ()} +- : unit Uring.completion_option = Uring.Unit {Uring.result = (); data = ()} # Uring.wait t;; -- : unit Uring.completion_option = Uring.Some {Uring.result = 0; data = ()} +- : unit Uring.completion_option = Uring.Unit {Uring.result = (); data = ()} # check ();; - : bool * bool = (false, false) @@ -840,7 +844,7 @@ val t : unit Uring.t = # Uring.submit t;; - : int = 1 # Uring.wait t;; -- : unit Uring.completion_option = Uring.Some {Uring.result = 0; data = ()} +- : unit Uring.completion_option = Uring.Unit {Uring.result = (); data = ()} # (Unix.lstat "new-symlink").st_kind;; - : Unix.file_kind = Unix.S_LNK ``` @@ -852,7 +856,7 @@ This currently doesn't work due to https://github.com/axboe/liburing/issues/955: # Uring.submit t;; - : int = 1 # Uring.wait t;; - - : unit Uring.completion_option = Uring.Some {Uring.result = 0; data = ()} + - : unit Uring.completion_option = Uring.Unit {Uring.result = (); data = ()} # (Unix.lstat "new-file").st_kind;; - : Unix.file_kind = Unix.S_REG @@ -876,7 +880,7 @@ val t : [ `Mkdir of int ] Uring.t = - : int = 1 # Uring.wait t;; - : [ `Mkdir of int ] Uring.completion_option = -Uring.Some {Uring.result = 0; data = `Mkdir 0} +Uring.Unit {Uring.result = (); data = `Mkdir 0} # Printf.sprintf "0o%o" ((Unix.stat "mkdir").st_perm land 0o777);; - : string = "0o700" # let v = Uring.mkdirat t ~mode:0o755 "mkdir" (`Mkdir 1);; @@ -885,7 +889,7 @@ val v : [ `Mkdir of int ] Uring.job option = Some - : int = 1 # Uring.wait t;; - : [ `Mkdir of int ] Uring.completion_option = -Uring.Some {Uring.result = -17; data = `Mkdir 1} +Uring.Error {Uring.result = Unix.EEXIST; data = `Mkdir 1} # Uring.exit t;; - : unit = () ``` @@ -905,8 +909,8 @@ val t : [ `Timeout ] Uring.t = # Uring.submit t;; - : int = 1 -# let `Timeout, timeout = consume t;; -val timeout : int = -62 +# let `Timeout, timeout = consume_result t;; +val timeout : (int, Unix.error) result = Error (Unix.EUNKNOWNERR 62) # let ns = ((Unix.gettimeofday () +. 0.01) *. 1e9) @@ -915,15 +919,15 @@ val timeout : int = -62 Uring.(timeout ~absolute:true t Realtime ns `Timeout);; - : [ `Timeout ] Uring.job option = Some -# let `Timeout, timeout = consume t;; -val timeout : int = -62 +# let `Timeout, timeout = consume_result t;; +val timeout : (int, Unix.error) result = Error (Unix.EUNKNOWNERR 62) # let ns1 = Int64.(mul 10L 1_000_000L) in Uring.(timeout ~absolute:true t Boottime ns1 `Timeout);; - : [ `Timeout ] Uring.job option = Some -# let `Timeout, timeout = consume t;; -val timeout : int = -62 +# let `Timeout, timeout = consume_result t;; +val timeout : (int, Unix.error) result = Error (Unix.EUNKNOWNERR 62) # Uring.exit t;; - : unit = () diff --git a/tests/poll_add.ml b/tests/poll_add.ml index 5bceb1c3..b589c880 100644 --- a/tests/poll_add.ml +++ b/tests/poll_add.ml @@ -13,7 +13,8 @@ let () = let rec retry () = match Uring.wait t with | None -> retry () - | Some { result; _ } -> result + | Int { result; _ } -> result + | FD _ | Error _ | Unit _ -> failwith "Unexpected return from poll" in let res = retry () in Printf.eprintf "poll_add: %x\n%!" res; diff --git a/tests/sketch.md b/tests/sketch.md index 8fcc5a9f..36312653 100644 --- a/tests/sketch.md +++ b/tests/sketch.md @@ -11,7 +11,8 @@ let ldup n x = List.init n (Fun.const x) let rec consume t = match Uring.wait t with - | Some { data; result } -> (data, result) + | Int { data; result } -> (data, result) + | FD _ | Unit _ | Error _ -> failwith "Unexpected return from readv" | None -> consume t ``` diff --git a/tests/socket_ops.ml b/tests/socket_ops.ml index 36b5c2be..a9541aef 100644 --- a/tests/socket_ops.ml +++ b/tests/socket_ops.ml @@ -13,45 +13,43 @@ let () = let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, 0) in (* Use io_uring for bind operation *) - let bind_result = + let () = match Uring.bind t server_sock addr () with | None -> failwith "Failed to submit bind operation" | Some _job -> let _submitted = Uring.submit t in match Uring.wait t with | Uring.None -> failwith "No completion for bind" - | Uring.Some { result; data = _ } -> - if result < 0 then begin - Uring.close t server_sock () |> ignore; - Uring.submit t |> ignore; - Uring.exit t; - let err = Uring.error_of_errno (-result) in - failwith (sprintf "Bind failed: %s" (Unix.error_message err)) - end else - result + | Uring.Unit _ -> + print_endline "Bind completed successfully:" + | Uring.Error { result; data = _ } -> + Uring.close t server_sock () |> ignore; + Uring.submit t |> ignore; + Uring.exit t; + failwith (sprintf "Bind failed: %s" (Unix.error_message result)) + | Uring.Int _ | Uring.FD _ -> + failwith "Unexpected return from bind operation" in - printf "Bind completed with result: %d\n" bind_result; (* Use io_uring for listen operation *) let backlog = 10 in - let listen_result = + let () = match Uring.listen t server_sock backlog () with | None -> failwith "Failed to submit listen operation" | Some _job -> let _submitted = Uring.submit t in match Uring.wait t with | Uring.None -> failwith "No completion for listen" - | Uring.Some { result; data = _ } -> - if result < 0 then begin - Uring.close t server_sock () |> ignore; - Uring.submit t |> ignore; - Uring.exit t; - let err = Uring.error_of_errno (-result) in - failwith (sprintf "Listen failed: %s" (Unix.error_message err)) - end else - result + | Uring.Unit _ -> + print_endline "Listen completed successfully" + | Uring.Error { result; data = _ } -> + Uring.close t server_sock () |> ignore; + Uring.submit t |> ignore; + Uring.exit t; + failwith (sprintf "Listen failed: %s" (Unix.error_message result)) + | Uring.Int _ | Uring.FD _ -> + failwith "Unexpected return from listen operation" in - printf "Listen completed with result: %d\n" listen_result; (* Get the actual bound port - Unix.getsockname is necessary for socket introspection *) let actual_addr = Unix.getsockname server_sock in @@ -71,30 +69,28 @@ let () = (* Use io_uring for connect operation *) let connect_addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in - let connect_result = + let () = match Uring.connect t client_sock connect_addr () with | None -> failwith "Failed to submit connect operation" | Some _job -> let _submitted = Uring.submit t in match Uring.wait t with | Uring.None -> failwith "No completion for connect" - | Uring.Some { result; data = _ } -> + | Uring.Unit _ -> + print_endline "Connect initiated successfully\n" + | Uring.Error { result = Unix.EINPROGRESS; data = _ } -> (* Connect may return -EINPROGRESS for non-blocking sockets, which is normal *) - if result < 0 && result <> (-115) (* -EINPROGRESS *) then begin - Uring.close t client_sock () |> ignore; - Uring.close t server_sock () |> ignore; - Uring.submit t |> ignore; - Uring.exit t; - let err = Uring.error_of_errno (-result) in - failwith (sprintf "Connect failed: %s (errno: %d)" (Unix.error_message err) (-result)) - end else - result + print_endline "Connect initiated successfully (result: EINPROGRESS)" + | Uring.Error { result; data = _ } -> + Uring.close t client_sock () |> ignore; + Uring.close t server_sock () |> ignore; + Uring.submit t |> ignore; + Uring.exit t; + failwith (sprintf "Connect failed: %s" (Unix.error_message result)) + | Uring.Int _ | Uring.FD _ -> + failwith "Unexpected return from connect operation" in - if connect_result = 0 || connect_result = (-115) then - printf "Connect initiated successfully (result: %d)\n" connect_result - else - printf "Connect completed with result: %d\n" connect_result; (* Get the client socket's local port - Unix.getsockname is necessary for socket introspection *) let client_addr = Unix.getsockname client_sock in @@ -122,12 +118,15 @@ let () = if pending > 0 then match Uring.wait t with | Uring.None -> failwith "No completion for close" - | Uring.Some { result; data = _ } -> - if result < 0 then - printf "Close warning: %s\n" (Unix.error_message (Uring.error_of_errno (-result))); + | Uring.Unit _ -> wait_closes (pending - 1) + | Uring.Error { result; data = _ } -> + printf "Close warning: %s\n" (Unix.error_message result); + wait_closes (pending - 1) + | Uring.Int _ | Uring.FD _ -> + failwith "Unexpected return from close operation" in wait_closes 2; Uring.exit t; - printf "Test completed successfully!\n" \ No newline at end of file + printf "Test completed successfully!\n" diff --git a/tests/urcat.ml b/tests/urcat.ml index b114f5ca..1f5e4328 100644 --- a/tests/urcat.ml +++ b/tests/urcat.ml @@ -11,7 +11,8 @@ let get_file_size fd = let get_completion_and_print uring = let iov, len = match Uring.wait uring with - | Some { data; result } -> (data, result) + | Int { data; result } -> (data, result) + | FD _ | Error _ | Unit _ -> failwith "Unexpected return from read" | None -> failwith "retry" in let remaining = ref len in diff --git a/tests/urcp_fixed_lib.ml b/tests/urcp_fixed_lib.ml index 83d93c6f..a5379504 100644 --- a/tests/urcp_fixed_lib.ml +++ b/tests/urcp_fixed_lib.ml @@ -49,10 +49,6 @@ let queue_read uring t len = t.read_left <- t.read_left - len; t.reads <- t.reads + 1 -(* TODO compile time check *) -let eagain = -11 -let eintr = -4 - (* Check that a read has completely finished, and if not * queue it up for completing the remaining amount *) let handle_read_completion uring req res = @@ -61,13 +57,6 @@ let handle_read_completion uring req res = match res with | 0 -> Logs.debug (fun l -> l "eof %a" pp_req req); - | n when n = eagain || n = eintr -> - (* requeue the request *) - let r = Uring.read_fixed ~file_offset:req.fileoff uring req.t.infd ~off:req.fixed_off ~len:req.len req in - assert(r <> None); - Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req); - | n when n < 0 -> - raise (Failure ("unix errorno " ^ (string_of_int n))) | n when n < bytes_to_read -> (* handle short read *) req.off <- req.off + n; @@ -90,12 +79,6 @@ let handle_write_completion uring req res = let bytes_to_write = req.len - req.off in match res with | 0 -> raise End_of_file - | n when n = eagain || n = eintr -> - (* requeue the request *) - let r = Uring.write_fixed ~file_offset:req.fileoff uring req.t.outfd ~off:req.fixed_off ~len:req.len req in - assert(r <> None); - Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req); - | n when n < 0 -> failwith (Fmt.str "unix error %d" (-n)) | n when n < bytes_to_write -> (* handle short write *) req.off <- req.off + n; @@ -115,6 +98,19 @@ let handle_completion uring req res = |`R -> handle_read_completion uring req res |`W -> handle_write_completion uring req res +let handle_retry uring req = + match req.op with + |`R -> + (* requeue the request *) + let r = Uring.read_fixed ~file_offset:req.fileoff uring req.t.infd ~off:req.fixed_off ~len:req.len req in + assert(r <> None); + Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req) + |`W -> + (* requeue the request *) + let r = Uring.write_fixed ~file_offset:req.fileoff uring req.t.outfd ~off:req.fixed_off ~len:req.len req in + assert(r <> None); + Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req) + let copy_file uring t = (* Create a set of read requests that we will turn into write requests * up until the queue depth *) @@ -137,11 +133,16 @@ let copy_file uring t = if t.write_left > 0 then begin let check_q = if !got_completion then Uring.get_cqe_nonblocking uring else Uring.wait uring in match check_q with - |None -> Logs.debug (fun l -> l "completions: retry so finishing loop") - |Some { data; result } -> + | None -> Logs.debug (fun l -> l "completions: retry so finishing loop") + | Int { data; result } -> handle_completion uring data result; got_completion := true; - handle_completions (); + handle_completions () + | Error { data; result = (Unix.EAGAIN | Unix.EINTR) } -> + handle_retry uring data + | Error { data = _; result } -> + failwith ("Unexpected error: " ^ Unix.error_message result) + | FD _ | Unit _ -> failwith "Unexpected return from syscall" end in handle_completions (); diff --git a/tests/urcp_lib.ml b/tests/urcp_lib.ml index b1d194fe..2b5f2118 100644 --- a/tests/urcp_lib.ml +++ b/tests/urcp_lib.ml @@ -54,10 +54,6 @@ let queue_read uring t len = t.read_left <- t.read_left - len; t.reads <- t.reads + 1 -(* TODO compile time check *) -let eagain = -11 -let eintr = -4 - (* Check that a read has completely finished, and if not * queue it up for completing the remaining amount *) let handle_read_completion uring req res = @@ -66,13 +62,6 @@ let handle_read_completion uring req res = match res with | 0 -> Logs.debug (fun l -> l "eof %a" pp_req req); - | n when n = eagain || n = eintr -> - (* requeue the request *) - let r = Uring.readv ~file_offset:req.fileoff uring req.t.infd req.iov.next req in - assert(r <> None); - Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req); - | n when n < 0 -> - raise (Failure ("unix errorno " ^ (string_of_int n))) | n when n < bytes_to_read -> (* handle short read so new iovec and resubmit *) req.iov.next <- Cstruct.shiftv req.iov.next n; @@ -97,11 +86,6 @@ let handle_write_completion uring req res = let bytes_to_write = req.len - req.off in match res with | 0 -> raise End_of_file - | n when n = eagain || n = eintr -> - (* requeue the request *) - let r = Uring.writev ~file_offset:req.fileoff uring req.t.infd req.iov.next req in - assert(r <> None); - Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req); | n when n < bytes_to_write -> (* handle short write so new iovec and resubmit *) req.iov.next <- Cstruct.shiftv req.iov.next n; @@ -120,6 +104,19 @@ let handle_completion uring req res = |`R -> handle_read_completion uring req res |`W -> handle_write_completion uring req res +let handle_retry uring req = + match req.op with + |`R -> + (* requeue the request *) + let r = Uring.writev ~file_offset:req.fileoff uring req.t.infd req.iov.next req in + assert(r <> None); + Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req) + |`W -> + (* requeue the request *) + let r = Uring.readv ~file_offset:req.fileoff uring req.t.infd req.iov.next req in + assert(r <> None); + Logs.debug (fun l -> l "requeued eintr read: %a" pp_req req) + let copy_file uring t = (* Create a set of read requests that we will turn into write requests * up until the queue depth *) @@ -142,11 +139,16 @@ let copy_file uring t = if t.write_left > 0 then begin let check_q = if !got_completion then Uring.get_cqe_nonblocking uring else Uring.wait uring in match check_q with - |None -> Logs.debug (fun l -> l "completions: retry so finishing loop") - |Some { data; result } -> + | None -> Logs.debug (fun l -> l "completions: retry so finishing loop") + | Int { data; result } -> handle_completion uring data result; got_completion := true; - handle_completions (); + handle_completions () + | Error { data; result = (Unix.EAGAIN | Unix.EINTR) } -> + handle_retry uring data + | Error { data = _; result } -> + failwith ("Unexpected error: " ^ Unix.error_message result) + | FD _ | Unit _ -> failwith "Unexpected return from syscall" end in handle_completions (); diff --git a/tests/urstat.ml b/tests/urstat.ml index 702c29ec..bff67fff 100644 --- a/tests/urstat.ml +++ b/tests/urstat.ml @@ -13,7 +13,8 @@ let pp_time f (sec, nsec) = let get_completion_and_print uring = let (fname, buf), _ = match Uring.wait uring with - | Some { data; result } -> (data, result) + | Unit { data; result } -> (data, result) + | Int _ | FD _ | Error _ -> failwith "Unexpected return from statx" | None -> failwith "retry" in let kind = S.kind buf in