Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions ext/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
have_func("&rb_fiber_transfer")

if have_library("uring") and have_header("liburing.h")
# We might want to consider using this in the future:
# have_func("io_uring_submit_and_wait_timeout", "liburing.h")

have_func("io_uring_prep_waitid", "liburing.h")
$srcs << "io/event/selector/uring.c"
end

Expand All @@ -51,6 +49,7 @@
$srcs << "io/event/interrupt.c"

have_func("rb_io_descriptor")
have_func("&rb_process_status_new")
have_func("&rb_process_status_wait")
have_func("rb_fiber_current")
have_func("&rb_fiber_raise")
Expand Down
48 changes: 46 additions & 2 deletions ext/io/event/selector/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,13 +424,33 @@ struct io_uring_sqe * io_get_sqe(struct IO_Event_Selector_URing *selector) {

#pragma mark - Process.wait

#if defined(HAVE_IO_URING_PREP_WAITID) && defined(HAVE__RB_PROCESS_STATUS_NEW)
#define USE_WAITID

inline idtype_t waitid_type(int pid) {
switch (pid) {
case -1:
return P_ALL;
case 0:
return P_PGID;
default:
return P_PID;
}
}
#endif

struct process_wait_arguments {
struct IO_Event_Selector_URing *selector;
struct IO_Event_Selector_URing_Waiting *waiting;

pid_t pid;
int flags;

#ifdef USE_WAITID
siginfo_t infop;
#else
int descriptor;
#endif
};

static
Expand All @@ -439,18 +459,29 @@ VALUE process_wait_transfer(VALUE _arguments) {

IO_Event_Selector_loop_yield(&arguments->selector->backend);

#ifdef USE_WAITID
if (DEBUG) printf("waitid result: %d pid: %d status: %d\n",
arguments->waiting->result,
arguments->infop.si_pid,
arguments->infop.si_status
);
return rb_process_status_new(arguments->infop.si_pid, arguments->infop.si_status, 0);
#else
if (arguments->waiting->result) {
return IO_Event_Selector_process_status_wait(arguments->pid, arguments->flags);
} else {
return Qfalse;
}
#endif
}

static
VALUE process_wait_ensure(VALUE _arguments) {
struct process_wait_arguments *arguments = (struct process_wait_arguments *)_arguments;

#ifndef USE_WAITID
close(arguments->descriptor);
#endif

IO_Event_Selector_URing_Waiting_cancel(arguments->waiting);

Expand All @@ -464,11 +495,13 @@ VALUE IO_Event_Selector_URing_process_wait(VALUE self, VALUE fiber, VALUE _pid,
pid_t pid = NUM2PIDT(_pid);
int flags = NUM2INT(_flags);

#ifndef USE_WAITID
int descriptor = pidfd_open(pid, 0);
if (descriptor < 0) {
rb_syserr_fail(errno, "IO_Event_Selector_URing_process_wait:pidfd_open");
}
rb_update_max_fd(descriptor);
#endif

struct IO_Event_Selector_URing_Waiting waiting = {
.fiber = fiber,
Expand All @@ -483,12 +516,23 @@ VALUE IO_Event_Selector_URing_process_wait(VALUE self, VALUE fiber, VALUE _pid,
.waiting = &waiting,
.pid = pid,
.flags = flags,
.descriptor = descriptor,
#ifdef USE_WAITID
.infop = { .si_pid = 0, .si_status = 0 }
#else
.descriptor = descriptor
#endif
};

if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_process_wait:io_uring_prep_poll_add(%p)\n", (void*)fiber);
struct io_uring_sqe *sqe = io_get_sqe(selector);

#ifdef USE_WAITID
if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_process_wait:io_uring_prep_waitid(%p)\n", (void*)fiber);
if (DEBUG) printf("waitid pid: %d flags: %d\n", pid, flags);
io_uring_prep_waitid(sqe, waitid_type(pid), pid, &process_wait_arguments.infop, WEXITED, 0);
#else
if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_process_wait:io_uring_prep_poll_add(%p)\n", (void*)fiber);
io_uring_prep_poll_add(sqe, descriptor, POLLIN|POLLHUP|POLLERR);
#endif
io_uring_sqe_set_data(sqe, completion);
io_uring_submit_pending(selector);

Expand Down
28 changes: 28 additions & 0 deletions test/io/event/selector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,34 @@ def transfer
expect(result1).to be(:success?)
expect(result2).to be(:success?)
end

it "can wait for any child process" do
skip("This test works only on URing selector") if !selector.is_a?(IO::Event::Selector::URing)

result1 = result2 = nil
pids = []

fiber = Fiber.new do
pid1 = Process.spawn("sleep 0")
pid2 = Process.spawn("sleep 0")
pids << pid1 << pid2

result1 = selector.process_wait(Fiber.current, -1, 0)
result2 = selector.process_wait(Fiber.current, -1, 0)
end

fiber.transfer

while fiber.alive?
selector.select(0)
end

expect(result1).to be(:success?)
expect(result2).to be(:success?)

result_pids = [result1, result2].map(&:pid).sort
expect(result_pids).to be == pids.sort
end
end

with "#resume" do
Expand Down
27 changes: 27 additions & 0 deletions test/io/event/selector/process_io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,33 @@

expect(result.success?).to be == true
end

it "can wait for a process which has terminated already" do
result = nil

fiber = Fiber.new do
input, output = IO.pipe

# For some reason, sleep 0.1 here is very unreliable...?
pid = Process.spawn("true", out: output)
output.close

# Internally, this should generate POLLHUP, which is what we want to test:
expect(selector.io_wait(Fiber.current, input, IO::READABLE)).to be == IO::READABLE
input.close

_, result = Process.wait2(pid)
end

fiber.transfer

# Wait until the result is collected:
until result
selector.select(1)
end

expect(result.success?).to be == true
end
end

IO::Event::Selector.constants.each do |name|
Expand Down
Loading