diff --git a/include/stdexec/__detail/__atomic.hpp b/include/stdexec/__detail/__atomic.hpp index d1c6dc2b8..2876a6f8f 100644 --- a/include/stdexec/__detail/__atomic.hpp +++ b/include/stdexec/__detail/__atomic.hpp @@ -58,7 +58,7 @@ namespace stdexec::__std { using std::atomic_thread_fence; using std::atomic_signal_fence; -# if __cpp_lib_atomic_ref >= 2018'06L +# if __cpp_lib_atomic_ref >= 2018'06L && !defined(STDEXEC_RELACY) using std::atomic_ref; # else inline constexpr int __atomic_flag_map[] = { diff --git a/include/stdexec/__detail/__run_loop.hpp b/include/stdexec/__detail/__run_loop.hpp index 684f105b9..0502fbc20 100644 --- a/include/stdexec/__detail/__run_loop.hpp +++ b/include/stdexec/__detail/__run_loop.hpp @@ -26,6 +26,8 @@ #include "__schedulers.hpp" #include "__atomic.hpp" +#include "__config.hpp" +#include <cstddef> namespace stdexec { ///////////////////////////////////////////////////////////////////////////// @@ -34,6 +36,10 @@ namespace stdexec { public: __run_loop_base() = default; + ~__run_loop_base() noexcept { + STDEXEC_ASSERT(__task_count_.load(__std::memory_order_acquire) == 0); + } + STDEXEC_ATTRIBUTE(host, device) void run() noexcept { // execute work items until the __finishing_ flag is set: while (!__finishing_.load(__std::memory_order_acquire)) { @@ -41,17 +47,29 @@ namespace stdexec { __execute_all(); } // drain the queue, taking care to execute any tasks that get added while - // executing the remaining tasks: - while (__execute_all()) + // executing the remaining tasks (also wait for other tasks that might still be in flight): + while (__execute_all() || __task_count_.load(__std::memory_order_acquire) > 0) ; } STDEXEC_ATTRIBUTE(host, device) void finish() noexcept { + // Increment our task count to avoid lifetime issues. This is preventing + // a use-after-free issue if finish is called from a different thread. + // We increment the task counter by two to prevent the run loop from + // exiting before we schedule the noop task. 
+ __task_count_.fetch_add(2, __std::memory_order_release); if (!__finishing_.exchange(true, __std::memory_order_acq_rel)) { // push an empty work item to the queue to wake up the consuming thread - // and let it finish: + // and let it finish. + // The count will be decremented once the tasks executes. __queue_.push(&__noop_task); + // If the task got pushed, simply subtract one again, the other decrement + // happens when the noop task got executed. + __task_count_.fetch_sub(1, __std::memory_order_release); + return; } + // We are done finishing. Decrement the count by two, which signals final completion. + __task_count_.fetch_sub(2, __std::memory_order_release); } struct __task : __immovable { @@ -73,6 +91,7 @@ namespace stdexec { template <class _Rcvr> struct __opstate_t : __task { + __std::atomic<std::size_t>* __task_count_; __atomic_intrusive_queue<&__task::__next_>* __queue_; _Rcvr __rcvr_; @@ -89,14 +108,17 @@ namespace stdexec { STDEXEC_ATTRIBUTE(host, device) constexpr explicit __opstate_t( + __std::atomic<std::size_t>* __task_count, __atomic_intrusive_queue<&__task::__next_>* __queue, _Rcvr __rcvr) : __task{&__execute_impl} + , __task_count_(__task_count) , __queue_{__queue} , __rcvr_{static_cast<_Rcvr&&>(__rcvr)} { } STDEXEC_ATTRIBUTE(host, device) constexpr void start() noexcept { + __task_count_->fetch_add(1, __std::memory_order_release); __queue_->push(this); } }; @@ -112,20 +134,25 @@ namespace stdexec { return false; // No tasks to execute. } + std::size_t __task_count = 0; + do { // Take care to increment the iterator before executing the task, // because __execute() may invalidate the current node. 
auto __prev = __it++; (*__prev)->__execute(); + ++__task_count; } while (__it != __queue.end()); __queue.clear(); + __task_count_.fetch_sub(__task_count, __std::memory_order_release); return true; } STDEXEC_ATTRIBUTE(host, device) static void __noop_(__task*) noexcept { } + __std::atomic<std::size_t> __task_count_{0}; __std::atomic<bool> __finishing_{false}; __atomic_intrusive_queue<&__task::__next_> __queue_{}; __task __noop_task{&__noop_}; @@ -186,7 +213,7 @@ namespace stdexec { template <class _Rcvr> STDEXEC_ATTRIBUTE(nodiscard, host, device) constexpr auto connect(_Rcvr __rcvr) const noexcept -> __opstate_t<_Rcvr> { - return __opstate_t<_Rcvr>{&__loop_->__queue_, static_cast<_Rcvr&&>(__rcvr)}; + return __opstate_t<_Rcvr>{&__loop_->__task_count_, &__loop_->__queue_, static_cast<_Rcvr&&>(__rcvr)}; } STDEXEC_ATTRIBUTE(nodiscard, host, device) diff --git a/test/rrd/Makefile b/test/rrd/Makefile index 4bc190633..d68321426 100644 --- a/test/rrd/Makefile +++ b/test/rrd/Makefile @@ -1,13 +1,13 @@ # User-customizable variables: CXX ?= c++ CXX_STD ?= c++20 -CXXFLAGS ?= -I relacy -I relacy/relacy/fakestd -O1 -std=$(CXX_STD) -I ../../include -I ../../test -g +CXXFLAGS ?= -DSTDEXEC_RELACY -I relacy -I relacy/relacy/fakestd -O1 -std=$(CXX_STD) -I ../../include -I ../../test -g DEPFLAGS ?= -MD -MF $(@).d -MP -MT $(@) build_dir = build .SECONDARY: -test_programs = split async_scope +test_programs = split async_scope sync_wait test_exe_files = $(foreach name,$(test_programs),$(build_dir)/$(name)) diff --git a/test/rrd/sync_wait.cpp b/test/rrd/sync_wait.cpp new file mode 100644 index 000000000..399a2fe5a --- /dev/null +++ b/test/rrd/sync_wait.cpp @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2025 NVIDIA Corporation + * Copyright (c) 2025 Chris Cotter + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../../relacy/relacy_std.hpp" + +#include <exec/static_thread_pool.hpp> +#include <stdexec/execution.hpp> + +namespace ex = stdexec; + +struct sync_wait_bg_thread : rl::test_suite<sync_wait_bg_thread, 1> { + static size_t const dynamic_thread_count = 1; + + void thread(unsigned) { + exec::static_thread_pool pool{1}; + auto sender = ex::schedule(pool.get_scheduler()) | ex::then([] { return 42; }); + + auto [val] = ex::sync_wait(sender).value(); + RL_ASSERT(val == 42); + } +}; + +auto main() -> int { + rl::test_params p; + p.iteration_count = 50000; + p.execution_depth_limit = 10000; + p.search_type = rl::random_scheduler_type; + rl::simulate<sync_wait_bg_thread>(p); + return 0; +}