From e7695ba3d9f0e8ee17025af4d42ecaf2dad47f29 Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Fri, 2 Jan 2026 11:41:24 -0500 Subject: [PATCH 1/4] [ruby/mmtk] Check for T_NONE during marking https://github.com/ruby/mmtk/commit/c3e338bb25 --- gc/mmtk/mmtk.c | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/gc/mmtk/mmtk.c b/gc/mmtk/mmtk.c index b532d3a774cca1..6da17fdb23b1e7 100644 --- a/gc/mmtk/mmtk.c +++ b/gc/mmtk/mmtk.c @@ -72,6 +72,8 @@ struct MMTk_final_job { #ifdef RB_THREAD_LOCAL_SPECIFIER RB_THREAD_LOCAL_SPECIFIER struct MMTk_GCThreadTLS *rb_mmtk_gc_thread_tls; + +RB_THREAD_LOCAL_SPECIFIER VALUE marking_parent_object; #else # error We currently need language-supported TLS #endif @@ -270,14 +272,18 @@ rb_mmtk_update_object_references(MMTk_ObjectReference mmtk_object) VALUE object = (VALUE)mmtk_object; if (!RB_FL_TEST(object, RUBY_FL_WEAK_REFERENCE)) { + marking_parent_object = object; rb_gc_update_object_references(rb_gc_get_objspace(), object); + marking_parent_object = 0; } } static void rb_mmtk_call_gc_mark_children(MMTk_ObjectReference object) { + marking_parent_object = (VALUE)object; rb_gc_mark_children(rb_gc_get_objspace(), (VALUE)object); + marking_parent_object = 0; } static void @@ -285,11 +291,15 @@ rb_mmtk_handle_weak_references(MMTk_ObjectReference mmtk_object, bool moving) { VALUE object = (VALUE)mmtk_object; + marking_parent_object = object; + rb_gc_handle_weak_references(object); if (moving) { rb_gc_update_object_references(rb_gc_get_objspace(), object); } + + marking_parent_object = 0; } static void @@ -797,6 +807,17 @@ void rb_gc_impl_adjust_memory_usage(void *objspace_ptr, ssize_t diff) { } static inline VALUE rb_mmtk_call_object_closure(VALUE obj, bool pin) { + if (RB_UNLIKELY(RB_BUILTIN_TYPE(obj) == T_NONE)) { + const size_t info_size = 256; + char obj_info_buf[info_size]; + rb_raw_obj_info(obj_info_buf, info_size, obj); + + char parent_obj_info_buf[info_size]; + rb_raw_obj_info(parent_obj_info_buf, info_size, 
marking_parent_object); + + rb_bug("try to mark T_NONE object (obj: %s, parent: %s)", obj_info_buf, parent_obj_info_buf); + } + return (VALUE)rb_mmtk_gc_thread_tls->object_closure.c_function( rb_mmtk_gc_thread_tls->object_closure.rust_closure, rb_mmtk_gc_thread_tls->gc_context, From 16feb46fa27fdbdec4f7a0914787300b77fa232a Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Fri, 2 Jan 2026 12:33:34 +0100 Subject: [PATCH 2/4] Convert Queue and SizedQueue to rb builtin A large part of `thread_sync.c` was migrated already, might as well go all the way. It also allows removing a bunch of RDoc commands. --- test/ruby/test_settracefunc.rb | 2 +- thread_sync.c | 515 ++------------------------------- thread_sync.rb | 344 +++++++++++++++++++++- 3 files changed, 363 insertions(+), 498 deletions(-) diff --git a/test/ruby/test_settracefunc.rb b/test/ruby/test_settracefunc.rb index 776534a2b54770..d3b2441e21e7c3 100644 --- a/test/ruby/test_settracefunc.rb +++ b/test/ruby/test_settracefunc.rb @@ -2226,7 +2226,7 @@ def method_for_test_thread_add_trace_func def test_thread_add_trace_func events = [] base_line = __LINE__ - q = Thread::Queue.new + q = [] t = Thread.new{ Thread.current.add_trace_func proc{|ev, file, line, *args| events << [ev, line] if file == __FILE__ diff --git a/thread_sync.c b/thread_sync.c index 3227bab8435b1a..e3916c97cbd0a6 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -2,8 +2,7 @@ #include "ccan/list/list.h" #include "builtin.h" -static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; -static VALUE rb_eClosedQueueError; +static VALUE rb_cMutex, rb_eClosedQueueError; /* Mutex */ typedef struct rb_mutex_struct { @@ -83,30 +82,6 @@ static void rb_mutex_abandon_locking_mutex(rb_thread_t *th); #endif static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial); -/* - * Document-class: Thread::Mutex - * - * Thread::Mutex implements a simple semaphore that can be used to - * coordinate access to shared data 
from multiple concurrent threads. - * - * Example: - * - * semaphore = Thread::Mutex.new - * - * a = Thread.new { - * semaphore.synchronize { - * # access shared resource - * } - * } - * - * b = Thread.new { - * semaphore.synchronize { - * # access shared resource - * } - * } - * - */ - static size_t rb_mutex_num_waiting(rb_mutex_t *mutex) { @@ -759,19 +734,19 @@ queue_alloc(VALUE klass) return obj; } -static int +static inline bool queue_fork_check(struct rb_queue *q) { rb_serial_t fork_gen = GET_VM()->fork_gen; - if (q->fork_gen == fork_gen) { - return 0; + if (RB_LIKELY(q->fork_gen == fork_gen)) { + return false; } /* forked children can't reach into parent thread stacks */ q->fork_gen = fork_gen; ccan_list_head_init(&q->waitq); q->num_waiting = 0; - return 1; + return true; } static inline struct rb_queue * @@ -870,7 +845,7 @@ raw_szqueue_ptr(VALUE obj) struct rb_szqueue *sq; TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq); - if (queue_fork_check(&sq->q)) { + if (RB_UNLIKELY(queue_fork_check(&sq->q))) { ccan_list_head_init(szqueue_pushq(sq)); sq->num_waiting_push = 0; } @@ -886,7 +861,7 @@ szqueue_ptr(VALUE obj) return sq; } -static int +static inline bool queue_closed_p(VALUE self) { return FL_TEST_RAW(self, QUEUE_CLOSED) != 0; @@ -967,76 +942,16 @@ ring_buffer_shift(struct rb_queue *q) return obj; } -/* - * Document-class: Thread::Queue - * - * The Thread::Queue class implements multi-producer, multi-consumer - * queues. It is especially useful in threaded programming when - * information must be exchanged safely between multiple threads. The - * Thread::Queue class implements all the required locking semantics. - * - * The class implements FIFO (first in, first out) type of queue. - * In a FIFO queue, the first tasks added are the first retrieved. 
- * - * Example: - * - * queue = Thread::Queue.new - * - * producer = Thread.new do - * 5.times do |i| - * sleep rand(i) # simulate expense - * queue << i - * puts "#{i} produced" - * end - * end - * - * consumer = Thread.new do - * 5.times do |i| - * value = queue.pop - * sleep rand(i/2) # simulate expense - * puts "consumed #{value}" - * end - * end - * - * consumer.join - * - */ - -/* - * Document-method: Queue::new - * - * call-seq: - * Thread::Queue.new -> empty_queue - * Thread::Queue.new(enumerable) -> queue - * - * Creates a new queue instance, optionally using the contents of an +enumerable+ - * for its initial state. - * - * Example: - * - * q = Thread::Queue.new - * #=> # - * q.empty? - * #=> true - * - * q = Thread::Queue.new([1, 2, 3]) - * #=> # - * q.empty? - * #=> false - * q.pop - * #=> 1 - */ - static VALUE -rb_queue_initialize(int argc, VALUE *argv, VALUE self) +queue_initialize(rb_execution_context_t *ec, VALUE self, VALUE initial) { - VALUE initial; struct rb_queue *q = raw_queue_ptr(self); - if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) { - initial = rb_to_array(initial); - } ccan_list_head_init(&q->waitq); - if (argc == 1) { + if (NIL_P(initial)) { + ring_buffer_init(q, QUEUE_INITIAL_CAPA); + } + else { + initial = rb_to_array(initial); long len = RARRAY_LEN(initial); long initial_capa = QUEUE_INITIAL_CAPA; while (initial_capa < len) { @@ -1046,9 +961,6 @@ rb_queue_initialize(int argc, VALUE *argv, VALUE self) MEMCPY(q->buffer, RARRAY_CONST_PTR(initial), VALUE, len); q->len = len; } - else { - ring_buffer_init(q, QUEUE_INITIAL_CAPA); - } return self; } @@ -1064,82 +976,6 @@ queue_do_push(VALUE self, struct rb_queue *q, VALUE obj) return self; } -/* - * Document-method: Thread::Queue#close - * call-seq: - * close - * - * Closes the queue. A closed queue cannot be re-opened. - * - * After the call to close completes, the following are true: - * - * - +closed?+ will return true - * - * - +close+ will be ignored. 
- * - * - calling enq/push/<< will raise a +ClosedQueueError+. - * - * - when +empty?+ is false, calling deq/pop/shift will return an object - * from the queue as usual. - * - when +empty?+ is true, deq(false) will not suspend the thread and will return nil. - * deq(true) will raise a +ThreadError+. - * - * ClosedQueueError is inherited from StopIteration, so that you can break loop block. - * - * Example: - * - * q = Thread::Queue.new - * Thread.new{ - * while e = q.deq # wait for nil to break loop - * # ... - * end - * } - * q.close - */ - -static VALUE -rb_queue_close(VALUE self) -{ - struct rb_queue *q = queue_ptr(self); - - if (!queue_closed_p(self)) { - FL_SET(self, QUEUE_CLOSED); - - wakeup_all(&q->waitq); - } - - return self; -} - -/* - * Document-method: Thread::Queue#closed? - * call-seq: closed? - * - * Returns +true+ if the queue is closed. - */ - -static VALUE -rb_queue_closed_p(VALUE self) -{ - return RBOOL(queue_closed_p(self)); -} - -/* - * Document-method: Thread::Queue#push - * call-seq: - * push(object) - * enq(object) - * <<(object) - * - * Pushes the given +object+ to the queue. - */ - -static VALUE -rb_queue_push(VALUE self, VALUE obj) -{ - return queue_do_push(self, queue_ptr(self), obj); -} - static VALUE queue_sleep(VALUE _args) { @@ -1231,19 +1067,6 @@ rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE time return queue_do_pop(ec, self, queue_ptr(self), non_block, timeout); } -/* - * Document-method: Thread::Queue#empty? - * call-seq: empty? - * - * Returns +true+ if the queue is empty. - */ - -static VALUE -rb_queue_empty_p(VALUE self) -{ - return RBOOL(queue_ptr(self)->len == 0); -} - static void queue_clear(struct rb_queue *q) { @@ -1251,87 +1074,12 @@ queue_clear(struct rb_queue *q) q->offset = 0; } -/* - * Document-method: Thread::Queue#clear - * - * Removes all objects from the queue. 
- */ - -static VALUE -rb_queue_clear(VALUE self) -{ - queue_clear(queue_ptr(self)); - return self; -} - -/* - * Document-method: Thread::Queue#length - * call-seq: - * length - * size - * - * Returns the length of the queue. - */ - -static VALUE -rb_queue_length(VALUE self) -{ - return LONG2NUM(queue_ptr(self)->len); -} - -NORETURN(static VALUE rb_queue_freeze(VALUE self)); -/* - * call-seq: - * freeze - * - * The queue can't be frozen, so this method raises an exception: - * Thread::Queue.new.freeze # Raises TypeError (cannot freeze #) - * - */ -static VALUE -rb_queue_freeze(VALUE self) -{ - rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self); - UNREACHABLE_RETURN(self); -} - -/* - * Document-method: Thread::Queue#num_waiting - * - * Returns the number of threads waiting on the queue. - */ - -static VALUE -rb_queue_num_waiting(VALUE self) -{ - struct rb_queue *q = queue_ptr(self); - - return INT2NUM(q->num_waiting); -} - -/* - * Document-class: Thread::SizedQueue - * - * This class represents queues of specified size capacity. The push operation - * may be blocked if the capacity is full. - * - * See Thread::Queue for an example of how a Thread::SizedQueue works. - */ - -/* - * Document-method: SizedQueue::new - * call-seq: new(max) - * - * Creates a fixed-length queue with a maximum size of +max+. - */ - static VALUE -rb_szqueue_initialize(VALUE self, VALUE vmax) +szqueue_initialize(rb_execution_context_t *ec, VALUE self, VALUE vmax) { - long max; + long max = NUM2LONG(vmax); struct rb_szqueue *sq = raw_szqueue_ptr(self); - max = NUM2LONG(vmax); if (max <= 0) { rb_raise(rb_eArgError, "queue size must be positive"); } @@ -1343,68 +1091,6 @@ rb_szqueue_initialize(VALUE self, VALUE vmax) return self; } -/* - * Document-method: Thread::SizedQueue#close - * call-seq: - * close - * - * Similar to Thread::Queue#close. - * - * The difference is behavior with waiting enqueuing threads. 
- * - * If there are waiting enqueuing threads, they are interrupted by - * raising ClosedQueueError('queue closed'). - */ -static VALUE -rb_szqueue_close(VALUE self) -{ - if (!queue_closed_p(self)) { - struct rb_szqueue *sq = szqueue_ptr(self); - - FL_SET(self, QUEUE_CLOSED); - wakeup_all(szqueue_waitq(sq)); - wakeup_all(szqueue_pushq(sq)); - } - return self; -} - -/* - * Document-method: Thread::SizedQueue#max - * - * Returns the maximum size of the queue. - */ - -static VALUE -rb_szqueue_max_get(VALUE self) -{ - return LONG2NUM(szqueue_ptr(self)->max); -} - -/* - * Document-method: Thread::SizedQueue#max= - * call-seq: max=(number) - * - * Sets the maximum size of the queue to the given +number+. - */ - -static VALUE -rb_szqueue_max_set(VALUE self, VALUE vmax) -{ - long max = NUM2LONG(vmax); - long diff = 0; - struct rb_szqueue *sq = szqueue_ptr(self); - - if (max <= 0) { - rb_raise(rb_eArgError, "queue size must be positive"); - } - if (max > sq->max) { - diff = max - sq->max; - } - sq->max = max; - sync_wakeup(szqueue_pushq(sq), diff); - return vmax; -} - static VALUE rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_block, VALUE timeout) { @@ -1464,124 +1150,12 @@ rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE ti return retval; } -/* - * Document-method: Thread::SizedQueue#clear - * - * Removes all objects from the queue. - */ - -static VALUE -rb_szqueue_clear(VALUE self) -{ - struct rb_szqueue *sq = szqueue_ptr(self); - queue_clear(&sq->q); - wakeup_all(szqueue_pushq(sq)); - return self; -} - -/* - * Document-method: Thread::SizedQueue#num_waiting - * - * Returns the number of threads waiting on the queue. 
- */ - -static VALUE -rb_szqueue_num_waiting(VALUE self) -{ - struct rb_szqueue *sq = szqueue_ptr(self); - - return INT2NUM(sq->q.num_waiting + sq->num_waiting_push); -} - - /* ConditionalVariable */ struct rb_condvar { struct ccan_list_head waitq; rb_serial_t fork_gen; }; -/* - * Document-class: Thread::ConditionVariable - * - * ConditionVariable objects augment class Mutex. Using condition variables, - * it is possible to suspend while in the middle of a critical section until a - * condition is met, such as a resource becomes available. - * - * Due to non-deterministic scheduling and spurious wake-ups, users of - * condition variables should always use a separate boolean predicate (such as - * reading from a boolean variable) to check if the condition is actually met - * before starting to wait, and should wait in a loop, re-checking the - * condition every time the ConditionVariable is waken up. The idiomatic way - * of using condition variables is calling the +wait+ method in an +until+ - * loop with the predicate as the loop condition. - * - * condvar.wait(mutex) until condition_is_met - * - * In the example below, we use the boolean variable +resource_available+ - * (which is protected by +mutex+) to indicate the availability of the - * resource, and use +condvar+ to wait for that variable to become true. Note - * that: - * - * 1. Thread +b+ may be scheduled before thread +a1+ and +a2+, and may run so - * fast that it have already made the resource available before either - * +a1+ or +a2+ starts. Therefore, +a1+ and +a2+ should check if - * +resource_available+ is already true before starting to wait. - * 2. The +wait+ method may spuriously wake up without signalling. Therefore, - * thread +a1+ and +a2+ should recheck +resource_available+ after the - * +wait+ method returns, and go back to wait if the condition is not - * actually met. - * 3. It is possible that thread +a2+ starts right after thread +a1+ is waken - * up by +b+. 
Thread +a2+ may have acquired the +mutex+ and consumed the - * resource before thread +a1+ acquires the +mutex+. This necessitates - * rechecking after +wait+, too. - * - * Example: - * - * mutex = Thread::Mutex.new - * - * resource_available = false - * condvar = Thread::ConditionVariable.new - * - * a1 = Thread.new { - * # Thread 'a1' waits for the resource to become available and consumes - * # the resource. - * mutex.synchronize { - * condvar.wait(mutex) until resource_available - * # After the loop, 'resource_available' is guaranteed to be true. - * - * resource_available = false - * puts "a1 consumed the resource" - * } - * } - * - * a2 = Thread.new { - * # Thread 'a2' behaves like 'a1'. - * mutex.synchronize { - * condvar.wait(mutex) until resource_available - * resource_available = false - * puts "a2 consumed the resource" - * } - * } - * - * b = Thread.new { - * # Thread 'b' periodically makes the resource available. - * loop { - * mutex.synchronize { - * resource_available = true - * - * # Notify one waiting thread if any. It is possible that neither - * # 'a1' nor 'a2 is waiting on 'condvar' at this moment. That's OK. - * condvar.signal - * } - * sleep 1 - * } - * } - * - * # Eventually both 'a1' and 'a2' will have their resources, albeit in an - * # unspecified order. 
- * [a1, a2].each {|th| th.join} - */ - static size_t condvar_memsize(const void *ptr) { @@ -1679,75 +1253,24 @@ rb_condvar_broadcast(rb_execution_context_t *ec, VALUE self) return self; } -NORETURN(static VALUE undumpable(VALUE obj)); -/* :nodoc: */ -static VALUE -undumpable(VALUE obj) -{ - rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj)); - UNREACHABLE_RETURN(Qnil); -} - -static VALUE -define_thread_class(VALUE outer, const ID name, VALUE super) -{ - VALUE klass = rb_define_class_id_under(outer, name, super); - rb_const_set(rb_cObject, name, klass); - return klass; -} - static void Init_thread_sync(void) { -#undef rb_intern -#if defined(TEACH_RDOC) && TEACH_RDOC == 42 - rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject); - rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject); - rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject); - rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject); -#endif - -#define DEFINE_CLASS(name, super) \ - rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super) - /* Mutex */ - DEFINE_CLASS(Mutex, Object); + rb_cMutex = rb_define_class_id_under(rb_cThread, rb_intern("Mutex"), rb_cObject); rb_define_alloc_func(rb_cMutex, mutex_alloc); /* Queue */ - DEFINE_CLASS(Queue, Object); + VALUE rb_cQueue = rb_define_class_id_under_no_pin(rb_cThread, rb_intern("Queue"), rb_cObject); rb_define_alloc_func(rb_cQueue, queue_alloc); rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration); - rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1); - rb_undef_method(rb_cQueue, "initialize_copy"); - rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0); - rb_define_method(rb_cQueue, "close", rb_queue_close, 0); - rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0); - rb_define_method(rb_cQueue, "push", rb_queue_push, 1); - rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0); 
- rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); - rb_define_method(rb_cQueue, "length", rb_queue_length, 0); - rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0); - rb_define_method(rb_cQueue, "freeze", rb_queue_freeze, 0); - - rb_define_alias(rb_cQueue, "enq", "push"); - rb_define_alias(rb_cQueue, "<<", "push"); - rb_define_alias(rb_cQueue, "size", "length"); - - DEFINE_CLASS(SizedQueue, Queue); + VALUE rb_cSizedQueue = rb_define_class_id_under_no_pin(rb_cThread, rb_intern("SizedQueue"), rb_cQueue); rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc); - rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1); - rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0); - rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0); - rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1); - rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0); - rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0); - /* CVar */ - DEFINE_CLASS(ConditionVariable, Object); + VALUE rb_cConditionVariable = rb_define_class_id_under_no_pin(rb_cThread, rb_intern("ConditionVariable"), rb_cObject); rb_define_alloc_func(rb_cConditionVariable, condvar_alloc); id_sleep = rb_intern("sleep"); diff --git a/thread_sync.rb b/thread_sync.rb index a722b7ec1a2595..398c0d02b7b75a 100644 --- a/thread_sync.rb +++ b/thread_sync.rb @@ -1,7 +1,62 @@ # frozen_string_literal: true class Thread + # The Thread::Queue class implements multi-producer, multi-consumer + # queues. It is especially useful in threaded programming when + # information must be exchanged safely between multiple threads. The + # Thread::Queue class implements all the required locking semantics. + # + # The class implements FIFO (first in, first out) type of queue. + # In a FIFO queue, the first tasks added are the first retrieved. 
+ # + # Example: + # + # queue = Thread::Queue.new + # + # producer = Thread.new do + # 5.times do |i| + # sleep rand(i) # simulate expense + # queue << i + # puts "#{i} produced" + # end + # end + # + # consumer = Thread.new do + # 5.times do |i| + # value = queue.pop + # sleep rand(i/2) # simulate expense + # puts "consumed #{value}" + # end + # end + # + # consumer.join class Queue + # Document-method: Queue::new + # + # call-seq: + # Thread::Queue.new -> empty_queue + # Thread::Queue.new(enumerable) -> queue + # + # Creates a new queue instance, optionally using the contents of an +enumerable+ + # for its initial state. + # + # Example: + # + # q = Thread::Queue.new + # #=> # + # q.empty? + # #=> true + # + # q = Thread::Queue.new([1, 2, 3]) + # #=> # + # q.empty? + # #=> false + # q.pop + # #=> 1 + def initialize(enumerable = nil) + Primitive.queue_initialize(enumerable) + end + # call-seq: # pop(non_block=false, timeout: nil) # @@ -21,9 +76,129 @@ def pop(non_block = false, timeout: nil) end alias_method :deq, :pop alias_method :shift, :pop + + undef_method :initialize_copy + + # call-seq: + # push(object) + # enq(object) + # <<(object) + # + # Pushes the given +object+ to the queue. + def push(object) + Primitive.cexpr!('queue_do_push(self, queue_ptr(self), object)') + end + alias_method :enq, :push + alias_method :<<, :push + + # call-seq: + # close + # + # Closes the queue. A closed queue cannot be re-opened. + # + # After the call to close completes, the following are true: + # + # - +closed?+ will return true + # + # - +close+ will be ignored. + # + # - calling enq/push/<< will raise a +ClosedQueueError+. + # + # - when +empty?+ is false, calling deq/pop/shift will return an object + # from the queue as usual. + # - when +empty?+ is true, deq(false) will not suspend the thread and will return nil. + # deq(true) will raise a +ThreadError+. + # + # ClosedQueueError is inherited from StopIteration, so that you can break loop block. 
+ # + # Example: + # + # q = Thread::Queue.new + # Thread.new{ + # while e = q.deq # wait for nil to break loop + # # ... + # end + # } + # q.close + def close + Primitive.cstmt! %{ + if (!queue_closed_p(self)) { + FL_SET_RAW(self, QUEUE_CLOSED); + + wakeup_all(&queue_ptr(self)->waitq); + } + + return self; + } + end + + # call-seq: closed? + # + # Returns +true+ if the queue is closed. + def closed? + Primitive.cexpr!('RBOOL(FL_TEST_RAW(self, QUEUE_CLOSED))') + end + + # call-seq: + # length + # size + # + # Returns the length of the queue. + def length + Primitive.cexpr!('LONG2NUM(queue_ptr(self)->len)') + end + alias_method :size, :length + + # call-seq: empty? + # + # Returns +true+ if the queue is empty. + def empty? + Primitive.cexpr!('RBOOL(queue_ptr(self)->len == 0)') + end + + # Removes all objects from the queue. + def clear + Primitive.cstmt! %{ + queue_clear(queue_ptr(self)); + return self; + } + end + + # call-seq: + # num_waiting + # + # Returns the number of threads waiting on the queue. + def num_waiting + Primitive.cexpr!('INT2NUM(queue_ptr(self)->num_waiting)') + end + + def marshal_dump # :nodoc: + raise TypeError, "can't dump #{self.class}" + end + + # call-seq: + # freeze + # + # The queue can't be frozen, so this method raises an exception: + # Thread::Queue.new.freeze # Raises TypeError (cannot freeze #) + def freeze + raise TypeError, "cannot freeze #{self}" + end end - class SizedQueue + # This class represents queues of specified size capacity. The push operation + # may be blocked if the capacity is full. + # + # See Thread::Queue for an example of how a Thread::SizedQueue works. + class SizedQueue < Queue + # Document-method: SizedQueue::new + # call-seq: new(max) + # + # Creates a fixed-length queue with a maximum size of +max+. 
+ def initialize(vmax) + Primitive.szqueue_initialize(vmax) + end + # call-seq: # pop(non_block=false, timeout: nil) # @@ -66,8 +241,93 @@ def push(object, non_block = false, timeout: nil) end alias_method :enq, :push alias_method :<<, :push + + # call-seq: + # close + # + # Similar to Thread::Queue#close. + # + # The difference is behavior with waiting enqueuing threads. + # + # If there are waiting enqueuing threads, they are interrupted by + # raising ClosedQueueError('queue closed'). + def close + Primitive.cstmt! %{ + if (!queue_closed_p(self)) { + struct rb_szqueue *sq = szqueue_ptr(self); + + FL_SET(self, QUEUE_CLOSED); + wakeup_all(szqueue_waitq(sq)); + wakeup_all(szqueue_pushq(sq)); + } + return self; + } + end + + # Removes all objects from the queue. + def clear + Primitive.cstmt! %{ + struct rb_szqueue *sq = szqueue_ptr(self); + queue_clear(&sq->q); + wakeup_all(szqueue_pushq(sq)); + return self; + } + end + + # Returns the number of threads waiting on the queue. + def num_waiting + Primitive.cstmt! %{ + struct rb_szqueue *sq = szqueue_ptr(self); + return INT2NUM(sq->q.num_waiting + sq->num_waiting_push); + } + end + + # Returns the maximum size of the queue. + def max + Primitive.cexpr!('LONG2NUM(szqueue_ptr(self)->max)') + end + + # call-seq: max=(number) + # + # Sets the maximum size of the queue to the given +number+. + def max=(vmax) + Primitive.cstmt! %{ + long max = NUM2LONG(vmax); + if (max <= 0) { + rb_raise(rb_eArgError, "queue size must be positive"); + } + + long diff = 0; + struct rb_szqueue *sq = szqueue_ptr(self); + + if (max > sq->max) { + diff = max - sq->max; + } + sq->max = max; + sync_wakeup(szqueue_pushq(sq), diff); + return vmax; + } + end end + # Thread::Mutex implements a simple semaphore that can be used to + # coordinate access to shared data from multiple concurrent threads. 
+ # + # Example: + # + # semaphore = Thread::Mutex.new + # + # a = Thread.new { + # semaphore.synchronize { + # # access shared resource + # } + # } + # + # b = Thread.new { + # semaphore.synchronize { + # # access shared resource + # } + # } class Mutex # call-seq: # Thread::Mutex.new -> mutex @@ -149,6 +409,83 @@ def sleep(timeout = nil) end end + # ConditionVariable objects augment class Mutex. Using condition variables, + # it is possible to suspend while in the middle of a critical section until a + # condition is met, such as a resource becomes available. + # + # Due to non-deterministic scheduling and spurious wake-ups, users of + # condition variables should always use a separate boolean predicate (such as + # reading from a boolean variable) to check if the condition is actually met + # before starting to wait, and should wait in a loop, re-checking the + # condition every time the ConditionVariable is waken up. The idiomatic way + # of using condition variables is calling the +wait+ method in an +until+ + # loop with the predicate as the loop condition. + # + # condvar.wait(mutex) until condition_is_met + # + # In the example below, we use the boolean variable +resource_available+ + # (which is protected by +mutex+) to indicate the availability of the + # resource, and use +condvar+ to wait for that variable to become true. Note + # that: + # + # 1. Thread +b+ may be scheduled before thread +a1+ and +a2+, and may run so + # fast that it have already made the resource available before either + # +a1+ or +a2+ starts. Therefore, +a1+ and +a2+ should check if + # +resource_available+ is already true before starting to wait. + # 2. The +wait+ method may spuriously wake up without signalling. Therefore, + # thread +a1+ and +a2+ should recheck +resource_available+ after the + # +wait+ method returns, and go back to wait if the condition is not + # actually met. + # 3. It is possible that thread +a2+ starts right after thread +a1+ is waken + # up by +b+. 
Thread +a2+ may have acquired the +mutex+ and consumed the + # resource before thread +a1+ acquires the +mutex+. This necessitates + # rechecking after +wait+, too. + # + # Example: + # + # mutex = Thread::Mutex.new + # + # resource_available = false + # condvar = Thread::ConditionVariable.new + # + # a1 = Thread.new { + # # Thread 'a1' waits for the resource to become available and consumes + # # the resource. + # mutex.synchronize { + # condvar.wait(mutex) until resource_available + # # After the loop, 'resource_available' is guaranteed to be true. + # + # resource_available = false + # puts "a1 consumed the resource" + # } + # } + # + # a2 = Thread.new { + # # Thread 'a2' behaves like 'a1'. + # mutex.synchronize { + # condvar.wait(mutex) until resource_available + # resource_available = false + # puts "a2 consumed the resource" + # } + # } + # + # b = Thread.new { + # # Thread 'b' periodically makes the resource available. + # loop { + # mutex.synchronize { + # resource_available = true + # + # # Notify one waiting thread if any. It is possible that neither + # # 'a1' nor 'a2 is waiting on 'condvar' at this moment. That's OK. + # condvar.signal + # } + # sleep 1 + # } + # } + # + # # Eventually both 'a1' and 'a2' will have their resources, albeit in an + # # unspecified order. + # [a1, a2].each {|th| th.join} class ConditionVariable # Document-method: ConditionVariable::new # @@ -193,3 +530,8 @@ def wait(mutex, timeout=nil) end end end + +Mutex = Thread::Mutex +ConditionVariable = Thread::ConditionVariable +Queue = Thread::Queue +SizedQueue = Thread::SizedQueue From 60d9b10dab9c9e92518f5579e6d36006c0dd359d Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Fri, 2 Jan 2026 16:36:05 -0500 Subject: [PATCH 3/4] [ruby/mmtk] Propagate crash of GC thread to mutator thread This allows the mutator thread to dump its backtrace when a GC thread crashes. 
https://github.com/ruby/mmtk/commit/40ff9ffee7 --- gc/mmtk/mmtk.c | 36 ++++++++++++++++++++++++++++++++++++ gc/mmtk/mmtk.h | 1 + gc/mmtk/src/abi.rs | 1 + gc/mmtk/src/lib.rs | 4 ++-- 4 files changed, 40 insertions(+), 2 deletions(-) diff --git a/gc/mmtk/mmtk.c b/gc/mmtk/mmtk.c index 6da17fdb23b1e7..fc1fa8bd2eb865 100644 --- a/gc/mmtk/mmtk.c +++ b/gc/mmtk/mmtk.c @@ -38,6 +38,11 @@ struct objspace { pthread_cond_t cond_world_started; size_t start_the_world_count; + struct { + bool gc_thread_crashed; + char crash_msg[256]; + } crash_context; + struct rb_gc_vm_context vm_context; unsigned int fork_hook_vm_lock_lev; @@ -169,6 +174,10 @@ rb_mmtk_block_for_gc(MMTk_VMMutatorThread mutator) pthread_cond_wait(&objspace->cond_world_started, &objspace->mutex); } + if (RB_UNLIKELY(objspace->crash_context.gc_thread_crashed)) { + rb_bug("%s", objspace->crash_context.crash_msg); + } + if (objspace->measure_gc_time) { struct timespec gc_end_time; clock_gettime(CLOCK_MONOTONIC, &gc_end_time); @@ -424,6 +433,32 @@ rb_mmtk_special_const_p(MMTk_ObjectReference object) return RB_SPECIAL_CONST_P(obj); } +RBIMPL_ATTR_FORMAT(RBIMPL_PRINTF_FORMAT, 1, 2) +static void +rb_mmtk_gc_thread_bug(const char *msg, ...) 
+{ + struct objspace *objspace = rb_gc_get_objspace(); + + objspace->crash_context.gc_thread_crashed = true; + + va_list args; + va_start(args, msg); + vsnprintf(objspace->crash_context.crash_msg, sizeof(objspace->crash_context.crash_msg), msg, args); + va_end(args); + + rb_mmtk_resume_mutators(); + + sleep(5); + + rb_bug("rb_mmtk_gc_thread_bug"); +} + +static void +rb_mmtk_gc_thread_panic_handler(void) +{ + rb_mmtk_gc_thread_bug("MMTk GC thread panicked"); +} + static void rb_mmtk_mutator_thread_panic_handler(void) { @@ -454,6 +489,7 @@ MMTk_RubyUpcalls ruby_upcalls = { rb_mmtk_update_finalizer_table, rb_mmtk_special_const_p, rb_mmtk_mutator_thread_panic_handler, + rb_mmtk_gc_thread_panic_handler, }; // Use max 80% of the available memory by default for MMTk diff --git a/gc/mmtk/mmtk.h b/gc/mmtk/mmtk.h index f7da0f95f0214d..b7c8248718526a 100644 --- a/gc/mmtk/mmtk.h +++ b/gc/mmtk/mmtk.h @@ -79,6 +79,7 @@ typedef struct MMTk_RubyUpcalls { void (*update_finalizer_table)(void); bool (*special_const_p)(MMTk_ObjectReference object); void (*mutator_thread_panic_handler)(void); + void (*gc_thread_panic_handler)(void); } MMTk_RubyUpcalls; typedef struct MMTk_RawVecOfObjRef { diff --git a/gc/mmtk/src/abi.rs b/gc/mmtk/src/abi.rs index 1e03dbf2f98809..255b2b1e5658be 100644 --- a/gc/mmtk/src/abi.rs +++ b/gc/mmtk/src/abi.rs @@ -323,6 +323,7 @@ pub struct RubyUpcalls { pub update_finalizer_table: extern "C" fn(), pub special_const_p: extern "C" fn(object: ObjectReference) -> bool, pub mutator_thread_panic_handler: extern "C" fn(), + pub gc_thread_panic_handler: extern "C" fn(), } unsafe impl Sync for RubyUpcalls {} diff --git a/gc/mmtk/src/lib.rs b/gc/mmtk/src/lib.rs index 0ee8f6752e5778..52dc782051f741 100644 --- a/gc/mmtk/src/lib.rs +++ b/gc/mmtk/src/lib.rs @@ -126,8 +126,6 @@ fn handle_gc_thread_panic(panic_info: &PanicHookInfo) { eprintln!("Unknown backtrace status: {s:?}"); } } - - std::process::abort(); } pub(crate) fn set_panic_hook() { @@ -140,6 +138,8 @@ pub(crate) fn 
set_panic_hook() { std::panic::set_hook(Box::new(move |panic_info| { if is_gc_thread(std::thread::current().id()) { handle_gc_thread_panic(panic_info); + + (crate::binding().upcalls().gc_thread_panic_handler)(); } else { old_hook(panic_info); (crate::MUTATOR_THREAD_PANIC_HANDLER From 2f4119eaea5416ae69c9256a72ab98237e221b91 Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Fri, 2 Jan 2026 16:37:25 -0500 Subject: [PATCH 4/4] [ruby/mmtk] Use rb_mmtk_gc_thread_bug for rb_mmtk_call_object_closure https://github.com/ruby/mmtk/commit/308936296a --- gc/mmtk/mmtk.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gc/mmtk/mmtk.c b/gc/mmtk/mmtk.c index fc1fa8bd2eb865..2c447b0d389f65 100644 --- a/gc/mmtk/mmtk.c +++ b/gc/mmtk/mmtk.c @@ -851,7 +851,7 @@ rb_mmtk_call_object_closure(VALUE obj, bool pin) char parent_obj_info_buf[info_size]; rb_raw_obj_info(parent_obj_info_buf, info_size, marking_parent_object); - rb_bug("try to mark T_NONE object (obj: %s, parent: %s)", obj_info_buf, parent_obj_info_buf); + rb_mmtk_gc_thread_bug("try to mark T_NONE object (obj: %s, parent: %s)", obj_info_buf, parent_obj_info_buf); } return (VALUE)rb_mmtk_gc_thread_tls->object_closure.c_function(