Skip to content

Commit 391404f

Browse files
committed
#57: Fast return from suspend if the coroutine was resumed either during the start of event processing or when the reactor tick was invoked.
1 parent d37995d commit 391404f

File tree

2 files changed

+61
-19
lines changed

2 files changed

+61
-19
lines changed

coroutine.c

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -663,17 +663,19 @@ void async_coroutine_suspend(const bool from_main)
663663

664664
void async_coroutine_resume(zend_coroutine_t *coroutine, zend_object *error, const bool transfer_error)
665665
{
666-
if (UNEXPECTED(coroutine->waker == NULL || coroutine->waker->status == ZEND_ASYNC_WAKER_NO_STATUS)) {
666+
zend_async_waker_t *waker = coroutine->waker;
667+
668+
if (UNEXPECTED(waker == NULL || waker->status == ZEND_ASYNC_WAKER_NO_STATUS)) {
667669
async_throw_error("Cannot resume a coroutine that has not been suspended");
668670
return;
669671
}
670672

671673
if (error != NULL) {
672-
if (coroutine->waker->error != NULL) {
674+
if (waker->error != NULL) {
673675

674676
if (false == instanceof_function(error->ce, ZEND_ASYNC_GET_CE(ZEND_ASYNC_EXCEPTION_CANCELLATION))) {
675-
zend_exception_set_previous(error, coroutine->waker->error);
676-
coroutine->waker->error = error;
677+
zend_exception_set_previous(error, waker->error);
678+
waker->error = error;
677679

678680
if (false == transfer_error) {
679681
GC_ADDREF(error);
@@ -684,15 +686,27 @@ void async_coroutine_resume(zend_coroutine_t *coroutine, zend_object *error, con
684686
}
685687
}
686688
} else {
687-
coroutine->waker->error = error;
689+
waker->error = error;
688690

689691
if (false == transfer_error) {
690692
GC_ADDREF(error);
691693
}
692694
}
693695
}
694696

695-
if (UNEXPECTED(coroutine->waker->status == ZEND_ASYNC_WAKER_QUEUED)) {
697+
if (UNEXPECTED(waker->status == ZEND_ASYNC_WAKER_QUEUED)) {
698+
return;
699+
}
700+
701+
const bool in_scheduler_context = ZEND_ASYNC_SCHEDULER_CONTEXT;
702+
703+
// **Short execution path**:
704+
// If the event handlers are running under the scheduler
705+
// And this is the current coroutine
706+
// There is no point in returning it to the queue,
707+
// we will execute it immediately!
708+
if (UNEXPECTED(in_scheduler_context && coroutine == ZEND_ASYNC_CURRENT_COROUTINE)) {
709+
waker->status = ZEND_ASYNC_WAKER_RESULT;
696710
return;
697711
}
698712

@@ -701,10 +715,10 @@ void async_coroutine_resume(zend_coroutine_t *coroutine, zend_object *error, con
701715
return;
702716
}
703717

704-
coroutine->waker->status = ZEND_ASYNC_WAKER_QUEUED;
718+
waker->status = ZEND_ASYNC_WAKER_QUEUED;
705719

706720
// Add to resumed_coroutines queue for event cleanup
707-
if (ZEND_ASYNC_IS_SCHEDULER_CONTEXT) {
721+
if (in_scheduler_context) {
708722
circular_buffer_push_ptr_with_resize(&ASYNC_G(resumed_coroutines), coroutine);
709723
}
710724
}

scheduler.c

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,7 +1054,9 @@ void async_scheduler_coroutine_enqueue(zend_coroutine_t *coroutine)
10541054
static zend_always_inline void scheduler_next_tick(void)
10551055
{
10561056
zend_fiber_transfer *transfer = NULL;
1057-
ZEND_ASYNC_SCHEDULER_CONTEXT = true;
1057+
bool *in_scheduler_context = &ZEND_ASYNC_SCHEDULER_CONTEXT;
1058+
1059+
*in_scheduler_context = true;
10581060

10591061
zend_object **exception_ptr = &EG(exception);
10601062
zend_object **prev_exception_ptr = &EG(prev_exception);
@@ -1078,7 +1080,14 @@ static zend_always_inline void scheduler_next_tick(void)
10781080
TRY_HANDLE_SUSPEND_EXCEPTION();
10791081
}
10801082

1081-
ZEND_ASYNC_SCHEDULER_CONTEXT = false;
1083+
*in_scheduler_context = false;
1084+
1085+
// Fast return path without context switching...
1086+
zend_coroutine_t *coroutine = ZEND_ASYNC_CURRENT_COROUTINE;
1087+
1088+
if (UNEXPECTED(coroutine != NULL && coroutine->waker != NULL && coroutine->waker->status == ZEND_ASYNC_WAKER_RESULT)) {
1089+
return;
1090+
}
10821091

10831092
const bool is_next_coroutine = circular_buffer_is_not_empty(&ASYNC_G(coroutine_queue));
10841093

@@ -1135,19 +1144,38 @@ void async_scheduler_coroutine_suspend(void)
11351144
//
11361145
if (coroutine != NULL && coroutine->waker != NULL) {
11371146

1138-
const bool not_in_queue = ZEND_ASYNC_WAKER_NOT_IN_QUEUE(coroutine->waker);
1147+
zend_async_waker_t *waker = coroutine->waker;
1148+
const bool not_in_queue = ZEND_ASYNC_WAKER_NOT_IN_QUEUE(waker);
11391149

11401150
// Let's check that the coroutine has something to wait for;
11411151
// If a coroutine isn't waiting for anything, it must be in the execution queue.
11421152
// otherwise, it's a potential deadlock.
1143-
if (coroutine->waker->events.nNumOfElements == 0 && not_in_queue) {
1153+
if (waker->events.nNumOfElements == 0 && not_in_queue) {
11441154
async_throw_error("The coroutine has no events to wait for");
11451155
zend_async_waker_clean(coroutine);
11461156
zend_exception_restore_fast(exception_ptr, prev_exception_ptr);
11471157
return;
11481158
}
11491159

1150-
start_waker_events(coroutine->waker);
1160+
// Before starting the events, we change the status of the Waker.
1161+
// This is important because the coroutine may return to the execution queue immediately
1162+
// after the events are initialized.
1163+
if (not_in_queue) {
1164+
waker->status = ZEND_ASYNC_WAKER_WAITING;
1165+
}
1166+
1167+
bool *in_scheduler_context = &ZEND_ASYNC_SCHEDULER_CONTEXT;
1168+
bool prev_in_scheduler_context = *in_scheduler_context;
1169+
1170+
*in_scheduler_context = true;
1171+
start_waker_events(waker);
1172+
*in_scheduler_context = prev_in_scheduler_context;
1173+
1174+
// Fast return path without placing the coroutine in the queue.
1175+
if (UNEXPECTED(waker->status == ZEND_ASYNC_WAKER_RESULT)) {
1176+
zend_hash_clean(&waker->events);
1177+
goto resuming;
1178+
}
11511179

11521180
// If an exception occurs during the startup of the Waker object,
11531181
// that exception belongs to the current coroutine,
@@ -1160,10 +1188,6 @@ void async_scheduler_coroutine_suspend(void)
11601188
zend_exception_restore_fast(exception_ptr, prev_exception_ptr);
11611189
return;
11621190
}
1163-
1164-
if (not_in_queue) {
1165-
coroutine->waker->status = ZEND_ASYNC_WAKER_WAITING;
1166-
}
11671191
}
11681192

11691193
if (UNEXPECTED(coroutine->switch_handlers)) {
@@ -1188,6 +1212,8 @@ void async_scheduler_coroutine_suspend(void)
11881212
ZEND_COROUTINE_ENTER(coroutine);
11891213
}
11901214

1215+
resuming:
1216+
11911217
// Rethrow exception if waker has it
11921218
if (coroutine->waker->error != NULL) {
11931219
zend_object *exception = coroutine->waker->error;
@@ -1239,6 +1265,7 @@ ZEND_STACK_ALIGNED void fiber_entry(zend_fiber_transfer *transfer)
12391265

12401266
// Allocate VM stack on C stack instead of heap
12411267
char vm_stack_memory[ZEND_FIBER_VM_STACK_SIZE];
1268+
bool *in_scheduler_context = &ZEND_ASYNC_SCHEDULER_CONTEXT;
12421269

12431270
zend_first_try
12441271
{
@@ -1291,7 +1318,7 @@ ZEND_STACK_ALIGNED void fiber_entry(zend_fiber_transfer *transfer)
12911318

12921319
ZEND_ASYNC_SCHEDULER_HEARTBEAT;
12931320

1294-
ZEND_ASYNC_SCHEDULER_CONTEXT = true;
1321+
*in_scheduler_context = true;
12951322

12961323
ZEND_ASSERT(circular_buffer_is_not_empty(resumed_coroutines) == 0 && "resumed_coroutines should be 0");
12971324

@@ -1307,7 +1334,7 @@ ZEND_STACK_ALIGNED void fiber_entry(zend_fiber_transfer *transfer)
13071334

13081335
TRY_HANDLE_EXCEPTION();
13091336

1310-
ZEND_ASYNC_SCHEDULER_CONTEXT = false;
1337+
*in_scheduler_context = false;
13111338

13121339
if (EXPECTED(has_next_coroutine)) {
13131340
status = execute_next_coroutine_from_fiber(is_scheduler ? NULL : transfer, fiber_context);
@@ -1358,6 +1385,7 @@ ZEND_STACK_ALIGNED void fiber_entry(zend_fiber_transfer *transfer)
13581385
{
13591386
fiber_context->flags |= ZEND_FIBER_FLAG_BAILOUT;
13601387
transfer->flags = ZEND_FIBER_TRANSFER_FLAG_BAILOUT;
1388+
*in_scheduler_context = false;
13611389
}
13621390
zend_end_try();
13631391

0 commit comments

Comments
 (0)