Skip to content

Commit 9a29252

Browse files
authored
Fix compatibility with fiber schedulers that don't implement #fiber_interrupt. (ruby#13492)
1 parent 20d7db8 commit 9a29252

File tree

2 files changed

+55
-14
lines changed

2 files changed

+55
-14
lines changed

scheduler.c

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ verify_interface(VALUE scheduler)
170170
if (!rb_respond_to(scheduler, id_io_wait)) {
171171
rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
172172
}
173+
174+
if (!rb_respond_to(scheduler, id_fiber_interrupt)) {
175+
rb_warn("Scheduler should implement #fiber_interrupt");
176+
}
173177
}
174178

175179
static VALUE
@@ -458,7 +462,11 @@ rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeou
458462
scheduler, io, events, timeout
459463
};
460464

461-
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
465+
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
466+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
467+
} else {
468+
return fiber_scheduler_io_wait((VALUE)&arguments);
469+
}
462470
}
463471

464472
VALUE
@@ -546,7 +554,11 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
546554
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
547555
};
548556

549-
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
557+
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
558+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
559+
} else {
560+
return fiber_scheduler_io_read((VALUE)&arguments);
561+
}
550562
}
551563

552564
/*
@@ -581,7 +593,11 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff
581593
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
582594
};
583595

584-
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
596+
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
597+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
598+
} else {
599+
return fiber_scheduler_io_pread((VALUE)&arguments);
600+
}
585601
}
586602

587603
/*
@@ -630,7 +646,11 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
630646
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
631647
};
632648

633-
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
649+
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
650+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
651+
} else {
652+
return fiber_scheduler_io_write((VALUE)&arguments);
653+
}
634654
}
635655

636656
/*
@@ -666,7 +686,11 @@ rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buf
666686
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
667687
};
668688

669-
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
689+
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
690+
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
691+
} else {
692+
return fiber_scheduler_io_pwrite((VALUE)&arguments);
693+
}
670694
}
671695

672696
VALUE

thread.c

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1721,6 +1721,12 @@ rb_io_blocking_operation_enter(struct rb_io *io, struct rb_io_blocking_operation
17211721
ccan_list_add(rb_io_blocking_operations(io), &blocking_operation->list);
17221722
}
17231723

1724+
static void
1725+
rb_io_blocking_operation_pop(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
1726+
{
1727+
ccan_list_del(&blocking_operation->list);
1728+
}
1729+
17241730
struct io_blocking_operation_arguments {
17251731
struct rb_io *io;
17261732
struct rb_io_blocking_operation *blocking_operation;
@@ -1732,7 +1738,7 @@ io_blocking_operation_exit(VALUE _arguments)
17321738
struct io_blocking_operation_arguments *arguments = (void*)_arguments;
17331739
struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation;
17341740

1735-
ccan_list_del(&blocking_operation->list);
1741+
rb_io_blocking_operation_pop(arguments->io, blocking_operation);
17361742

17371743
rb_io_t *io = arguments->io;
17381744
rb_thread_t *thread = io->closing_ec->thread_ptr;
@@ -1763,6 +1769,9 @@ rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation
17631769
{
17641770
VALUE wakeup_mutex = io->wakeup_mutex;
17651771

1772+
// Indicate that the blocking operation is no longer active:
1773+
blocking_operation->ec = NULL;
1774+
17661775
if (RB_TEST(wakeup_mutex)) {
17671776
struct io_blocking_operation_arguments arguments = {
17681777
.io = io,
@@ -1772,7 +1781,8 @@ rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation
17721781
rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_exit, (VALUE)&arguments);
17731782
}
17741783
else {
1775-
ccan_list_del(&blocking_operation->list);
1784+
// If there's no wakeup_mutex, we can safely remove the operation directly:
1785+
rb_io_blocking_operation_pop(io, blocking_operation);
17761786
}
17771787
}
17781788

@@ -1809,7 +1819,7 @@ rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argum
18091819
struct rb_io_blocking_operation blocking_operation = {
18101820
.ec = ec,
18111821
};
1812-
ccan_list_add(&io->blocking_operations, &blocking_operation.list);
1822+
rb_io_blocking_operation_enter(io, &blocking_operation);
18131823

18141824
struct io_blocking_operation_arguments io_blocking_operation_arguments = {
18151825
.io = io,
@@ -2765,13 +2775,20 @@ thread_io_close_notify_all(VALUE _io)
27652775
ccan_list_for_each(rb_io_blocking_operations(io), blocking_operation, list) {
27662776
rb_execution_context_t *ec = blocking_operation->ec;
27672777

2768-
rb_thread_t *thread = ec->thread_ptr;
2778+
// If the operation is in progress, we need to interrupt it:
2779+
if (ec) {
2780+
rb_thread_t *thread = ec->thread_ptr;
27692781

2770-
if (thread->scheduler != Qnil) {
2771-
rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
2772-
} else {
2773-
rb_threadptr_pending_interrupt_enque(thread, error);
2774-
rb_threadptr_interrupt(thread);
2782+
VALUE result = RUBY_Qundef;
2783+
if (thread->scheduler != Qnil) {
2784+
result = rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
2785+
}
2786+
2787+
if (result == RUBY_Qundef) {
2788+
// If the thread is not the current thread, we need to enqueue an error:
2789+
rb_threadptr_pending_interrupt_enque(thread, error);
2790+
rb_threadptr_interrupt(thread);
2791+
}
27752792
}
27762793

27772794
count += 1;

0 commit comments

Comments
 (0)