@@ -51,6 +51,7 @@ namespace exec {
5151 inplace_stop_source __stop_source_{};
5252 mutable std::mutex __lock_{};
5353 mutable __std::atomic_ptrdiff_t __active_ = 0 ;
54+ mutable __std::atomic_ptrdiff_t __pending_notifiers_ = 0 ; // Track in-flight __complete() calls
5455 mutable __intrusive_queue<&__task::__next_> __waiters_{};
5556
5657 ~__impl () {
@@ -83,8 +84,13 @@ namespace exec {
8384 // the waiter is queued but after __active is checked, the waiter will never be notified
8485 std::unique_lock __guard{this ->__scope_ ->__lock_ };
8586 auto & __active = this ->__scope_ ->__active_ ;
87+ auto & __pending = this ->__scope_ ->__pending_notifiers_ ;
8688 auto & __waiters = this ->__scope_ ->__waiters_ ;
87- if (__active.load (__std::memory_order_acquire) != 0 ) {
89+ // Also check __pending_notifiers_ to avoid race with in-flight __complete() calls.
90+ // A __complete() that did fetch_sub but hasn't locked the mutex yet will have
91+ // incremented __pending_notifiers_, preventing us from completing immediately.
92+ if (__active.load (__std::memory_order_acquire) != 0
93+ || __pending.load (__std::memory_order_acquire) != 0 ) {
8894 __waiters.push_back (this );
8995 return ;
9096 }
@@ -158,9 +164,13 @@ namespace exec {
158164 __nest_op_base<_ReceiverId>* __op_;
159165
160166 static void __complete (const __impl* __scope) noexcept {
167+ // Increment pending BEFORE fetch_sub to close race window with on_empty().
168+ // This ensures on_empty() sees pending > 0 if we're about to lock the mutex.
169+ __scope->__pending_notifiers_ .fetch_add (1 , __std::memory_order_acquire);
161170 auto & __active = __scope->__active_ ;
162- std::unique_lock __guard{__scope->__lock_ };
163171 if (__active.fetch_sub (1 , __std::memory_order_acq_rel) == 1 ) {
172+ std::unique_lock __guard{__scope->__lock_ };
173+ __scope->__pending_notifiers_ .fetch_sub (1 , __std::memory_order_release);
164174 auto __local_waiters = std::move (__scope->__waiters_ );
165175 __guard.unlock ();
166176 __scope = nullptr ;
@@ -170,6 +180,8 @@ namespace exec {
170180 __next->__notify_waiter (__next);
171181 // __scope must be considered deleted
172182 }
183+ } else {
184+ __scope->__pending_notifiers_ .fetch_sub (1 , __std::memory_order_release);
173185 }
174186 }
175187
0 commit comments