Skip to content

Commit 9c26830

Browse files
authored
Introduce Fiber::Scheduler#blocking_operation_wait. (ruby#12016)
Redirect `rb_nogvl` blocking operations to the fiber scheduler if possible to prevent stalling the event loop. [Feature #20876]
1 parent 86b1c83 commit 9c26830

File tree

8 files changed

+128
-1
lines changed

8 files changed

+128
-1
lines changed

NEWS.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ Note: We're only listing outstanding class updates.
6161
associated with the AST node. [[Feature #20624]]
6262
* Add RubyVM::AbstractSyntaxTree::Location class which holds location information. [[Feature #20624]]
6363

64+
* Fiber::Scheduler
65+
66+
* An optional `Fiber::Scheduler#blocking_operation_wait` hook allows blocking operations to be moved out of the
67+
event loop in order to reduce latency and improve multi-core processor utilization. [[Feature #20876]]
68+
6469
## Stdlib updates
6570

6671
* Tempfile
@@ -236,3 +241,4 @@ details of the default gems or bundled gems.
236241
[Feature #20497]: https://bugs.ruby-lang.org/issues/20497
237242
[Feature #20624]: https://bugs.ruby-lang.org/issues/20624
238243
[Feature #20775]: https://bugs.ruby-lang.org/issues/20775
244+
[Feature #20876]: https://bugs.ruby-lang.org/issues/20876

common.mk

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16689,6 +16689,7 @@ scheduler.$(OBJEXT): {$(VPATH)}scheduler.c
1668916689
scheduler.$(OBJEXT): {$(VPATH)}shape.h
1669016690
scheduler.$(OBJEXT): {$(VPATH)}st.h
1669116691
scheduler.$(OBJEXT): {$(VPATH)}subst.h
16692+
scheduler.$(OBJEXT): {$(VPATH)}thread.h
1669216693
scheduler.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
1669316694
scheduler.$(OBJEXT): {$(VPATH)}thread_native.h
1669416695
scheduler.$(OBJEXT): {$(VPATH)}vm_core.h

include/ruby/fiber/scheduler.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,26 @@ VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io);
391391
*/
392392
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname);
393393

394+
struct rb_fiber_scheduler_blocking_operation_state {
395+
void *result;
396+
int saved_errno;
397+
};
398+
399+
/**
400+
* Defer the execution of the passed function to the scheduler.
401+
*
402+
* @param[in] scheduler Target scheduler.
403+
* @param[in] function The function to run.
404+
* @param[in] data The data to pass to the function.
405+
* @param[in] unblock_function The unblock function to use to interrupt the operation.
406+
* @param[in] data2 The data to pass to the unblock function.
407+
* @param[in] flags Flags passed to `rb_nogvl`.
408+
* @param[out] state The result and errno of the operation.
409+
* @retval RUBY_Qundef `scheduler` doesn't have `#blocking_operation_wait`.
410+
* @return otherwise What `scheduler.blocking_operation_wait` returns.
411+
*/
412+
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state);
413+
394414
/**
395415
* Create and schedule a non-blocking fiber.
396416
*

include/ruby/thread.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,19 @@
5959
*/
6060
#define RB_NOGVL_UBF_ASYNC_SAFE (0x2)
6161

62+
/**
63+
* Passing this flag to rb_nogvl() indicates that the passed function
64+
* is safe to offload to a background thread or work pool. In other words, the
65+
* function is safe to run using a fiber scheduler's `blocking_operation_wait`.
66+
* hook.
67+
*
68+
* If your function depends on thread-local storage, or thread-specific data
69+
* operations/data structures, you should not set this flag, as
70+
* these operations may behave differently (or fail) when run in a different
71+
* thread/context (e.g. unlocking a mutex).
72+
*/
73+
#define RB_NOGVL_OFFLOAD_SAFE (0x4)
74+
6275
/** @} */
6376

6477
RBIMPL_SYMBOL_EXPORT_BEGIN()

internal/io.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ struct rb_io;
1515

1616
#include "ruby/io.h" /* for rb_io_t */
1717

18-
#define IO_WITHOUT_GVL(func, arg) rb_thread_call_without_gvl(func, arg, RUBY_UBF_IO, 0)
18+
#define IO_WITHOUT_GVL(func, arg) rb_nogvl(func, arg, RUBY_UBF_IO, 0, RB_NOGVL_OFFLOAD_SAFE)
1919
#define IO_WITHOUT_GVL_INT(func, arg) (int)(VALUE)IO_WITHOUT_GVL(func, arg)
2020

2121
/** Ruby's IO, metadata and buffers. */

scheduler.c

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
#include "ruby/io.h"
1414
#include "ruby/io/buffer.h"
1515

16+
#include "ruby/thread.h"
17+
18+
// For `ruby_thread_has_gvl_p`.
1619
#include "internal/thread.h"
1720

1821
static ID id_close;
@@ -33,6 +36,8 @@ static ID id_io_close;
3336

3437
static ID id_address_resolve;
3538

39+
static ID id_blocking_operation_wait;
40+
3641
static ID id_fiber_schedule;
3742

3843
/*
@@ -109,6 +114,8 @@ Init_Fiber_Scheduler(void)
109114

110115
id_address_resolve = rb_intern_const("address_resolve");
111116

117+
id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
118+
112119
id_fiber_schedule = rb_intern_const("fiber");
113120

114121
#if 0 /* for RDoc */
@@ -693,6 +700,62 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
693700
return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
694701
}
695702

703+
struct rb_blocking_operation_wait_arguments {
704+
void *(*function)(void *);
705+
void *data;
706+
rb_unblock_function_t *unblock_function;
707+
void *data2;
708+
int flags;
709+
710+
struct rb_fiber_scheduler_blocking_operation_state *state;
711+
};
712+
713+
static VALUE
714+
rb_fiber_scheduler_blocking_operation_wait_proc(RB_BLOCK_CALL_FUNC_ARGLIST(value, _arguments))
715+
{
716+
struct rb_blocking_operation_wait_arguments *arguments = (struct rb_blocking_operation_wait_arguments*)_arguments;
717+
718+
if (arguments->state == NULL) {
719+
rb_raise(rb_eRuntimeError, "Blocking function was already invoked!");
720+
}
721+
722+
arguments->state->result = rb_nogvl(arguments->function, arguments->data, arguments->unblock_function, arguments->data2, arguments->flags);
723+
arguments->state->saved_errno = rb_errno();
724+
725+
// Make sure it's only invoked once.
726+
arguments->state = NULL;
727+
728+
return Qnil;
729+
}
730+
731+
/*
732+
* Document-method: Fiber::Scheduler#blocking_operation_wait
733+
* call-seq: blocking_operation_wait(work)
734+
*
735+
* Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
736+
*
737+
* Minimal suggested implementation is:
738+
*
739+
* def blocking_operation_wait(work)
740+
* Thread.new(&work).join
741+
* end
742+
*/
743+
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
744+
{
745+
struct rb_blocking_operation_wait_arguments arguments = {
746+
.function = function,
747+
.data = data,
748+
.unblock_function = unblock_function,
749+
.data2 = data2,
750+
.flags = flags,
751+
.state = state
752+
};
753+
754+
VALUE proc = rb_proc_new(rb_fiber_scheduler_blocking_operation_wait_proc, (VALUE)&arguments);
755+
756+
return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
757+
}
758+
696759
/*
697760
* Document-method: Fiber::Scheduler#fiber
698761
* call-seq: fiber(&block)

test/fiber/scheduler.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,16 @@ def address_resolve(hostname)
309309
Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
310310
end.value
311311
end
312+
313+
def blocking_operation_wait(work)
314+
thread = Thread.new(&work)
315+
316+
thread.join
317+
318+
thread = nil
319+
ensure
320+
thread&.kill
321+
end
312322
end
313323

314324
# This scheduler class implements `io_read` and `io_write` hooks which require

thread.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,6 +1539,20 @@ rb_nogvl(void *(*func)(void *), void *data1,
15391539
rb_unblock_function_t *ubf, void *data2,
15401540
int flags)
15411541
{
1542+
if (flags & RB_NOGVL_OFFLOAD_SAFE) {
1543+
VALUE scheduler = rb_fiber_scheduler_current();
1544+
if (scheduler != Qnil) {
1545+
struct rb_fiber_scheduler_blocking_operation_state state;
1546+
1547+
VALUE result = rb_fiber_scheduler_blocking_operation_wait(scheduler, func, data1, ubf, data2, flags, &state);
1548+
1549+
if (!UNDEF_P(result)) {
1550+
rb_errno_set(state.saved_errno);
1551+
return state.result;
1552+
}
1553+
}
1554+
}
1555+
15421556
void *val = 0;
15431557
rb_execution_context_t *ec = GET_EC();
15441558
rb_thread_t *th = rb_ec_thread_ptr(ec);

0 commit comments

Comments
 (0)