Skip to content

Commit e5e9b2a

Browse files
committed
#31: + Added support for waiting on identical objects, even if they have different keys.
1 parent bbed9c9 commit e5e9b2a

File tree

2 files changed

+91
-25
lines changed

2 files changed

+91
-25
lines changed

Zend/zend_async_API.c

Lines changed: 87 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -442,14 +442,46 @@ ZEND_API zend_coroutine_event_callback_t * zend_async_coroutine_callback_new(
442442
/* Waker API */
443443
//////////////////////////////////////////////////////////////////////
444444

445+
static zend_always_inline zend_async_waker_trigger_t *waker_trigger_create(zend_async_event_t *event, uint32_t initial_capacity)
446+
{
447+
size_t total_size = sizeof(zend_async_waker_trigger_t) + initial_capacity * sizeof(zend_async_event_callback_t *);
448+
zend_async_waker_trigger_t *trigger = (zend_async_waker_trigger_t *)emalloc(total_size);
449+
450+
trigger->length = 0;
451+
trigger->capacity = initial_capacity;
452+
trigger->event = event;
453+
454+
return trigger;
455+
}
456+
457+
static zend_always_inline zend_async_waker_trigger_t *waker_trigger_add_callback(zend_async_waker_trigger_t *trigger, zend_async_event_callback_t *callback)
458+
{
459+
if (trigger->length >= trigger->capacity) {
460+
uint32_t new_capacity = trigger->capacity * 2;
461+
size_t total_size = sizeof(zend_async_waker_trigger_t) + new_capacity * sizeof(zend_async_event_callback_t *);
462+
463+
zend_async_waker_trigger_t *new_trigger = (zend_async_waker_trigger_t *)erealloc(trigger, total_size);
464+
new_trigger->capacity = new_capacity;
465+
trigger = new_trigger;
466+
}
467+
468+
trigger->data[trigger->length++] = callback;
469+
return trigger;
470+
}
471+
445472
static void waker_events_dtor(zval *item)
446473
{
447474
zend_async_waker_trigger_t * trigger = Z_PTR_P(item);
448475
zend_async_event_t *event = trigger->event;
449476
trigger->event = NULL;
450477

451478
if (event != NULL) {
452-
event->del_callback(event, trigger->callback);
479+
// Remove all callbacks from the event
480+
for (uint32_t i = 0; i < trigger->length; i++) {
481+
if (trigger->data[i] != NULL) {
482+
event->del_callback(event, trigger->data[i]);
483+
}
484+
}
453485
//
454486
// At this point, we explicitly stop the event because it is no longer being listened to by our handlers.
455487
// However, this does not mean the object is destroyed—it may remain in memory if something still holds a reference to it.
@@ -458,6 +490,7 @@ static void waker_events_dtor(zval *item)
458490
ZEND_ASYNC_EVENT_RELEASE(event);
459491
}
460492

493+
// Free the entire trigger (includes flexible array member)
461494
efree(trigger);
462495
}
463496

@@ -572,11 +605,29 @@ void coroutine_event_callback_dispose(zend_async_event_callback_t *callback, zen
572605
zend_async_waker_t * waker = coroutine->waker;
573606

574607
if (event != NULL && waker != NULL) {
575-
// remove the event from the waker
576-
zend_hash_index_del(&waker->events, (zend_ulong)event);
577-
578-
if (waker->triggered_events != NULL) {
579-
zend_hash_index_del(waker->triggered_events, (zend_ulong)event);
608+
// Find the trigger for this event
609+
zval *trigger_zval = zend_hash_index_find(&waker->events, (zend_ulong)event);
610+
611+
if (trigger_zval != NULL) {
612+
zend_async_waker_trigger_t *trigger = Z_PTR_P(trigger_zval);
613+
614+
// Remove only this specific callback from the trigger
615+
for (uint32_t i = 0; i < trigger->length; i++) {
616+
if (trigger->data[i] == callback) {
617+
// Move last element to current position (O(1) removal)
618+
trigger->data[i] = trigger->data[--trigger->length];
619+
break;
620+
}
621+
}
622+
623+
// If no more callbacks in trigger, remove the entire event
624+
if (trigger->length == 0) {
625+
zend_hash_index_del(&waker->events, (zend_ulong)event);
626+
627+
if (waker->triggered_events != NULL) {
628+
zend_hash_index_del(waker->triggered_events, (zend_ulong)event);
629+
}
630+
}
580631
}
581632
}
582633
}
@@ -669,33 +720,46 @@ ZEND_API void zend_async_resume_when(
669720
}
670721

671722
if (EXPECTED(coroutine->waker != NULL)) {
672-
zend_async_waker_trigger_t *trigger = emalloc(sizeof(zend_async_waker_trigger_t));
673-
trigger->event = event;
674-
trigger->callback = &event_callback->base;
723+
zval *trigger_zval = zend_hash_index_find(&coroutine->waker->events, (zend_ulong)event);
724+
zend_async_waker_trigger_t *trigger;
725+
726+
if (UNEXPECTED(trigger_zval != NULL)) {
727+
// Event already exists, add callback to existing trigger
728+
trigger = Z_PTR_P(trigger_zval);
729+
trigger = waker_trigger_add_callback(trigger, &event_callback->base);
730+
// Update the hash table entry with potentially new pointer after realloc
731+
Z_PTR_P(trigger_zval) = trigger;
732+
} else {
733+
// New event, create new trigger
734+
trigger = waker_trigger_create(event, 1);
735+
trigger = waker_trigger_add_callback(trigger, &event_callback->base);
675736

676-
if (UNEXPECTED(zend_hash_index_add_ptr(&coroutine->waker->events, (zend_ulong)event, trigger) == NULL)) {
677-
efree(trigger);
737+
if (UNEXPECTED(zend_hash_index_add_ptr(&coroutine->waker->events, (zend_ulong)event, trigger) == NULL)) {
738+
// This should not happen with new events, but handle gracefully
739+
efree(trigger);
678740

679741
event_callback->coroutine = NULL;
680742
event->del_callback(event, &event_callback->base);
681743

682-
if (locally_allocated_callback) {
683-
event_callback->base.dispose(&event_callback->base, event);
684-
}
744+
event_callback->coroutine = NULL;
745+
event->del_callback(event, &event_callback->base);
685746

686-
if (trans_event) {
687-
event->dispose(event);
688-
}
747+
if (locally_allocated_callback) {
748+
event_callback->base.dispose(&event_callback->base, event);
749+
}
689750

690-
zend_throw_error(NULL, "Failed to add event to the waker: maybe event already exists");
751+
if (trans_event) {
752+
event->dispose(event);
753+
}
691754

692-
return;
755+
zend_throw_error(NULL, "Failed to add event to the waker");
756+
} else {
757+
if (false == trans_event) {
758+
ZEND_ASYNC_EVENT_ADD_REF(event);
759+
}
760+
}
693761
}
694762
}
695-
696-
if (false == trans_event) {
697-
ZEND_ASYNC_EVENT_ADD_REF(event);
698-
}
699763
}
700764

701765
ZEND_API void zend_async_waker_callback_resolve(

Zend/zend_async_API.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -418,8 +418,10 @@ struct _zend_coroutine_event_callback_s {
418418
};
419419

420420
struct _zend_async_waker_trigger_s {
421-
zend_async_event_t *event;
422-
zend_async_event_callback_t *callback;
421+
uint32_t length; /* current number of callbacks */
422+
uint32_t capacity; /* allocated slots in the array */
423+
zend_async_event_t *event;
424+
zend_async_event_callback_t *data[]; /* flexible array member */
423425
};
424426

425427
/* Dynamic array of async event callbacks with single iterator protection */

0 commit comments

Comments
 (0)