Skip to content

Commit 8d8159e

Browse files
authored
Fix thread scheduler issue with thread_sched_wait_events (ruby#15392)
Fix race between timer thread dequeuing waiting thread and thread skipping sleeping due to being dequeued. We now use `th->event_serial` which is protected by `thread_sched_lock`. When a thread is put on timer thread's waiting list, the event serial is saved on the item. The timer thread checks that the saved serial is the same as current thread's serial before calling `thread_sched_to_ready`. The following script (taken from a test in `test_thread.rb` used to crash on scheduler debug assertions. It would likely crash in non-debug mode as well. ```ruby def assert_nil(val) if val != nil raise "Expected #{val} to be nil" end end def assert_equal(expected, actual) if expected != actual raise "Expected #{expected} to be #{actual}" end end def test_join2 ok = false t1 = Thread.new { ok = true; sleep } Thread.pass until ok Thread.pass until t1.stop? t2 = Thread.new do Thread.pass while ok t1.join(0.01) end t3 = Thread.new do ok = false t1.join end assert_nil(t2.value) t1.wakeup assert_equal(t1, t3.value) ensure t1&.kill&.join t2&.kill&.join t3&.kill&.join end rs = 30.times.map do Ractor.new do test_join2 end end rs.each(&:join) ```
1 parent 7d9558f commit 8d8159e

File tree

4 files changed

+42
-25
lines changed

4 files changed

+42
-25
lines changed

thread_pthread.c

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,8 +1122,10 @@ thread_sched_to_waiting_until_wakeup(struct rb_thread_sched *sched, rb_thread_t
11221122
{
11231123
if (!RUBY_VM_INTERRUPTED(th->ec)) {
11241124
bool can_direct_transfer = !th_has_dedicated_nt(th);
1125+
th->status = THREAD_STOPPED_FOREVER;
11251126
thread_sched_wakeup_next_thread(sched, th, can_direct_transfer);
11261127
thread_sched_wait_running_turn(sched, th, can_direct_transfer);
1128+
th->status = THREAD_RUNNABLE;
11271129
}
11281130
else {
11291131
RUBY_DEBUG_LOG("th:%u interrupted", rb_th_serial(th));
@@ -1149,6 +1151,7 @@ thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th)
11491151
bool can_direct_transfer = !th_has_dedicated_nt(th);
11501152
thread_sched_to_ready_common(sched, th, false, can_direct_transfer);
11511153
thread_sched_wait_running_turn(sched, th, can_direct_transfer);
1154+
th->status = THREAD_RUNNABLE;
11521155
}
11531156
else {
11541157
VM_ASSERT(sched->readyq_cnt == 0);
@@ -1338,7 +1341,7 @@ void rb_ractor_lock_self(rb_ractor_t *r);
13381341
void rb_ractor_unlock_self(rb_ractor_t *r);
13391342

13401343
// The current thread for a ractor is put to "sleep" (descheduled in the STOPPED_FOREVER state) waiting for
1341-
// a ractor action to wake it up. See docs for `ractor_sched_sleep_with_cleanup` for more info.
1344+
// a ractor action to wake it up.
13421345
void
13431346
rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf, void *ubf_arg)
13441347
{
@@ -2868,7 +2871,7 @@ static struct {
28682871

28692872
static void timer_thread_check_timeslice(rb_vm_t *vm);
28702873
static int timer_thread_set_timeout(rb_vm_t *vm);
2871-
static void timer_thread_wakeup_thread(rb_thread_t *th);
2874+
static void timer_thread_wakeup_thread(rb_thread_t *th, uint32_t event_serial);
28722875

28732876
#include "thread_pthread_mn.c"
28742877

@@ -2970,7 +2973,7 @@ timer_thread_check_exceed(rb_hrtime_t abs, rb_hrtime_t now)
29702973
}
29712974

29722975
static rb_thread_t *
2973-
timer_thread_deq_wakeup(rb_vm_t *vm, rb_hrtime_t now)
2976+
timer_thread_deq_wakeup(rb_vm_t *vm, rb_hrtime_t now, uint32_t *event_serial)
29742977
{
29752978
struct rb_thread_sched_waiting *w = ccan_list_top(&timer_th.waiting, struct rb_thread_sched_waiting, node);
29762979

@@ -2987,32 +2990,31 @@ timer_thread_deq_wakeup(rb_vm_t *vm, rb_hrtime_t now)
29872990
w->flags = thread_sched_waiting_none;
29882991
w->data.result = 0;
29892992

2990-
return thread_sched_waiting_thread(w);
2993+
rb_thread_t *th = thread_sched_waiting_thread(w);
2994+
*event_serial = w->data.event_serial;
2995+
return th;
29912996
}
29922997

29932998
return NULL;
29942999
}
29953000

29963001
static void
2997-
timer_thread_wakeup_thread_locked(struct rb_thread_sched *sched, rb_thread_t *th)
3002+
timer_thread_wakeup_thread_locked(struct rb_thread_sched *sched, rb_thread_t *th, uint32_t event_serial)
29983003
{
2999-
if (sched->running != th) {
3004+
if (sched->running != th && th->event_serial == event_serial) {
30003005
thread_sched_to_ready_common(sched, th, true, false);
30013006
}
3002-
else {
3003-
// will be release the execution right
3004-
}
30053007
}
30063008

30073009
static void
3008-
timer_thread_wakeup_thread(rb_thread_t *th)
3010+
timer_thread_wakeup_thread(rb_thread_t *th, uint32_t event_serial)
30093011
{
30103012
RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
30113013
struct rb_thread_sched *sched = TH_SCHED(th);
30123014

30133015
thread_sched_lock(sched, th);
30143016
{
3015-
timer_thread_wakeup_thread_locked(sched, th);
3017+
timer_thread_wakeup_thread_locked(sched, th, event_serial);
30163018
}
30173019
thread_sched_unlock(sched, th);
30183020
}
@@ -3022,11 +3024,14 @@ timer_thread_check_timeout(rb_vm_t *vm)
30223024
{
30233025
rb_hrtime_t now = rb_hrtime_now();
30243026
rb_thread_t *th;
3027+
uint32_t event_serial;
30253028

30263029
rb_native_mutex_lock(&timer_th.waiting_lock);
30273030
{
3028-
while ((th = timer_thread_deq_wakeup(vm, now)) != NULL) {
3029-
timer_thread_wakeup_thread(th);
3031+
while ((th = timer_thread_deq_wakeup(vm, now, &event_serial)) != NULL) {
3032+
rb_native_mutex_unlock(&timer_th.waiting_lock);
3033+
timer_thread_wakeup_thread(th, event_serial);
3034+
rb_native_mutex_lock(&timer_th.waiting_lock);
30303035
}
30313036
}
30323037
rb_native_mutex_unlock(&timer_th.waiting_lock);

thread_pthread.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ struct rb_thread_sched_waiting {
3939
#else
4040
uint64_t timeout;
4141
#endif
42+
uint32_t event_serial;
4243
int fd; // -1 for timeout only
4344
int result;
4445
} data;

thread_pthread_mn.c

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#if USE_MN_THREADS
44

55
static void timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags);
6-
static void timer_thread_wakeup_thread_locked(struct rb_thread_sched *sched, rb_thread_t *th);
6+
static void timer_thread_wakeup_thread_locked(struct rb_thread_sched *sched, rb_thread_t *th, uint32_t event_serial);
77

88
static bool
99
timer_thread_cancel_waiting(rb_thread_t *th)
@@ -15,9 +15,7 @@ timer_thread_cancel_waiting(rb_thread_t *th)
1515
if (th->sched.waiting_reason.flags) {
1616
canceled = true;
1717
ccan_list_del_init(&th->sched.waiting_reason.node);
18-
if (th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write)) {
19-
timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
20-
}
18+
timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
2119
th->sched.waiting_reason.flags = thread_sched_waiting_none;
2220
}
2321
}
@@ -57,7 +55,7 @@ ubf_event_waiting(void *ptr)
5755
thread_sched_unlock(sched, th);
5856
}
5957

60-
static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel);
58+
static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial);
6159

