Skip to content

Commit 769adf1

Browse files
committed
Logging.
1 parent 14a662b commit 769adf1

File tree

1 file changed

+56
-4
lines changed

1 file changed

+56
-4
lines changed

ext/io/event/worker_pool.c

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,58 +143,89 @@ static void *worker_wait_and_execute(void *_worker) {
143143
struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)_worker;
144144
struct IO_Event_WorkerPool *pool = worker->pool;
145145

146+
fprintf(stderr, "[WorkerPool DEBUG] worker_wait_and_execute: starting worker=%p pool=%p\n", (void*)worker, (void*)pool);
147+
146148
while (true) {
147149
struct IO_Event_WorkerPool_Work *work = NULL;
148150

151+
fprintf(stderr, "[WorkerPool DEBUG] worker_wait_and_execute: acquiring mutex worker=%p\n", (void*)worker);
149152
pthread_mutex_lock(&pool->mutex);
150153

151154
// Wait for work, shutdown, or interruption
152155
while (!pool->work_queue && !pool->shutdown && !worker->interrupted) {
156+
fprintf(stderr, "[WorkerPool DEBUG] worker_wait_and_execute: waiting for work worker=%p\n", (void*)worker);
153157
pthread_cond_wait(&pool->work_available, &pool->mutex);
158+
fprintf(stderr, "[WorkerPool DEBUG] worker_wait_and_execute: woke up worker=%p\n", (void*)worker);
154159
}
155160

156161
if (pool->shutdown || worker->interrupted) {
162+
fprintf(stderr, "[WorkerPool DEBUG] worker_wait_and_execute: shutdown=%d interrupted=%d worker=%p\n",
163+
pool->shutdown, worker->interrupted, (void*)worker);
157164
pthread_mutex_unlock(&pool->mutex);
158165
break;
159166
}
160167

161168
work = dequeue_work(pool);
169+
fprintf(stderr, "[WorkerPool DEBUG] worker_wait_and_execute: dequeued work=%p worker=%p\n", (void*)work, (void*)worker);
162170

163171
pthread_mutex_unlock(&pool->mutex);
164172

165173
// Execute work WITHOUT GVL (this is the whole point!)
166174
if (work) {
175+
fprintf(stderr, "[WorkerPool DEBUG] worker_wait_and_execute: executing work=%p blocking_operation=%p worker=%p\n",
176+
(void*)work, (void*)work->blocking_operation, (void*)worker);
177+
167178
worker->current_blocking_operation = work->blocking_operation;
168179
rb_fiber_scheduler_blocking_operation_execute(work->blocking_operation);
169180
worker->current_blocking_operation = NULL;
181+
182+
fprintf(stderr, "[WorkerPool DEBUG] worker_wait_and_execute: work execution completed work=%p worker=%p\n",
183+
(void*)work, (void*)worker);
170184
}
171185

186+
fprintf(stderr, "[WorkerPool DEBUG] worker_wait_and_execute: returning work=%p worker=%p\n", (void*)work, (void*)worker);
172187
return work;
173188
}
174189

190+
fprintf(stderr, "[WorkerPool DEBUG] worker_wait_and_execute: returning NULL (shutdown) worker=%p\n", (void*)worker);
175191
return NULL; // Shutdown signal
176192
}
177193

178194
static VALUE worker_thread_func(void *_worker) {
179195
struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)_worker;
180196

197+
fprintf(stderr, "[WorkerPool DEBUG] worker_thread_func: starting worker=%p\n", (void*)worker);
198+
181199
while (true) {
200+
fprintf(stderr, "[WorkerPool DEBUG] worker_thread_func: calling rb_thread_call_without_gvl worker=%p\n", (void*)worker);
201+
182202
// Wait for work and execute it without holding GVL
183-
struct IO_Event_WorkerPool_Work *work = (struct IO_Event_WorkerPool_Work *)rb_thread_call_without_gvl(worker_wait_and_execute, worker, worker_unblock_func, worker);
203+
void *result = rb_thread_call_without_gvl(worker_wait_and_execute, worker, worker_unblock_func, worker);
204+
struct IO_Event_WorkerPool_Work *work = (struct IO_Event_WorkerPool_Work *)result;
205+
206+
fprintf(stderr, "[WorkerPool DEBUG] worker_thread_func: rb_thread_call_without_gvl returned work=%p (raw=%p) worker=%p\n", (void*)work, result, (void*)worker);
184207

185208
if (!work) {
186-
// Shutdown signal received
209+
fprintf(stderr, "[WorkerPool DEBUG] worker_thread_func: shutdown signal received worker=%p\n", (void*)worker);
187210
break;
188211
}
189212

213+
fprintf(stderr, "[WorkerPool DEBUG] worker_thread_func: setting work->completed=true work=%p worker=%p\n", (void*)work, (void*)worker);
214+
190215
// Protected by GVL:
191216
work->completed = true;
192217
worker->pool->completed_count++;
193218

219+
fprintf(stderr, "[WorkerPool DEBUG] worker_thread_func: calling rb_fiber_scheduler_unblock work=%p scheduler=%p blocker=%p fiber=%p worker=%p\n",
220+
(void*)work, (void*)work->scheduler, (void*)work->blocker, (void*)work->fiber, (void*)worker);
221+
194222
// Work was executed without GVL, now unblock the waiting fiber (we have GVL here)
195223
rb_fiber_scheduler_unblock(work->scheduler, work->blocker, work->fiber);
224+
225+
fprintf(stderr, "[WorkerPool DEBUG] worker_thread_func: rb_fiber_scheduler_unblock completed work=%p worker=%p\n", (void*)work, (void*)worker);
196226
}
197227

