diff --git a/ext/extconf.rb b/ext/extconf.rb index 9bd4e0b..5b49c53 100755 --- a/ext/extconf.rb +++ b/ext/extconf.rb @@ -31,9 +31,7 @@ have_func("&rb_fiber_transfer") if have_library("uring") and have_header("liburing.h") - # We might want to consider using this in the future: - # have_func("io_uring_submit_and_wait_timeout", "liburing.h") - + have_func("io_uring_prep_waitid", "liburing.h") $srcs << "io/event/selector/uring.c" end @@ -51,6 +49,7 @@ $srcs << "io/event/interrupt.c" have_func("rb_io_descriptor") +have_func("&rb_process_status_new") have_func("&rb_process_status_wait") have_func("rb_fiber_current") have_func("&rb_fiber_raise") diff --git a/ext/io/event/selector/uring.c b/ext/io/event/selector/uring.c index 4d2cba6..1c646e4 100644 --- a/ext/io/event/selector/uring.c +++ b/ext/io/event/selector/uring.c @@ -424,13 +424,33 @@ struct io_uring_sqe * io_get_sqe(struct IO_Event_Selector_URing *selector) { #pragma mark - Process.wait +#if defined(HAVE_IO_URING_PREP_WAITID) && defined(HAVE__RB_PROCESS_STATUS_NEW) +#define USE_WAITID + +inline idtype_t waitid_type(int pid) { + switch (pid) { + case -1: + return P_ALL; + case 0: + return P_PGID; + default: + return P_PID; + } +} +#endif + struct process_wait_arguments { struct IO_Event_Selector_URing *selector; struct IO_Event_Selector_URing_Waiting *waiting; pid_t pid; int flags; + + #ifdef USE_WAITID + siginfo_t infop; + #else int descriptor; + #endif }; static @@ -439,18 +459,29 @@ VALUE process_wait_transfer(VALUE _arguments) { IO_Event_Selector_loop_yield(&arguments->selector->backend); + #ifdef USE_WAITID + if (DEBUG) printf("waitid result: %d pid: %d status: %d\n", + arguments->waiting->result, + arguments->infop.si_pid, + arguments->infop.si_status + ); + return rb_process_status_new(arguments->infop.si_pid, arguments->infop.si_status, 0); + #else if (arguments->waiting->result) { return IO_Event_Selector_process_status_wait(arguments->pid, arguments->flags); } else { return Qfalse; } + #endif } static VALUE process_wait_ensure(VALUE _arguments) { struct process_wait_arguments *arguments = (struct process_wait_arguments *)_arguments; + #ifndef USE_WAITID close(arguments->descriptor); + #endif IO_Event_Selector_URing_Waiting_cancel(arguments->waiting); @@ -464,11 +495,13 @@ VALUE IO_Event_Selector_URing_process_wait(VALUE self, VALUE fiber, VALUE _pid, pid_t pid = NUM2PIDT(_pid); int flags = NUM2INT(_flags); + #ifndef USE_WAITID int descriptor = pidfd_open(pid, 0); if (descriptor < 0) { rb_syserr_fail(errno, "IO_Event_Selector_URing_process_wait:pidfd_open"); } rb_update_max_fd(descriptor); + #endif struct IO_Event_Selector_URing_Waiting waiting = { .fiber = fiber, @@ -483,12 +516,23 @@ VALUE IO_Event_Selector_URing_process_wait(VALUE self, VALUE fiber, VALUE _pid, .waiting = &waiting, .pid = pid, .flags = flags, - .descriptor = descriptor, + #ifdef USE_WAITID + .infop = { .si_pid = 0, .si_status = 0 } + #else + .descriptor = descriptor + #endif }; - if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_process_wait:io_uring_prep_poll_add(%p)\n", (void*)fiber); struct io_uring_sqe *sqe = io_get_sqe(selector); + + #ifdef USE_WAITID + if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_process_wait:io_uring_prep_waitid(%p)\n", (void*)fiber); + if (DEBUG) printf("waitid pid: %d flags: %d\n", pid, flags); + io_uring_prep_waitid(sqe, waitid_type(pid), pid, &process_wait_arguments.infop, WEXITED, 0); + #else + if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_process_wait:io_uring_prep_poll_add(%p)\n", (void*)fiber); io_uring_prep_poll_add(sqe, descriptor, POLLIN|POLLHUP|POLLERR); + #endif io_uring_sqe_set_data(sqe, completion); io_uring_submit_pending(selector); diff --git a/test/io/event/selector.rb b/test/io/event/selector.rb index 64aa3b9..e27fb6a 100644 --- a/test/io/event/selector.rb +++ b/test/io/event/selector.rb @@ -645,6 +645,34 @@ def transfer expect(result1).to be(:success?) expect(result2).to be(:success?) end + + it "can wait for any child process" do + skip("This test works only on URing selector") if !selector.is_a?(IO::Event::Selector::URing) + + result1 = result2 = nil + pids = [] + + fiber = Fiber.new do + pid1 = Process.spawn("sleep 0") + pid2 = Process.spawn("sleep 0") + pids << pid1 << pid2 + + result1 = selector.process_wait(Fiber.current, -1, 0) + result2 = selector.process_wait(Fiber.current, -1, 0) + end + + fiber.transfer + + while fiber.alive? + selector.select(0) + end + + expect(result1).to be(:success?) + expect(result2).to be(:success?) + + result_pids = [result1, result2].map(&:pid).sort + expect(result_pids).to be == pids.sort + end end with "#resume" do diff --git a/test/io/event/selector/process_io.rb b/test/io/event/selector/process_io.rb index 44f7786..29c7fe8 100644 --- a/test/io/event/selector/process_io.rb +++ b/test/io/event/selector/process_io.rb @@ -38,6 +38,33 @@ expect(result.success?).to be == true end + + it "can wait for a process which has terminated already" do + result = nil + + fiber = Fiber.new do + input, output = IO.pipe + + # For some reason, sleep 0.1 here is very unreliable...? + pid = Process.spawn("true", out: output) + output.close + + # Internally, this should generate POLLHUP, which is what we want to test: + expect(selector.io_wait(Fiber.current, input, IO::READABLE)).to be == IO::READABLE + input.close + + _, result = Process.wait2(pid) + end + + fiber.transfer + + # Wait until the result is collected: + until result + selector.select(1) + end + + expect(result.success?).to be == true + end end IO::Event::Selector.constants.each do |name|