6260
// return true if timed out
6361
static bool
@@ -67,23 +65,27 @@ thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd,
6765

6866
volatile bool timedout = false, need_cancel = false;
6967

68+
uint32_t event_serial = ++th->event_serial; // overflow is okay
69+
7070
if (ubf_set(th, ubf_event_waiting, (void *)th)) {
7171
return false;
7272
}
7373

7474
thread_sched_lock(sched, th);
7575
{
76-
if (timer_thread_register_waiting(th, fd, events, rel)) {
76+
if (timer_thread_register_waiting(th, fd, events, rel, event_serial)) {
7777
RUBY_DEBUG_LOG("wait fd:%d", fd);
7878

7979
RB_VM_SAVE_MACHINE_CONTEXT(th);
8080

8181
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
8282

8383
if (th->sched.waiting_reason.flags == thread_sched_waiting_none) {
84-
// already awaken
84+
th->event_serial++;
85+
// timer thread has dequeued us already, but it won't try to wake us because we bumped our serial
8586
}
8687
else if (RUBY_VM_INTERRUPTED(th->ec)) {
88+
th->event_serial++; // make sure timer thread doesn't try to wake us
8789
need_cancel = true;
8890
}
8991
else {
@@ -111,7 +113,8 @@ thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd,
111113
}
112114
thread_sched_unlock(sched, th);
113115

114-
ubf_clear(th); // TODO: maybe it is already NULL?
116+
// if ubf triggered between sched unlock and ubf clear, sched->running == th here
117+
ubf_clear(th);
115118

116119
VM_ASSERT(sched->running == th);
117120

@@ -680,7 +683,7 @@ kqueue_already_registered(int fd)
680683

681684
// return false if the fd is not waitable or not need to wait.
682685
static bool
683-
timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel)
686+
timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial)
684687
{
685688
RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (unsigned long)*rel : 0);
686689

@@ -807,6 +810,7 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
807810
th->sched.waiting_reason.data.timeout = abs;
808811
th->sched.waiting_reason.data.fd = fd;
809812
th->sched.waiting_reason.data.result = 0;
813+
th->sched.waiting_reason.data.event_serial = event_serial;
810814
}
811815

