Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/stdexec/__detail/__atomic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace stdexec::__std {
using std::atomic_thread_fence;
using std::atomic_signal_fence;

# if __cpp_lib_atomic_ref >= 2018'06L
#if __cpp_lib_atomic_ref >= 2018'06L && !defined(STDEXEC_RELACY)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#if __cpp_lib_atomic_ref >= 2018'06L && !defined(STDEXEC_RELACY)
# if __cpp_lib_atomic_ref >= 2018'06L && !defined(STDEXEC_RELACY)

using std::atomic_ref;
# else
inline constexpr int __atomic_flag_map[] = {
Expand Down
35 changes: 31 additions & 4 deletions include/stdexec/__detail/__run_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include "__schedulers.hpp"

#include "__atomic.hpp"
#include "stdexec/__detail/__config.hpp"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#include "stdexec/__detail/__config.hpp"
#include "__config.hpp"

#include <cstddef>

namespace stdexec {
/////////////////////////////////////////////////////////////////////////////
Expand All @@ -34,24 +36,40 @@ namespace stdexec {
public:
__run_loop_base() = default;

// Destructor: asserts (via STDEXEC_ASSERT) that the task counter has dropped
// back to zero, i.e. that no task accounted for in __task_count_ is still
// outstanding when the loop object goes away. The acquire load pairs with the
// release decrements performed on __task_count_ elsewhere in this class.
~__run_loop_base() noexcept {
STDEXEC_ASSERT(__task_count_.load(__std::memory_order_acquire) == 0);
}

STDEXEC_ATTRIBUTE(host, device) void run() noexcept {
// execute work items until the __finishing_ flag is set:
while (!__finishing_.load(__std::memory_order_acquire)) {
__queue_.wait_for_item();
__execute_all();
}
// drain the queue, taking care to execute any tasks that get added while
// executing the remaining tasks:
while (__execute_all())
// executing the remaining tasks (also wait for other tasks that might still be in flight):
while (__execute_all() || __task_count_.load(__std::memory_order_acquire) > 0)
;
}

STDEXEC_ATTRIBUTE(host, device) void finish() noexcept {
// Increment our task count to avoid lifetime issues. This prevents a
// use-after-free if finish() is called from a different thread.
// We increment the task counter by two to avoid the run loop to exit before
// we scheduled the noop task
Comment on lines +58 to +59
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// We increment the task counter by two to avoid the run loop to exit before
// we scheduled the noop task
// We increment the task counter by two to prevent the run loop from
// exiting before we schedule the noop task.

__task_count_.fetch_add(2, __std::memory_order_release);
if (!__finishing_.exchange(true, __std::memory_order_acq_rel)) {
// push an empty work item to the queue to wake up the consuming thread
// and let it finish:
// and let it finish.
// The count will be decremented once the task executes.
__queue_.push(&__noop_task);
// If the task got pushed, simply subtract one again; the other decrement
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean "the other decrement"?

// happens when the noop task is executed.
__task_count_.fetch_sub(1, __std::memory_order_release);
return;
}
// We are done finishing. Decrement the count by two, which signals final completion.
__task_count_.fetch_sub(2, __std::memory_order_release);
}

struct __task : __immovable {
Expand All @@ -73,6 +91,7 @@ namespace stdexec {

template <class _Rcvr>
struct __opstate_t : __task {
__std::atomic<std::size_t>* __task_count_;
__atomic_intrusive_queue<&__task::__next_>* __queue_;
_Rcvr __rcvr_;

Expand All @@ -89,14 +108,17 @@ namespace stdexec {

// Constructs an operation state bound to a run loop.
//   __task_count : pointer to the loop's task counter (incremented in start()).
//   __queue      : pointer to the loop's intrusive task queue (pushed to in start()).
//   __rcvr       : receiver, moved into this operation state.
// The __task base is initialised with the address of __execute_impl.
STDEXEC_ATTRIBUTE(host, device)
constexpr explicit __opstate_t(
  __std::atomic<std::size_t>* __task_count,
  __atomic_intrusive_queue<&__task::__next_>* __queue,
  _Rcvr __rcvr)
  : __task{&__execute_impl}
  , __task_count_{__task_count}
  , __queue_{__queue}
  , __rcvr_{static_cast<_Rcvr&&>(__rcvr)} {
}

STDEXEC_ATTRIBUTE(host, device) constexpr void start() noexcept {
__task_count_->fetch_add(1, __std::memory_order_release);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
__task_count_->fetch_add(1, __std::memory_order_release);
__task_count_->fetch_add(1, __std::memory_order_release);

__queue_->push(this);
}
};
Expand All @@ -112,20 +134,25 @@ namespace stdexec {
return false; // No tasks to execute.
}

std::size_t __task_count = 0;

do {
// Take care to increment the iterator before executing the task,
// because __execute() may invalidate the current node.
auto __prev = __it++;
(*__prev)->__execute();
++__task_count;
} while (__it != __queue.end());

__queue.clear();
__task_count_.fetch_sub(__task_count, __std::memory_order_release);
return true;
}

// Task function that intentionally does nothing. Its address is used to build
// __noop_task, the placeholder task that finish() pushes onto the queue to
// wake up the consuming thread.
STDEXEC_ATTRIBUTE(host, device) static void __noop_(__task*) noexcept {
}

__std::atomic<std::size_t> __task_count_{0};
__std::atomic<bool> __finishing_{false};
__atomic_intrusive_queue<&__task::__next_> __queue_{};
__task __noop_task{&__noop_};
Expand Down Expand Up @@ -186,7 +213,7 @@ namespace stdexec {
template <class _Rcvr>
STDEXEC_ATTRIBUTE(nodiscard, host, device)
constexpr auto connect(_Rcvr __rcvr) const noexcept -> __opstate_t<_Rcvr> {
return __opstate_t<_Rcvr>{&__loop_->__queue_, static_cast<_Rcvr&&>(__rcvr)};
return __opstate_t<_Rcvr>{&__loop_->__task_count_, &__loop_->__queue_, static_cast<_Rcvr&&>(__rcvr)};
}

STDEXEC_ATTRIBUTE(nodiscard, host, device)
Expand Down
4 changes: 2 additions & 2 deletions test/rrd/Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# User-customizable variables:
CXX ?= c++
CXX_STD ?= c++20
CXXFLAGS ?= -I relacy -I relacy/relacy/fakestd -O1 -std=$(CXX_STD) -I ../../include -I ../../test -g
CXXFLAGS ?= -DSTDEXEC_RELACY -I relacy -I relacy/relacy/fakestd -O1 -std=$(CXX_STD) -I ../../include -I ../../test -g
DEPFLAGS ?= -MD -MF $(@).d -MP -MT $(@)
build_dir = build

.SECONDARY:

test_programs = split async_scope
test_programs = split async_scope sync_wait

test_exe_files = $(foreach name,$(test_programs),$(build_dir)/$(name))

Expand Down
27 changes: 27 additions & 0 deletions test/rrd/sync_wait.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include "../../relacy/relacy_std.hpp"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file needs the copyright header.


#include <stdexec/execution.hpp>
#include <exec/static_thread_pool.hpp>

namespace ex = stdexec;

// Relacy test suite with one static test thread. The test thread schedules a
// trivial computation onto a one-thread static_thread_pool and blocks in
// sync_wait until the result comes back.
struct sync_wait_bg_thread : rl::test_suite<sync_wait_bg_thread, 1> {
  // NOTE(review): presumably reserves room for the pool's worker thread, which
  // is created while the test is running -- confirm against the Relacy docs.
  static size_t const dynamic_thread_count = 1;

  // Body of the single test thread; the thread index is not used.
  void thread(unsigned) {
    exec::static_thread_pool pool{1};

    // Same pipeline as `schedule(...) | then(...)`, spelled as a direct call.
    auto work = ex::then(ex::schedule(pool.get_scheduler()), [] { return 42; });

    auto [answer] = ex::sync_wait(work).value();
    RL_ASSERT(answer == 42);
  }
};

// Entry point: runs the sync_wait_bg_thread suite under Relacy's random
// scheduler for 50000 iterations with an execution depth limit of 10000.
//
// Returns 0 when the simulation succeeds and 1 when it fails, so that a
// failure is visible to the Makefile / CI through the process exit status
// instead of only through Relacy's printed report.
auto main() -> int {
  rl::test_params p;
  p.iteration_count = 50000;
  p.execution_depth_limit = 10000;
  p.search_type = rl::random_scheduler_type;

  // rl::simulate reports overall success as a bool; do not discard it.
  bool const passed = rl::simulate<sync_wait_bg_thread>(p);
  return passed ? 0 : 1;
}
Loading