228+
fprintf(stderr, "[WorkerPool DEBUG] worker_thread_func: exiting worker=%p\n", (void*)worker);
198229
return Qnil;
199230
}
200231

@@ -287,9 +318,12 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) {
287318
static VALUE worker_pool_work_begin(VALUE _work) {
288319
struct IO_Event_WorkerPool_Work *work = (void*)_work;
289320

290-
if (DEBUG) fprintf(stderr, "worker_pool_work_begin:rb_fiber_scheduler_block work=%p\n", work);
321+
fprintf(stderr, "[WorkerPool DEBUG] worker_pool_work_begin: work=%p scheduler=%p blocker=%p fiber=%p\n",
322+
work, (void*)work->scheduler, (void*)work->blocker, (void*)work->fiber);
323+
291324
rb_fiber_scheduler_block(work->scheduler, work->blocker, Qnil);
292325

326+
fprintf(stderr, "[WorkerPool DEBUG] worker_pool_work_begin: completed work=%p\n", work);
293327
return Qnil;
294328
}
295329

@@ -298,6 +332,8 @@ static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) {
298332
struct IO_Event_WorkerPool *pool;
299333
TypedData_Get_Struct(self, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool);
300334

335+
fprintf(stderr, "[WorkerPool DEBUG] worker_pool_call: starting pool=%p\n", (void*)pool);
336+
301337
if (pool->shutdown) {
302338
rb_raise(rb_eRuntimeError, "Worker pool is shut down!");
303339
}
@@ -319,6 +355,9 @@ static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) {
319355
rb_raise(rb_eArgError, "Invalid blocking operation!");
320356
}
321357

358+
fprintf(stderr, "[WorkerPool DEBUG] worker_pool_call: blocking_operation=%p fiber=%p scheduler=%p\n",
359+
(void*)blocking_operation, (void*)fiber, (void*)scheduler);
360+
322361
// Create work item
323362
struct IO_Event_WorkerPool_Work work = {
324363
.blocking_operation = blocking_operation,
@@ -328,27 +367,40 @@ static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) {
328367
.fiber = fiber,
329368
.next = NULL
330369
};
370+
371+
fprintf(stderr, "[WorkerPool DEBUG] worker_pool_call: created work item work=%p on stack\n", (void*)&work);
331372

332373
// Enqueue work:
333374
pthread_mutex_lock(&pool->mutex);
334375
enqueue_work(pool, &work);
335376
pthread_cond_signal(&pool->work_available);
336377
pthread_mutex_unlock(&pool->mutex);
337378

379+
fprintf(stderr, "[WorkerPool DEBUG] worker_pool_call: enqueued work, starting wait loop work=%p\n", (void*)&work);
380+
338381
// Block the current fiber until work is completed:
339382
int state;
383+
int iteration = 0;
340384
while (true) {
385+
iteration++;
386+
fprintf(stderr, "[WorkerPool DEBUG] worker_pool_call: iteration %d, work.completed=%d work=%p\n",
387+
iteration, work.completed, (void*)&work);
388+
341389
rb_protect(worker_pool_work_begin, (VALUE)&work, &state);
342390

343391
if (work.completed) {
392+
fprintf(stderr, "[WorkerPool DEBUG] worker_pool_call: work completed, breaking work=%p\n", (void*)&work);
344393
break;
345394
} else {
346-
if (DEBUG) fprintf(stderr, "worker_pool_call:rb_fiber_scheduler_blocking_operation_cancel\n");
395+
fprintf(stderr, "[WorkerPool DEBUG] worker_pool_call: work not completed, cancelling blocking_operation=%p\n",
396+
(void*)blocking_operation);
347397
rb_fiber_scheduler_blocking_operation_cancel(blocking_operation);
348398
// The work was not completed, we need to wait for it to be completed.
349399
}
350400
}
351401

402+
fprintf(stderr, "[WorkerPool DEBUG] worker_pool_call: exiting work=%p state=%d\n", (void*)&work, state);
403+
352404
if (state) {
353405
rb_jump_tag(state);
354406
} else {

0 commit comments

Comments
 (0)