812816
if (abs == 0) { // no timeout
@@ -855,6 +859,10 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
855859
static void
856860
timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags)
857861
{
862+
if (!(th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write))) {
863+
return;
864+
}
865+
858866
RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
859867
#if HAVE_SYS_EVENT_H
860868
kqueue_unregister_waiting(fd, flags);
@@ -885,7 +893,7 @@ timer_thread_setup_mn(void)
885893
#endif
886894
RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]);
887895

888-
timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL);
896+
timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL, 0);
889897
}
890898

891899
static int
@@ -986,8 +994,9 @@ timer_thread_polling(rb_vm_t *vm)
986994
th->sched.waiting_reason.flags = thread_sched_waiting_none;
987995
th->sched.waiting_reason.data.fd = -1;
988996
th->sched.waiting_reason.data.result = filter;
997+
uint32_t event_serial = th->sched.waiting_reason.data.event_serial;
989998

990-
timer_thread_wakeup_thread_locked(sched, th);
999+
timer_thread_wakeup_thread_locked(sched, th, event_serial);
9911000
}
9921001
else {
9931002
// already released
@@ -1031,8 +1040,9 @@ timer_thread_polling(rb_vm_t *vm)
10311040
th->sched.waiting_reason.flags = thread_sched_waiting_none;
10321041
th->sched.waiting_reason.data.fd = -1;
10331042
th->sched.waiting_reason.data.result = (int)events;
1043+
uint32_t event_serial = th->sched.waiting_reason.data.event_serial;
10341044

1035-
timer_thread_wakeup_thread_locked(sched, th);
1045+
timer_thread_wakeup_thread_locked(sched, th, event_serial);
10361046
}
10371047
else {
10381048
// already released

vm_core.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,6 +1127,7 @@ typedef struct rb_thread_struct {
11271127
struct rb_thread_sched_item sched;
11281128
bool mn_schedulable;
11291129
rb_atomic_t serial; // only for RUBY_DEBUG_LOG()
1130+
uint32_t event_serial;
11301131

11311132
VALUE last_status; /* $? */
11321133

0 commit comments

Comments
 (0)