@@ -48,6 +48,18 @@ static void shutdown_stub(void) {}
48
48
49
49
static zend_array * get_coroutines_stub (void ) { return NULL ; }
50
50
51
+ static zend_future_t * future_create_stub (bool thread_safe , size_t extra_size )
52
+ {
53
+ ASYNC_THROW_ERROR ("Async API is not enabled" );
54
+ return NULL ;
55
+ }
56
+
57
+ static zend_async_channel_t * channel_create_stub (size_t buffer_size , bool resizable , bool thread_safe , size_t extra_size )
58
+ {
59
+ ASYNC_THROW_ERROR ("Async API is not enabled" );
60
+ return NULL ;
61
+ }
62
+
51
63
static void add_microtask_stub (zend_async_microtask_t * microtask ) {}
52
64
53
65
static zend_array * get_awaiting_info_stub (zend_coroutine_t * coroutine ) { return NULL ; }
@@ -97,6 +109,8 @@ zend_async_get_coroutines_t zend_async_get_coroutines_fn = get_coroutines_stub;
97
109
zend_async_add_microtask_t zend_async_add_microtask_fn = add_microtask_stub ;
98
110
zend_async_get_awaiting_info_t zend_async_get_awaiting_info_fn = get_awaiting_info_stub ;
99
111
zend_async_get_class_ce_t zend_async_get_class_ce_fn = get_class_ce ;
112
+ zend_async_future_create_t zend_async_future_create_fn = future_create_stub ;
113
+ zend_async_channel_create_t zend_async_channel_create_fn = channel_create_stub ;
100
114
101
115
static zend_atomic_bool reactor_lock = {0 };
102
116
static char * reactor_module_name = NULL ;
@@ -428,14 +442,46 @@ ZEND_API zend_coroutine_event_callback_t * zend_async_coroutine_callback_new(
428
442
/* Waker API */
429
443
//////////////////////////////////////////////////////////////////////
430
444
445
+ static zend_always_inline zend_async_waker_trigger_t * waker_trigger_create (zend_async_event_t * event , uint32_t initial_capacity )
446
+ {
447
+ size_t total_size = sizeof (zend_async_waker_trigger_t ) + (initial_capacity - 1 ) * sizeof (zend_async_event_callback_t * );
448
+ zend_async_waker_trigger_t * trigger = (zend_async_waker_trigger_t * )emalloc (total_size );
449
+
450
+ trigger -> length = 0 ;
451
+ trigger -> capacity = initial_capacity ;
452
+ trigger -> event = event ;
453
+
454
+ return trigger ;
455
+ }
456
+
457
+ static zend_always_inline zend_async_waker_trigger_t * waker_trigger_add_callback (zend_async_waker_trigger_t * trigger , zend_async_event_callback_t * callback )
458
+ {
459
+ if (trigger -> length >= trigger -> capacity ) {
460
+ uint32_t new_capacity = trigger -> capacity * 2 ;
461
+ size_t total_size = sizeof (zend_async_waker_trigger_t ) + (new_capacity - 1 ) * sizeof (zend_async_event_callback_t * );
462
+
463
+ zend_async_waker_trigger_t * new_trigger = (zend_async_waker_trigger_t * )erealloc (trigger , total_size );
464
+ new_trigger -> capacity = new_capacity ;
465
+ trigger = new_trigger ;
466
+ }
467
+
468
+ trigger -> data [trigger -> length ++ ] = callback ;
469
+ return trigger ;
470
+ }
471
+
431
472
static void waker_events_dtor (zval * item )
432
473
{
433
474
zend_async_waker_trigger_t * trigger = Z_PTR_P (item );
434
475
zend_async_event_t * event = trigger -> event ;
435
476
trigger -> event = NULL ;
436
477
437
478
if (event != NULL ) {
438
- event -> del_callback (event , trigger -> callback );
479
+ // Remove all callbacks from the event
480
+ for (uint32_t i = 0 ; i < trigger -> length ; i ++ ) {
481
+ if (trigger -> data [i ] != NULL ) {
482
+ event -> del_callback (event , trigger -> data [i ]);
483
+ }
484
+ }
439
485
//
440
486
// At this point, we explicitly stop the event because it is no longer being listened to by our handlers.
441
487
// However, this does not mean the object is destroyed—it may remain in memory if something still holds a reference to it.
@@ -444,6 +490,7 @@ static void waker_events_dtor(zval *item)
444
490
ZEND_ASYNC_EVENT_RELEASE (event );
445
491
}
446
492
493
+ // Free the entire trigger (includes flexible array member)
447
494
efree (trigger );
448
495
}
449
496
@@ -558,11 +605,29 @@ void coroutine_event_callback_dispose(zend_async_event_callback_t *callback, zen
558
605
zend_async_waker_t * waker = coroutine -> waker ;
559
606
560
607
if (event != NULL && waker != NULL ) {
561
- // remove the event from the waker
562
- zend_hash_index_del (& waker -> events , (zend_ulong )event );
563
-
564
- if (waker -> triggered_events != NULL ) {
565
- zend_hash_index_del (waker -> triggered_events , (zend_ulong )event );
608
+ // Find the trigger for this event
609
+ zval * trigger_zval = zend_hash_index_find (& waker -> events , (zend_ulong )event );
610
+
611
+ if (trigger_zval != NULL ) {
612
+ zend_async_waker_trigger_t * trigger = Z_PTR_P (trigger_zval );
613
+
614
+ // Remove only this specific callback from the trigger
615
+ for (uint32_t i = 0 ; i < trigger -> length ; i ++ ) {
616
+ if (trigger -> data [i ] == callback ) {
617
+ // Move last element to current position (O(1) removal)
618
+ trigger -> data [i ] = trigger -> data [-- trigger -> length ];
619
+ break ;
620
+ }
621
+ }
622
+
623
+ // If no more callbacks in trigger, remove the entire event
624
+ if (trigger -> length == 0 ) {
625
+ zend_hash_index_del (& waker -> events , (zend_ulong )event );
626
+
627
+ if (waker -> triggered_events != NULL ) {
628
+ zend_hash_index_del (waker -> triggered_events , (zend_ulong )event );
629
+ }
630
+ }
566
631
}
567
632
}
568
633
}
@@ -603,6 +668,12 @@ ZEND_API void zend_async_resume_when(
603
668
zend_coroutine_event_callback_t * event_callback
604
669
)
605
670
{
671
+ ZEND_ASSERT (EG (exception ) == NULL && "Cannot resume when there is an active exception in the engine." );
672
+
673
+ if (UNEXPECTED (EG (exception ))) {
674
+ return ;
675
+ }
676
+
606
677
bool locally_allocated_callback = false;
607
678
608
679
if (UNEXPECTED (ZEND_ASYNC_EVENT_IS_CLOSED (event ))) {
@@ -655,29 +726,45 @@ ZEND_API void zend_async_resume_when(
655
726
}
656
727
657
728
if (EXPECTED (coroutine -> waker != NULL )) {
658
- zend_async_waker_trigger_t * trigger = emalloc (sizeof (zend_async_waker_trigger_t ));
659
- trigger -> event = event ;
660
- trigger -> callback = & event_callback -> base ;
729
+ zval * trigger_zval = zend_hash_index_find (& coroutine -> waker -> events , (zend_ulong )event );
730
+ zend_async_waker_trigger_t * trigger ;
731
+
732
+ if (UNEXPECTED (trigger_zval != NULL )) {
733
+ // Event already exists, add callback to existing trigger
734
+ trigger = Z_PTR_P (trigger_zval );
735
+ trigger = waker_trigger_add_callback (trigger , & event_callback -> base );
736
+ // Update the hash table entry with potentially new pointer after realloc
737
+ Z_PTR_P (trigger_zval ) = trigger ;
738
+ } else {
739
+ // New event, create new trigger
740
+ trigger = waker_trigger_create (event , 1 );
741
+ trigger = waker_trigger_add_callback (trigger , & event_callback -> base );
661
742
662
- if (UNEXPECTED (zend_hash_index_add_ptr (& coroutine -> waker -> events , (zend_ulong )event , trigger ) == NULL )) {
663
- efree (trigger );
743
+ if (UNEXPECTED (zend_hash_index_add_ptr (& coroutine -> waker -> events , (zend_ulong )event , trigger ) == NULL )) {
744
+ // This should not happen with new events, but handle gracefully
745
+ efree (trigger );
664
746
665
- if (locally_allocated_callback ) {
666
- event_callback -> base .dispose (& event_callback -> base , event );
667
- }
747
+ event_callback -> coroutine = NULL ;
748
+ event -> del_callback (event , & event_callback -> base );
668
749
669
- if (trans_event ) {
670
- event -> dispose (event );
671
- }
750
+ event_callback -> coroutine = NULL ;
751
+ event -> del_callback (event , & event_callback -> base );
672
752
673
- zend_throw_error (NULL , "Failed to add event to the waker: maybe event already exists" );
753
+ if (locally_allocated_callback ) {
754
+ event_callback -> base .dispose (& event_callback -> base , event );
755
+ }
674
756
675
- return ;
676
- }
677
- }
757
+ if ( trans_event ) {
758
+ event -> dispose ( event );
759
+ }
678
760
679
- if (false == trans_event ) {
680
- ZEND_ASYNC_EVENT_ADD_REF (event );
761
+ zend_throw_error (NULL , "Failed to add event to the waker" );
762
+ } else {
763
+ if (false == trans_event ) {
764
+ ZEND_ASYNC_EVENT_ADD_REF (event );
765
+ }
766
+ }
767
+ }
681
768
}
682
769
}
683
770
0 commit comments