Skip to content

Commit 903522e

Browse files
authored
Mark the thread from the worker pool. (#152)
* Mark the thread from the worker pool. If we don't mark them, the thread object can be freed as soon as it ends. It doesn't seem to be a problem when the worker pool just has 1 thread, but if there are multiple threads then each call to `Thread#join` can cause a GC step as it calls back into ruby.
1 parent 44666dc commit 903522e

File tree

1 file changed

+29
-18
lines changed

1 file changed

+29
-18
lines changed

ext/io/event/worker_pool.c

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,19 @@ static void worker_pool_free(void *ptr) {
8686
}
8787
}
8888

89+
static void worker_pool_mark(void *ptr)
90+
{
91+
struct IO_Event_WorkerPool *pool = (struct IO_Event_WorkerPool *)ptr;
92+
struct IO_Event_WorkerPool_Worker *worker = pool->workers;
93+
while (worker) {
94+
struct IO_Event_WorkerPool_Worker *next = worker->next;
95+
// We need to mark the thread even though its marked through the VM's ractors because we call `join`
96+
// on them after their completion. They could be freed by then.
97+
rb_gc_mark(worker->thread); // thread objects are currently pinned anyway
98+
worker = next;
99+
}
100+
}
101+
89102
// Size functions for Ruby GC
90103
static size_t worker_pool_size(const void *ptr) {
91104
return sizeof(struct IO_Event_WorkerPool);
@@ -94,7 +107,7 @@ static size_t worker_pool_size(const void *ptr) {
94107
// Ruby TypedData structures
95108
static const rb_data_type_t IO_Event_WorkerPool_type = {
96109
"IO::Event::WorkerPool",
97-
{0, worker_pool_free, worker_pool_size,},
110+
{worker_pool_mark, worker_pool_free, worker_pool_size,},
98111
0, 0, RUBY_TYPED_FREE_IMMEDIATELY
99112
};
100113

@@ -199,22 +212,20 @@ static VALUE worker_thread_func(void *_worker) {
199212
}
200213

201214
// Create a new worker thread
202-
static int create_worker_thread(struct IO_Event_WorkerPool *pool) {
203-
if (pool->current_worker_count >= pool->maximum_worker_count) {
204-
return -1;
205-
}
206-
207-
struct IO_Event_WorkerPool_Worker *worker = malloc(sizeof(struct IO_Event_WorkerPool_Worker));
208-
if (!worker) {
209-
return -1;
210-
}
211-
212-
worker->pool = pool;
213-
worker->interrupted = false;
214-
worker->current_blocking_operation = NULL;
215-
worker->next = pool->workers;
216-
217-
worker->thread = rb_thread_create(worker_thread_func, worker);
215+
static int create_worker_thread(VALUE self, struct IO_Event_WorkerPool *pool) {
216+
if (pool->current_worker_count >= pool->maximum_worker_count) {
217+
return -1;
218+
}
219+
220+
struct IO_Event_WorkerPool_Worker *worker = malloc(sizeof(struct IO_Event_WorkerPool_Worker));
221+
if (!worker) {
222+
return -1;
223+
}
224+
@@ -214,7 +227,7 @@ static int create_worker_thread(struct IO_Event_WorkerPool *pool) {
225+
worker->current_blocking_operation = NULL;
226+
worker->next = pool->workers;
227+
228+
RB_OBJ_WRITE(self, &worker->thread, rb_thread_create(worker_thread_func, worker));
218229
if (NIL_P(worker->thread)) {
219230
free(worker);
220231
return -1;
@@ -273,7 +284,7 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) {
273284

274285
// Create initial workers
275286
for (size_t i = 0; i < maximum_worker_count; i++) {
276-
if (create_worker_thread(pool) != 0) {
287+
if (create_worker_thread(self, pool) != 0) {
277288
// Just set the maximum_worker_count for debugging, don't fail completely
278289
// worker_pool_free(pool);
279290
// rb_raise(rb_eRuntimeError, "Failed to create workers");

0 commit comments

Comments
 (0)