diff --git a/ext/io/event/worker_pool.c b/ext/io/event/worker_pool.c index 62f7e6a..6e4311a 100644 --- a/ext/io/event/worker_pool.c +++ b/ext/io/event/worker_pool.c @@ -86,6 +86,19 @@ static void worker_pool_free(void *ptr) { } } +static void worker_pool_mark(void *ptr) +{ + struct IO_Event_WorkerPool *pool = (struct IO_Event_WorkerPool *)ptr; + struct IO_Event_WorkerPool_Worker *worker = pool->workers; + while (worker) { + struct IO_Event_WorkerPool_Worker *next = worker->next; + // We need to mark the thread even though its marked through the VM's ractors because we call `join` + // on them after their completion. They could be freed by then. + rb_gc_mark(worker->thread); // thread objects are currently pinned anyway + worker = next; + } +} + // Size functions for Ruby GC static size_t worker_pool_size(const void *ptr) { return sizeof(struct IO_Event_WorkerPool); @@ -94,7 +107,7 @@ static size_t worker_pool_size(const void *ptr) { // Ruby TypedData structures static const rb_data_type_t IO_Event_WorkerPool_type = { "IO::Event::WorkerPool", - {0, worker_pool_free, worker_pool_size,}, + {worker_pool_mark, worker_pool_free, worker_pool_size,}, 0, 0, RUBY_TYPED_FREE_IMMEDIATELY }; @@ -199,22 +212,20 @@ static VALUE worker_thread_func(void *_worker) { } // Create a new worker thread -static int create_worker_thread(struct IO_Event_WorkerPool *pool) { - if (pool->current_worker_count >= pool->maximum_worker_count) { - return -1; - } - - struct IO_Event_WorkerPool_Worker *worker = malloc(sizeof(struct IO_Event_WorkerPool_Worker)); - if (!worker) { - return -1; - } - - worker->pool = pool; - worker->interrupted = false; - worker->current_blocking_operation = NULL; - worker->next = pool->workers; - - worker->thread = rb_thread_create(worker_thread_func, worker); +static int create_worker_thread(VALUE self, struct IO_Event_WorkerPool *pool) { +if (pool->current_worker_count >= pool->maximum_worker_count) { +return -1; +} + + struct IO_Event_WorkerPool_Worker *worker = malloc(sizeof(struct IO_Event_WorkerPool_Worker)); +if (!worker) { +return -1; +} +@@ -214,7 +227,7 @@ static int create_worker_thread(struct IO_Event_WorkerPool *pool) { +worker->current_blocking_operation = NULL; +worker->next = pool->workers; + + RB_OBJ_WRITE(self, &worker->thread, rb_thread_create(worker_thread_func, worker)); if (NIL_P(worker->thread)) { free(worker); return -1; @@ -273,7 +284,7 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { // Create initial workers for (size_t i = 0; i < maximum_worker_count; i++) { - if (create_worker_thread(pool) != 0) { + if (create_worker_thread(self, pool) != 0) { // Just set the maximum_worker_count for debugging, don't fail completely // worker_pool_free(pool); // rb_raise(rb_eRuntimeError, "Failed to create workers");