Skip to content

Commit 722783e

Browse files
committed
Allow interruption of IO waits.
1 parent 0aa989c commit 722783e

File tree

11 files changed

+586
-27
lines changed

11 files changed

+586
-27
lines changed

ext/extconf.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
have_func("rb_ext_ractor_safe")
3030
have_func("&rb_fiber_transfer")
31+
have_func("rb_io_interruptable_operation")
3132

3233
if have_library("uring") and have_header("liburing.h")
3334
# We might want to consider using this in the future:

ext/io/event/event.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,10 @@ void Init_IO_Event(void)
2929
#ifdef IO_EVENT_SELECTOR_KQUEUE
3030
Init_IO_Event_Selector_KQueue(IO_Event_Selector);
3131
#endif
32+
33+
#ifdef HAVE_RB_IO_INTERRUPTABLE_OPERATION
34+
rb_define_const(IO_Event, "INTERRUPTABLE", Qtrue);
35+
#else
36+
rb_define_const(IO_Event, "INTERRUPTABLE", Qfalse);
37+
#endif
3238
}

ext/io/event/selector/epoll.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ VALUE IO_Event_Selector_EPoll_push(VALUE self, VALUE fiber)
420420

421421
IO_Event_Selector_ready_push(&selector->backend, fiber);
422422

423-
return Qnil;
423+
return fiber;
424424
}
425425

