Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions ext/io/event/worker_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ static void worker_pool_free(void *ptr) {
}
}

static void worker_pool_mark(void *ptr)
{
struct IO_Event_WorkerPool *pool = (struct IO_Event_WorkerPool *)ptr;
struct IO_Event_WorkerPool_Worker *worker = pool->workers;
while (worker) {
struct IO_Event_WorkerPool_Worker *next = worker->next;
// We need to mark the thread even though its marked through the VM's ractors because we call `join`
// on them after their completion. They could be freed by then.
rb_gc_mark(worker->thread); // thread objects are currently pinned anyway
worker = next;
}
}

// Size functions for Ruby GC
static size_t worker_pool_size(const void *ptr) {
return sizeof(struct IO_Event_WorkerPool);
Expand All @@ -94,7 +107,7 @@ static size_t worker_pool_size(const void *ptr) {
// Ruby TypedData structures
static const rb_data_type_t IO_Event_WorkerPool_type = {
"IO::Event::WorkerPool",
{0, worker_pool_free, worker_pool_size,},
{worker_pool_mark, worker_pool_free, worker_pool_size,},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY
};

Expand Down Expand Up @@ -199,22 +212,20 @@ static VALUE worker_thread_func(void *_worker) {
}

// Create a new worker thread
static int create_worker_thread(struct IO_Event_WorkerPool *pool) {
if (pool->current_worker_count >= pool->maximum_worker_count) {
return -1;
}

struct IO_Event_WorkerPool_Worker *worker = malloc(sizeof(struct IO_Event_WorkerPool_Worker));
if (!worker) {
return -1;
}

worker->pool = pool;
worker->interrupted = false;
worker->current_blocking_operation = NULL;
worker->next = pool->workers;

worker->thread = rb_thread_create(worker_thread_func, worker);
static int create_worker_thread(VALUE self, struct IO_Event_WorkerPool *pool) {
if (pool->current_worker_count >= pool->maximum_worker_count) {
return -1;
}

struct IO_Event_WorkerPool_Worker *worker = malloc(sizeof(struct IO_Event_WorkerPool_Worker));
if (!worker) {
return -1;
}
@@ -214,7 +227,7 @@ static int create_worker_thread(struct IO_Event_WorkerPool *pool) {
worker->current_blocking_operation = NULL;
worker->next = pool->workers;

RB_OBJ_WRITE(self, &worker->thread, rb_thread_create(worker_thread_func, worker));
if (NIL_P(worker->thread)) {
free(worker);
return -1;
Expand Down Expand Up @@ -273,7 +284,7 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) {

// Create initial workers
for (size_t i = 0; i < maximum_worker_count; i++) {
if (create_worker_thread(pool) != 0) {
if (create_worker_thread(self, pool) != 0) {
// Just set the maximum_worker_count for debugging, don't fail completely
// worker_pool_free(pool);
// rb_raise(rb_eRuntimeError, "Failed to create workers");
Expand Down
Loading