Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ext/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

have_func("rb_ext_ractor_safe")
have_func("&rb_fiber_transfer")
have_func("rb_io_interruptible_operation")

if have_library("uring") and have_header("liburing.h")
# We might want to consider using this in the future:
Expand Down
6 changes: 6 additions & 0 deletions ext/io/event/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,10 @@ void Init_IO_Event(void)
#ifdef IO_EVENT_SELECTOR_KQUEUE
Init_IO_Event_Selector_KQueue(IO_Event_Selector);
#endif

#ifdef HAVE_RB_IO_INTERRUPTABLE_OPERATION
rb_define_const(IO_Event, "INTERRUPTABLE", Qtrue);
#else
rb_define_const(IO_Event, "INTERRUPTABLE", Qfalse);
#endif
}
7 changes: 5 additions & 2 deletions ext/io/event/selector/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ VALUE IO_Event_Selector_EPoll_push(VALUE self, VALUE fiber)

IO_Event_Selector_ready_push(&selector->backend, fiber);

return Qnil;
return fiber;
}

VALUE IO_Event_Selector_EPoll_raise(int argc, VALUE *argv, VALUE self)
Expand Down Expand Up @@ -523,6 +523,8 @@ VALUE IO_Event_Selector_EPoll_process_wait(VALUE self, VALUE fiber, VALUE _pid,
struct io_wait_arguments {
struct IO_Event_Selector_EPoll *selector;
struct IO_Event_Selector_EPoll_Waiting *waiting;

VALUE io;
};

static
Expand All @@ -538,7 +540,7 @@ static
VALUE io_wait_transfer(VALUE _arguments) {
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;

IO_Event_Selector_loop_yield(&arguments->selector->backend);
IO_Event_Selector_loop_yield_io(&arguments->selector->backend, arguments->io);

if (arguments->waiting->ready) {
return RB_INT2NUM(arguments->waiting->ready);
Expand Down Expand Up @@ -578,6 +580,7 @@ VALUE IO_Event_Selector_EPoll_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE e
struct io_wait_arguments io_wait_arguments = {
.selector = selector,
.waiting = &waiting,
.io = io,
};

return rb_ensure(io_wait_transfer, (VALUE)&io_wait_arguments, io_wait_ensure, (VALUE)&io_wait_arguments);
Expand Down
10 changes: 7 additions & 3 deletions ext/io/event/selector/kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ enum {
DEBUG = 0,
DEBUG_IO_READ = 0,
DEBUG_IO_WRITE = 0,
DEBUG_IO_WAIT = 0
DEBUG_IO_WAIT = 0,
DEBUG_IO_INTERRUPT = 1
};

#ifndef EVFILT_USER
Expand Down Expand Up @@ -407,7 +408,7 @@ VALUE IO_Event_Selector_KQueue_push(VALUE self, VALUE fiber)

IO_Event_Selector_ready_push(&selector->backend, fiber);

return Qnil;
return fiber;
}

VALUE IO_Event_Selector_KQueue_raise(int argc, VALUE *argv, VALUE self)
Expand Down Expand Up @@ -516,6 +517,8 @@ VALUE IO_Event_Selector_KQueue_process_wait(VALUE self, VALUE fiber, VALUE _pid,
struct io_wait_arguments {
struct IO_Event_Selector_KQueue *selector;
struct IO_Event_Selector_KQueue_Waiting *waiting;

VALUE io;
};

static
Expand All @@ -531,7 +534,7 @@ static
VALUE io_wait_transfer(VALUE _arguments) {
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;

IO_Event_Selector_loop_yield(&arguments->selector->backend);
IO_Event_Selector_loop_yield_io(&arguments->selector->backend, arguments->io);

if (arguments->waiting->ready) {
return RB_INT2NUM(arguments->waiting->ready);
Expand Down Expand Up @@ -564,6 +567,7 @@ VALUE IO_Event_Selector_KQueue_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE
struct io_wait_arguments io_wait_arguments = {
.selector = selector,
.waiting = &waiting,
.io = io,
};

if (DEBUG_IO_WAIT) fprintf(stderr, "IO_Event_Selector_KQueue_io_wait descriptor=%d\n", descriptor);
Expand Down
15 changes: 15 additions & 0 deletions ext/io/event/selector/selector.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,21 @@ VALUE IO_Event_Selector_loop_yield(struct IO_Event_Selector *backend)
return IO_Event_Fiber_transfer(backend->loop, 0, NULL);
}

#ifndef HAVE_RB_IO_INTERRUPTABLE_OPERATION
static inline VALUE
rb_io_interruptible_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument) {
return function(argument);
}
#endif

static VALUE IO_Event_Selector_loop_yield_io_interruptible(VALUE fiber) {
return IO_Event_Fiber_transfer(fiber, 0, NULL);
}

VALUE IO_Event_Selector_loop_yield_io(struct IO_Event_Selector *backend, VALUE io) {
return rb_io_interruptible_operation(io, IO_Event_Selector_loop_yield_io_interruptible, backend->loop);
}

struct wait_and_transfer_arguments {
int argc;
VALUE *argv;
Expand Down
3 changes: 3 additions & 0 deletions ext/io/event/selector/selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ VALUE IO_Event_Selector_loop_resume(struct IO_Event_Selector *backend, VALUE fib
// Strictly speaking, it's not a scheduling operation (does not schedule the current fiber).
VALUE IO_Event_Selector_loop_yield(struct IO_Event_Selector *backend);

// Similar to `IO_Event_Selector_loop_yield` but allows the caller to specify an IO which may be interrupted.
VALUE IO_Event_Selector_loop_yield_io(struct IO_Event_Selector *backend, VALUE io);

// Resume a specific fiber. This is a scheduling operation.
// The first argument is the fiber, the rest are the arguments to the resume.
//
Expand Down
28 changes: 17 additions & 11 deletions ext/io/event/selector/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ VALUE IO_Event_Selector_URing_push(VALUE self, VALUE fiber)

IO_Event_Selector_ready_push(&selector->backend, fiber);

return Qnil;
return fiber;
}

VALUE IO_Event_Selector_URing_raise(int argc, VALUE *argv, VALUE self)
Expand Down Expand Up @@ -522,6 +522,7 @@ int events_from_poll_flags(short flags) {
struct io_wait_arguments {
struct IO_Event_Selector_URing *selector;
struct IO_Event_Selector_URing_Waiting *waiting;
VALUE io;
short flags;
};

Expand All @@ -548,7 +549,7 @@ VALUE io_wait_transfer(VALUE _arguments) {
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;
struct IO_Event_Selector_URing *selector = arguments->selector;

IO_Event_Selector_loop_yield(&selector->backend);
IO_Event_Selector_loop_yield_io(&selector->backend, arguments->io);

if (DEBUG) fprintf(stderr, "io_wait_transfer:waiting=%p, result=%d\n", (void*)arguments->waiting, arguments->waiting->result);

Expand Down Expand Up @@ -588,7 +589,8 @@ VALUE IO_Event_Selector_URing_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE e
struct io_wait_arguments io_wait_arguments = {
.selector = selector,
.waiting = &waiting,
.flags = flags
.io = io,
.flags = flags,
};

return rb_ensure(io_wait_transfer, (VALUE)&io_wait_arguments, io_wait_ensure, (VALUE)&io_wait_arguments);
Expand Down Expand Up @@ -619,6 +621,7 @@ static inline off_t io_seekable(int descriptor)
struct io_read_arguments {
struct IO_Event_Selector_URing *selector;
struct IO_Event_Selector_URing_Waiting *waiting;
VALUE io;
int descriptor;
off_t offset;
char *buffer;
Expand All @@ -638,7 +641,7 @@ io_read_submit(VALUE _arguments)
io_uring_sqe_set_data(sqe, arguments->waiting->completion);
io_uring_submit_now(selector);

IO_Event_Selector_loop_yield(&selector->backend);
IO_Event_Selector_loop_yield_io(&selector->backend, arguments->io);

return RB_INT2NUM(arguments->waiting->result);
}
Expand All @@ -664,7 +667,7 @@ io_read_ensure(VALUE _arguments)
}

static int
io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, char *buffer, size_t length, off_t offset)
io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, VALUE io, int descriptor, char *buffer, size_t length, off_t offset)
{
struct IO_Event_Selector_URing_Waiting waiting = {
.fiber = fiber,
Expand All @@ -677,6 +680,7 @@ io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, c
struct io_read_arguments io_read_arguments = {
.selector = selector,
.waiting = &waiting,
.io = io,
.descriptor = descriptor,
.offset = offset,
.buffer = buffer,
Expand Down Expand Up @@ -705,7 +709,7 @@ VALUE IO_Event_Selector_URing_io_read(VALUE self, VALUE fiber, VALUE io, VALUE b

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_read(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
int result = io_read(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
Expand Down Expand Up @@ -756,7 +760,7 @@ VALUE IO_Event_Selector_URing_io_pread(VALUE self, VALUE fiber, VALUE io, VALUE

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_read(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
int result = io_read(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
Expand All @@ -783,6 +787,7 @@ VALUE IO_Event_Selector_URing_io_pread(VALUE self, VALUE fiber, VALUE io, VALUE
struct io_write_arguments {
struct IO_Event_Selector_URing *selector;
struct IO_Event_Selector_URing_Waiting *waiting;
VALUE io;
int descriptor;
off_t offset;
char *buffer;
Expand All @@ -802,7 +807,7 @@ io_write_submit(VALUE _argument)
io_uring_sqe_set_data(sqe, arguments->waiting->completion);
io_uring_submit_pending(selector);

IO_Event_Selector_loop_yield(&selector->backend);
IO_Event_Selector_loop_yield_io(&selector->backend, arguments->io);

return RB_INT2NUM(arguments->waiting->result);
}
Expand All @@ -828,7 +833,7 @@ io_write_ensure(VALUE _argument)
}

static int
io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, char *buffer, size_t length, off_t offset)
io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, VALUE io, int descriptor, char *buffer, size_t length, off_t offset)
{
struct IO_Event_Selector_URing_Waiting waiting = {
.fiber = fiber,
Expand All @@ -841,6 +846,7 @@ io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor,
struct io_write_arguments arguments = {
.selector = selector,
.waiting = &waiting,
.io = io,
.descriptor = descriptor,
.offset = offset,
.buffer = buffer,
Expand Down Expand Up @@ -873,7 +879,7 @@ VALUE IO_Event_Selector_URing_io_write(VALUE self, VALUE fiber, VALUE io, VALUE

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_write(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
int result = io_write(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
Expand Down Expand Up @@ -928,7 +934,7 @@ VALUE IO_Event_Selector_URing_io_pwrite(VALUE self, VALUE fiber, VALUE io, VALUE

size_t maximum_size = size - offset;
while (maximum_size) {
int result = io_write(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
int result = io_write(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);

if (result > 0) {
total += result;
Expand Down
Loading
Loading