426426
VALUE IO_Event_Selector_EPoll_raise(int argc, VALUE *argv, VALUE self)
@@ -523,6 +523,8 @@ VALUE IO_Event_Selector_EPoll_process_wait(VALUE self, VALUE fiber, VALUE _pid,
523523
struct io_wait_arguments {
524524
struct IO_Event_Selector_EPoll *selector;
525525
struct IO_Event_Selector_EPoll_Waiting *waiting;
526+
527+
VALUE io;
526528
};
527529

528530
static
@@ -538,7 +540,7 @@ static
538540
VALUE io_wait_transfer(VALUE _arguments) {
539541
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;
540542

541-
IO_Event_Selector_loop_yield(&arguments->selector->backend);
543+
IO_Event_Selector_loop_yield_io(&arguments->selector->backend, arguments->io);
542544

543545
if (arguments->waiting->ready) {
544546
return RB_INT2NUM(arguments->waiting->ready);
@@ -578,6 +580,7 @@ VALUE IO_Event_Selector_EPoll_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE e
578580
struct io_wait_arguments io_wait_arguments = {
579581
.selector = selector,
580582
.waiting = &waiting,
583+
.io = io,
581584
};
582585

583586
return rb_ensure(io_wait_transfer, (VALUE)&io_wait_arguments, io_wait_ensure, (VALUE)&io_wait_arguments);

ext/io/event/selector/kqueue.c

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ enum {
1919
DEBUG = 0,
2020
DEBUG_IO_READ = 0,
2121
DEBUG_IO_WRITE = 0,
22-
DEBUG_IO_WAIT = 0
22+
DEBUG_IO_WAIT = 0,
23+
DEBUG_IO_INTERRUPT = 1
2324
};
2425

2526
#ifndef EVFILT_USER
@@ -407,7 +408,7 @@ VALUE IO_Event_Selector_KQueue_push(VALUE self, VALUE fiber)
407408

408409
IO_Event_Selector_ready_push(&selector->backend, fiber);
409410

410-
return Qnil;
411+
return fiber;
411412
}
412413

413414
VALUE IO_Event_Selector_KQueue_raise(int argc, VALUE *argv, VALUE self)
@@ -516,6 +517,8 @@ VALUE IO_Event_Selector_KQueue_process_wait(VALUE self, VALUE fiber, VALUE _pid,
516517
struct io_wait_arguments {
517518
struct IO_Event_Selector_KQueue *selector;
518519
struct IO_Event_Selector_KQueue_Waiting *waiting;
520+
521+
VALUE io;
519522
};
520523

521524
static
@@ -531,7 +534,7 @@ static
531534
VALUE io_wait_transfer(VALUE _arguments) {
532535
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;
533536

534-
IO_Event_Selector_loop_yield(&arguments->selector->backend);
537+
IO_Event_Selector_loop_yield_io(&arguments->selector->backend, arguments->io);
535538

536539
if (arguments->waiting->ready) {
537540
return RB_INT2NUM(arguments->waiting->ready);
@@ -564,6 +567,7 @@ VALUE IO_Event_Selector_KQueue_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE
564567
struct io_wait_arguments io_wait_arguments = {
565568
.selector = selector,
566569
.waiting = &waiting,
570+
.io = io,
567571
};
568572

569573
if (DEBUG_IO_WAIT) fprintf(stderr, "IO_Event_Selector_KQueue_io_wait descriptor=%d\n", descriptor);

ext/io/event/selector/selector.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,21 @@ VALUE IO_Event_Selector_loop_yield(struct IO_Event_Selector *backend)
116116
return IO_Event_Fiber_transfer(backend->loop, 0, NULL);
117117
}
118118

119+
#ifndef HAVE_RB_IO_INTERRUPTABLE_OPERATION
120+
static inline VALUE
121+
rb_io_interruptable_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument) {
122+
return function(argument);
123+
}
124+
#endif
125+
126+
static VALUE IO_Event_Selector_loop_yield_io_interruptable(VALUE fiber) {
127+
return IO_Event_Fiber_transfer(fiber, 0, NULL);
128+
}
129+
130+
VALUE IO_Event_Selector_loop_yield_io(struct IO_Event_Selector *backend, VALUE io) {
131+
return rb_io_interruptable_operation(io, IO_Event_Selector_loop_yield_io_interruptable, backend->loop);
132+
}
133+
119134
struct wait_and_transfer_arguments {
120135
int argc;
121136
VALUE *argv;

ext/io/event/selector/selector.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ VALUE IO_Event_Selector_loop_resume(struct IO_Event_Selector *backend, VALUE fib
120120
// Strictly speaking, it's not a scheduling operation (does not schedule the current fiber).
121121
VALUE IO_Event_Selector_loop_yield(struct IO_Event_Selector *backend);
122122

123+
// Similar to `IO_Event_Selector_loop_yield` but allows the caller to specify an IO which may be interrupted.
124+
VALUE IO_Event_Selector_loop_yield_io(struct IO_Event_Selector *backend, VALUE io);
125+
123126
// Resume a specific fiber. This is a scheduling operation.
124127
// The first argument is the fiber, the rest are the arguments to the resume.
125128
//

ext/io/event/selector/uring.c

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ VALUE IO_Event_Selector_URing_push(VALUE self, VALUE fiber)
303303

304304
IO_Event_Selector_ready_push(&selector->backend, fiber);
305305

306-
return Qnil;
306+
return fiber;
307307
}
308308

309309
VALUE IO_Event_Selector_URing_raise(int argc, VALUE *argv, VALUE self)
@@ -522,6 +522,7 @@ int events_from_poll_flags(short flags) {
522522
struct io_wait_arguments {
523523
struct IO_Event_Selector_URing *selector;
524524
struct IO_Event_Selector_URing_Waiting *waiting;
525+
VALUE io;
525526
short flags;
526527
};
527528

@@ -548,7 +549,7 @@ VALUE io_wait_transfer(VALUE _arguments) {
548549
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;
549550
struct IO_Event_Selector_URing *selector = arguments->selector;
550551

551-
IO_Event_Selector_loop_yield(&selector->backend);
552+
IO_Event_Selector_loop_yield_io(&selector->backend, arguments->io);
552553

553554
if (DEBUG) fprintf(stderr, "io_wait_transfer:waiting=%p, result=%d\n", (void*)arguments->waiting, arguments->waiting->result);
554555

@@ -588,7 +589,8 @@ VALUE IO_Event_Selector_URing_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE e
588589
struct io_wait_arguments io_wait_arguments = {
589590
.selector = selector,
590591
.waiting = &waiting,
591-
.flags = flags
592+
.io = io,
593+
.flags = flags,
592594
};
593595

594596
return rb_ensure(io_wait_transfer, (VALUE)&io_wait_arguments, io_wait_ensure, (VALUE)&io_wait_arguments);
@@ -619,6 +621,7 @@ static inline off_t io_seekable(int descriptor)
619621
struct io_read_arguments {
620622
struct IO_Event_Selector_URing *selector;
621623
struct IO_Event_Selector_URing_Waiting *waiting;
624+
VALUE io;
622625
int descriptor;
623626
off_t offset;
624627
char *buffer;
@@ -638,7 +641,7 @@ io_read_submit(VALUE _arguments)
638641
io_uring_sqe_set_data(sqe, arguments->waiting->completion);
639642
io_uring_submit_now(selector);
640643

641-
IO_Event_Selector_loop_yield(&selector->backend);
644+
IO_Event_Selector_loop_yield_io(&selector->backend, arguments->io);
642645

643646
return RB_INT2NUM(arguments->waiting->result);
644647
}
@@ -664,7 +667,7 @@ io_read_ensure(VALUE _arguments)
664667
}
665668

666669
static int
667-
io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, char *buffer, size_t length, off_t offset)
670+
io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, VALUE io, int descriptor, char *buffer, size_t length, off_t offset)
668671
{
669672
struct IO_Event_Selector_URing_Waiting waiting = {
670673
.fiber = fiber,
@@ -677,6 +680,7 @@ io_read(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, c
677680
struct io_read_arguments io_read_arguments = {
678681
.selector = selector,
679682
.waiting = &waiting,
683+
.io = io,
680684
.descriptor = descriptor,
681685
.offset = offset,
682686
.buffer = buffer,
@@ -705,7 +709,7 @@ VALUE IO_Event_Selector_URing_io_read(VALUE self, VALUE fiber, VALUE io, VALUE b
705709

706710
size_t maximum_size = size - offset;
707711
while (maximum_size) {
708-
int result = io_read(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
712+
int result = io_read(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);
709713

710714
if (result > 0) {
711715
total += result;
@@ -756,7 +760,7 @@ VALUE IO_Event_Selector_URing_io_pread(VALUE self, VALUE fiber, VALUE io, VALUE
756760

757761
size_t maximum_size = size - offset;
758762
while (maximum_size) {
759-
int result = io_read(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
763+
int result = io_read(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);
760764

761765
if (result > 0) {
762766
total += result;
@@ -783,6 +787,7 @@ VALUE IO_Event_Selector_URing_io_pread(VALUE self, VALUE fiber, VALUE io, VALUE
783787
struct io_write_arguments {
784788
struct IO_Event_Selector_URing *selector;
785789
struct IO_Event_Selector_URing_Waiting *waiting;
790+
VALUE io;
786791
int descriptor;
787792
off_t offset;
788793
char *buffer;
@@ -802,7 +807,7 @@ io_write_submit(VALUE _argument)
802807
io_uring_sqe_set_data(sqe, arguments->waiting->completion);
803808
io_uring_submit_pending(selector);
804809

805-
IO_Event_Selector_loop_yield(&selector->backend);
810+
IO_Event_Selector_loop_yield_io(&selector->backend, arguments->io);
806811

807812
return RB_INT2NUM(arguments->waiting->result);
808813
}
@@ -828,7 +833,7 @@ io_write_ensure(VALUE _argument)
828833
}
829834

830835
static int
831-
io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor, char *buffer, size_t length, off_t offset)
836+
io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, VALUE io, int descriptor, char *buffer, size_t length, off_t offset)
832837
{
833838
struct IO_Event_Selector_URing_Waiting waiting = {
834839
.fiber = fiber,
@@ -841,6 +846,7 @@ io_write(struct IO_Event_Selector_URing *selector, VALUE fiber, int descriptor,
841846
struct io_write_arguments arguments = {
842847
.selector = selector,
843848
.waiting = &waiting,
849+
.io = io,
844850
.descriptor = descriptor,
845851
.offset = offset,
846852
.buffer = buffer,
@@ -873,7 +879,7 @@ VALUE IO_Event_Selector_URing_io_write(VALUE self, VALUE fiber, VALUE io, VALUE
873879

874880
size_t maximum_size = size - offset;
875881
while (maximum_size) {
876-
int result = io_write(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
882+
int result = io_write(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);
877883

878884
if (result > 0) {
879885
total += result;
@@ -928,7 +934,7 @@ VALUE IO_Event_Selector_URing_io_pwrite(VALUE self, VALUE fiber, VALUE io, VALUE
928934

929935
size_t maximum_size = size - offset;
930936
while (maximum_size) {
931-
int result = io_write(selector, fiber, descriptor, (char*)base+offset, maximum_size, from);
937+
int result = io_write(selector, fiber, io, descriptor, (char*)base+offset, maximum_size, from);
932938

933939
if (result > 0) {
934940
total += result;

0 commit comments

Comments
 (0)