diff --git a/include/beman/execution26/detail/atomic_intrusive_stack.hpp b/include/beman/execution26/detail/atomic_intrusive_stack.hpp new file mode 100644 index 00000000..e579c4dc --- /dev/null +++ b/include/beman/execution26/detail/atomic_intrusive_stack.hpp @@ -0,0 +1,88 @@ +// include/beman/execution26/detail/atomic_intrusive_stack.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_ATOMIC_INTRUSIVE_STACK +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_ATOMIC_INTRUSIVE_STACK + +#include + +#include +#include +#include + +namespace beman::execution26::detail { + +template +class atomic_intrusive_stack; + +//! @brief This data structure is an intrusive stack that can be used in a lock-free manner. +//! +//! The stack is implemented as a singly linked list where the head is an atomic pointer to the first item in +//! the stack. +//! try_push() is a lock-free operation that tries to push an item to the stack. If the stack is empty, it +//! returns nullptr and the item is pushed to the stack. +//! This stack has a closed state, which is indicated by the head pointing to the stack itself. In this state, +//! try_push() returns std::nullopt and the stack is not modified. +//! pop_all_and_shutdown() is a lock-free operation that pops all items from the stack and returns them in a +//! queue. If the stack is empty, it returns an empty queue. +//! +//! We use this stack in the split implementation to store the listeners that are waiting for the operation to +//! complete. +//! +//! @tparam Item The type of the item in the stack. +//! @tparam Next The pointer to the next item in the stack. +template +class atomic_intrusive_stack { + public: + atomic_intrusive_stack() = default; + ~atomic_intrusive_stack() { assert(!head_ || head_ == this); } + atomic_intrusive_stack(const atomic_intrusive_stack&) = delete; + auto operator=(const atomic_intrusive_stack&) -> atomic_intrusive_stack& = delete; + atomic_intrusive_stack(atomic_intrusive_stack&&) noexcept = delete; + auto operator=(atomic_intrusive_stack&&) noexcept -> atomic_intrusive_stack& = delete; + + //! @brief Tries to push an item to the stack. + //! + //! @param item The item to push to the stack. + //! + //! @return If the stack is empty, returns nullptr and pushes the item to the stack. + //! If the stack is in the closed state, returns std::nullopt. + auto try_push(Item* item) noexcept -> std::optional { + void* ptr = head_.load(); + if (ptr == this) { + return std::nullopt; + } + item->*Next = static_cast(ptr); + while (!head_.compare_exchange_weak(ptr, item)) { + if (ptr == this) { + return std::nullopt; + } + item->*Next = static_cast(ptr); + } + return static_cast(ptr); + } + + //! @brief Tests if the stack is empty and not in the closed state. + auto empty_and_not_shutdown() const noexcept -> bool { return head_.load() == nullptr; } + + //! @brief Pops all items from the stack, returns them and puts this stack into the closed state. + //! + //! @return If the stack is empty, returns an empty stack. + auto pop_all_and_shutdown() noexcept -> ::beman::execution26::detail::intrusive_stack { + auto stack = ::beman::execution26::detail::intrusive_stack{}; + void* ptr = head_.exchange(this); + if (ptr == this) { + return stack; + } + auto item = static_cast(ptr); + stack.head_ = item; + return stack; + } + + private: + ::std::atomic head_{nullptr}; +}; + +} // namespace beman::execution26::detail + +#endif \ No newline at end of file diff --git a/include/beman/execution26/detail/intrusive_stack.hpp b/include/beman/execution26/detail/intrusive_stack.hpp new file mode 100644 index 00000000..4ee89ffb --- /dev/null +++ b/include/beman/execution26/detail/intrusive_stack.hpp @@ -0,0 +1,47 @@ +// include/beman/execution26/detail/intrusive_queue.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_INTRUSIVE_QUEUE +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_INTRUSIVE_QUEUE + +#include +#include + +namespace beman::execution26::detail { + +template +class atomic_intrusive_stack; + +template +class intrusive_stack; + +//! @brief This data structure is an intrusive queue that is not thread-safe. +template +class intrusive_stack { + public: + //! @brief Pushes an item to the queue. + auto push(Item* item) noexcept -> void { item->*Next = std::exchange(head_, item); } + + //! @brief Pops one item from the queue. + //! + //! @return The item that was popped from the queue, or nullptr if the queue is empty. + auto pop() noexcept -> Item* { + if (head_) { + auto item = head_; + head_ = std::exchange(item->*Next, nullptr); + return item; + } + return nullptr; + } + + //! @brief Tests if the queue is empty. + auto empty() const noexcept -> bool { return !head_; } + + private: + friend class atomic_intrusive_stack; + Item* head_{nullptr}; +}; + +} // namespace beman::execution26::detail + +#endif \ No newline at end of file diff --git a/include/beman/execution26/detail/split.hpp b/include/beman/execution26/detail/split.hpp new file mode 100644 index 00000000..9bf2634d --- /dev/null +++ b/include/beman/execution26/detail/split.hpp @@ -0,0 +1,383 @@ +// include/beman/execution26/detail/split.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_SPLIT +#define INCLUDED_BEMAN_EXECUTION26_DETAIL_SPLIT + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution26::detail { + +struct split_impl_t {}; +template <> +struct impls_for : ::beman::execution26::detail::default_impls { + + template + struct shared_state; + + struct split_env { + ::beman::execution26::inplace_stop_source* stop_source; + + ::beman::execution26::inplace_stop_token query(::beman::execution26::get_stop_token_t) const noexcept { + return stop_source->get_token(); + } + }; + + struct local_state_base { + local_state_base* next{nullptr}; + virtual auto notify() noexcept -> void = 0; + + local_state_base() noexcept = default; + local_state_base(const local_state_base&) = delete; + local_state_base(local_state_base&&) = delete; + auto operator=(const local_state_base&) -> local_state_base& = delete; + auto operator=(local_state_base&&) -> local_state_base& = delete; + + protected: + ~local_state_base() = default; + }; + + template + struct split_receiver { + using receiver_concept = ::beman::execution26::receiver_t; + + explicit split_receiver(shared_state* state) noexcept : sh_state(state) { + if (sh_state) { + sh_state->inc_ref(); + } + } + + ~split_receiver() noexcept { + if (sh_state) { + sh_state->dec_ref(); + } + } + + split_receiver(split_receiver&& other) noexcept : sh_state(::std::exchange(other.sh_state, nullptr)) {} + split_receiver& operator=(split_receiver&& other) noexcept { + sh_state = ::std::exchange(other.sh_state, nullptr); + return *this; + } + + split_receiver(const split_receiver&) = delete; + split_receiver& operator=(const split_receiver&) = delete; + + template + void complete(Tag, Args&&... args) noexcept { + using tuple_t = ::beman::execution26::detail::decayed_tuple; + try { + sh_state->result.template emplace(Tag(), ::std::forward(args)...); + } catch (...) { + using tuple_err = ::std::tuple<::beman::execution26::set_error_t, ::std::exception_ptr>; + sh_state->result.template emplace(::beman::execution26::set_error, + ::std::current_exception()); + } + sh_state->notify(); + } + + template + void set_value(Args&&... args) && noexcept { + complete(::beman::execution26::set_value, ::std::forward(args)...); + } + + template + void set_error(Error&& err) && noexcept { + complete(::beman::execution26::set_error, ::std::forward(err)); + } + + void set_stopped() && noexcept { complete(::beman::execution26::set_stopped); } + + split_env get_env() const noexcept { return split_env{&sh_state->stop_src}; } + + shared_state* sh_state; + }; + + // [exec.split-10] + template + struct shared_state { + template + using value_tuple = ::std::tuple<::beman::execution26::set_value_t, ::std::decay_t...>; + + template + using error_tuples = ::std::variant<::std::tuple<::beman::execution26::set_error_t, ::std::decay_t>...>; + + using variant_type = ::beman::execution26::detail::meta::unique<::beman::execution26::detail::meta::combine< + ::std::variant<::std::monostate>, + ::std::variant<::std::tuple<::beman::execution26::set_stopped_t>>, + ::std::variant<::std::tuple<::beman::execution26::set_error_t, ::std::exception_ptr>>, + ::beman::execution26::error_types_of_t, + ::beman::execution26::value_types_of_t>>; + + using state_list_type = ::beman::execution26::detail::atomic_intrusive_stack<&local_state_base::next>; + + using child_operation_state = ::beman::execution26::connect_result_t>; + + explicit shared_state(Sndr&& sndr) { + try { + op_state.emplace(::beman::execution26::detail::emplace_from{[&] { + return ::beman::execution26::connect(::std::forward(sndr), split_receiver{this}); + }}); + } catch (...) { + using error_tuple_t = ::std::tuple<::beman::execution26::set_error_t, ::std::exception_ptr>; + result.template emplace(::beman::execution26::set_error, ::std::current_exception()); + [[maybe_unused]] auto queue = waiting_states.pop_all_and_shutdown(); + assert(queue.empty()); + } + } + + // We use an intrusive list to store the listeners that are waiting for the operation to complete. + // if the intrusive list is empty, we start the operation + // if the intrusive list is not empty, we push the listener to the intrusive list + // if the intrusive list is shutdown, we immediately notify the listener + void add_listener(local_state_base* listener) noexcept { + // try to push the listener to the intrusive list, if the intrusive list is empty and not shutdown, start + // the operation + if (auto maybe_ptr = waiting_states.try_push(listener); !maybe_ptr) { + // the queue is shutdown, immediately notify the listener + listener->notify(); + } else if (!(*maybe_ptr)) { + // the operation was not started yet, we are first, and we start it + assert(op_state); + ::beman::execution26::start(*op_state); + } + } + + void notify() noexcept { + // note: this is different from stdexec. + // we discussed lifetime durations of operation at LEWG and haven't decided yet + // whether we should keep the operation alive as long as possible + op_state.reset(); + auto listeners = waiting_states.pop_all_and_shutdown(); + while (auto listener = listeners.pop()) { + listener->notify(); + } + } + + void inc_ref() noexcept { ref_count.fetch_add(1); } + + // This is the most complicated part of the split implementation. + // On construction, the operation state increments the ref count. + // Before the operation is started, at least one listener is added to the queue. + // If the ref count is decreased to one and the there are no listeners in the queue + // the operation state is the last object holding the shared state and we can safely + // destroy it + // + // it is not thread safe to destroy a split-sender and copy it at the same time + // this is similar to how a shared_ptr is not thread safe to copy and destroy at the same time + void dec_ref() noexcept { + std::size_t count = ref_count.load(); + if (count == 2 && waiting_states.empty_and_not_shutdown()) { + assert(op_state); + [[maybe_unused]] auto listeners = waiting_states.pop_all_and_shutdown(); + assert(listeners.empty()); + op_state.reset(); + } + if (ref_count.fetch_sub(1) == 1) { + delete this; + } + } + + ::beman::execution26::inplace_stop_source stop_src{}; + variant_type result{}; + state_list_type waiting_states{}; + ::std::atomic<::std::size_t> ref_count{0}; + ::std::optional op_state{}; + }; + + template + struct local_state final : local_state_base { + using stop_token_type = ::beman::execution26::stop_token_of_t<::beman::execution26::env_of_t>; + + struct on_stop_type { + shared_state* sh_state; + void operator()() noexcept { sh_state->stop_src.request_stop(); } + }; + + using on_stop_callback = ::beman::execution26::stop_callback_for_t; + + explicit local_state(shared_state* state, Receiver& rcvr) noexcept + : sh_state(state), receiver{std::addressof(rcvr)} { + sh_state->inc_ref(); + } + + ~local_state() noexcept { sh_state->dec_ref(); } + + local_state(const local_state&) = delete; + local_state& operator=(const local_state&) = delete; + local_state(local_state&&) = delete; + local_state& operator=(local_state&&) = delete; + + auto notify() noexcept -> void override { + on_stop.reset(); + auto stop_token = ::beman::execution26::get_stop_token(::beman::execution26::get_env(receiver)); + if (stop_token.stop_requested()) { + ::beman::execution26::set_stopped(std::move(*receiver)); + } else { + assert(sh_state->result.index() > 0); + assert(!sh_state->result.valueless_by_exception()); + try { + ::std::visit( + [&](const Arg& arg) noexcept -> void { + if constexpr (not ::std::same_as<::std::decay_t, ::std::monostate>) { + ::std::apply( + [&](auto tag, const auto&... args) noexcept -> void { + tag(::std::move(*receiver), args...); + }, + arg); + } + }, + sh_state->result); + } catch (...) { + // required by clang-tidy although it is not necessary here + // see valueless_by_exception() check above + std::terminate(); + } + } + } + + void start() noexcept { + on_stop.emplace(::beman::execution26::get_stop_token(::beman::execution26::get_env(receiver)), + on_stop_type{sh_state}); + sh_state->add_listener(this); + } + + std::optional on_stop; + shared_state* sh_state; + Receiver* receiver; + }; + + static constexpr auto get_state = [](Sender&& sender, + Receiver& receiver) noexcept { + auto&& wrapper = sender.template get<1>(); + return local_state(wrapper.sh_state, receiver); + }; + + static constexpr auto start = [](local_state& state, Rcvr&) noexcept { + state.start(); + }; +}; + +template +struct shared_wrapper { + explicit shared_wrapper(impls_for::shared_state* state) noexcept : sh_state(state) { + if (sh_state) { + sh_state->inc_ref(); + } + } + + ~shared_wrapper() noexcept { + if (sh_state) { + sh_state->dec_ref(); + } + } + + shared_wrapper(const shared_wrapper& other) noexcept : sh_state(other.sh_state) { + if (sh_state) { + sh_state->inc_ref(); + } + } + + shared_wrapper(shared_wrapper&& other) noexcept : sh_state(::std::exchange(other.sh_state, nullptr)) {} + + shared_wrapper& operator=(const shared_wrapper& other) noexcept { + // check for self-assignment was required by clang-tidy + // although it is not necessary here + if (this == &other) { + return *this; + } + auto tmp = other; + ::std::swap(sh_state, tmp.sh_state); + return *this; + } + + shared_wrapper& operator=(shared_wrapper&& other) noexcept { + auto tmp = ::std::move(other); + ::std::swap(sh_state, tmp.sh_state); + return *this; + } + + impls_for::shared_state* sh_state; +}; + +struct split_t { + template + auto transform_sender(Sndr&& sndr) const { + auto&& child = ::std::forward(sndr).template get<2>(); + using child_type = decltype(child); + using shared_state = ::beman::execution26::detail::impls_for::shared_state; + auto* sh_state = new shared_state{::beman::execution26::detail::forward_like(child)}; + return ::beman::execution26::detail::make_sender(split_impl_t{}, shared_wrapper{sh_state}); + } + + template + requires beman::execution26::sender_in::split_env> + auto operator()(Sender&& sender) const { + auto domain{::beman::execution26::detail::get_domain_early(sender)}; + return ::beman::execution26::transform_sender( + domain, ::beman::execution26::detail::make_sender(*this, {}, ::std::forward(sender))); + } +}; + +template +struct completion_signatures_for_impl< + ::beman::execution26::detail::basic_sender<::beman::execution26::detail::split_impl_t, + ::beman::execution26::detail::shared_wrapper>, + Env> { + template + using make_value_completions = + ::beman::execution26::completion_signatures<::beman::execution26::set_value_t(const std::decay_t&...)>; + + template + using make_error_completions = + ::beman::execution26::completion_signatures<::beman::execution26::set_error_t(const std::decay_t&)...>; + + using value_completions = ::beman::execution26:: + value_types_of_t; + + using error_completions = ::beman::execution26::error_types_of_t; + + using fixed_completions = + ::beman::execution26::completion_signatures<::beman::execution26::set_stopped_t(), + ::beman::execution26::set_error_t(std::exception_ptr)>; + + using type = ::beman::execution26::detail::meta::unique< + ::beman::execution26::detail::meta::combine>; +}; + +} // namespace beman::execution26::detail + +namespace beman::execution26 { +using split_t = ::beman::execution26::detail::split_t; + +inline constexpr ::beman::execution26::split_t split{}; +} // namespace beman::execution26 + +#endif diff --git a/tests/beman/execution26/CMakeLists.txt b/tests/beman/execution26/CMakeLists.txt index 664c34da..659123bb 100644 --- a/tests/beman/execution26/CMakeLists.txt +++ b/tests/beman/execution26/CMakeLists.txt @@ -49,6 +49,7 @@ list( exec-snd-expos.test exec-snd-transform.test exec-starts-on.test + exec-split.test exec-sync-wait.test exec-then.test exec-utils-cmplsigs.test diff --git a/tests/beman/execution26/exec-split.test.cpp b/tests/beman/execution26/exec-split.test.cpp new file mode 100644 index 00000000..c6e961e6 --- /dev/null +++ b/tests/beman/execution26/exec-split.test.cpp @@ -0,0 +1,187 @@ +// src/beman/execution26/tests/split.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +struct timed_scheduler_t : beman::execution26::scheduler_t {}; + +class some_thread_pool { + public: + some_thread_pool() = default; + some_thread_pool(const some_thread_pool&) = delete; + some_thread_pool& operator=(const some_thread_pool&) = delete; + some_thread_pool(some_thread_pool&&) noexcept = delete; + some_thread_pool& operator=(some_thread_pool&&) noexcept = delete; + + ~some_thread_pool() { + std::lock_guard lock(mutex_); + for (auto& future : futures_) { + future.get(); + } + } + + template + auto schedule(Fn&& fn) -> void { + std::lock_guard lock(mutex_); + futures_.emplace_back(std::async(std::launch::async, std::forward(fn))); + } + + private: + std::mutex mutex_; + std::list> futures_; +}; + +struct some_thread_pool_scheduler { + using scheduler_concept = timed_scheduler_t; + + template + struct timed_operation { + using operation_state_concept = beman::execution26::operation_state_t; + + Receiver receiver_; + some_thread_pool& pool_; + std::chrono::steady_clock::time_point deadline_; + + void start() noexcept try { + pool_.schedule([this] { + std::this_thread::sleep_until(deadline_); + beman::execution26::set_value(std::move(receiver_)); + }); + } catch (...) { + beman::execution26::set_error(std::move(receiver_), std::current_exception()); + } + }; + + struct timed_sender { + using sender_concept = beman::execution26::sender_t; + + using completion_signatures = + beman::execution26::completion_signatures; + + some_thread_pool* pool_; + std::chrono::steady_clock::time_point deadline_; + + struct env { + some_thread_pool* pool_; + + auto query(beman::execution26::get_completion_scheduler_t) const noexcept + -> some_thread_pool_scheduler { + return some_thread_pool_scheduler{*pool_}; + } + }; + + auto get_env() const noexcept { return env{pool_}; } + + template Receiver> + auto connect(Receiver receiver) const noexcept -> timed_operation { + return timed_operation{std::move(receiver), *pool_, deadline_}; + } + }; + + explicit some_thread_pool_scheduler(some_thread_pool& pool) : pool_(&pool) {} + + auto now() const noexcept -> std::chrono::steady_clock::time_point { return std::chrono::steady_clock::now(); } + + auto schedule_at(std::chrono::steady_clock::time_point deadline) const noexcept -> timed_sender { + return {pool_, deadline}; + } + + auto schedule_after(std::chrono::nanoseconds duration) const noexcept { + return beman::execution26::let_value(beman::execution26::just(), + [this, duration] { return schedule_at(now() + duration); }); + } + + auto schedule() const noexcept -> timed_sender { return schedule_at(now()); } + + friend bool operator==(const some_thread_pool_scheduler&, const some_thread_pool_scheduler&) noexcept = default; + + some_thread_pool* pool_; +}; + +struct NonCopyable { + NonCopyable() = default; + NonCopyable(NonCopyable&&) noexcept = default; + NonCopyable& operator=(NonCopyable&&) noexcept = default; + ~NonCopyable() = default; + NonCopyable(const NonCopyable&) = delete; + NonCopyable& operator=(const NonCopyable&) = delete; +}; + +struct empty_env {}; + +void test_destroy_unused_split() { + auto just = beman::execution26::just(NonCopyable{}); + auto split = beman::execution26::split(std::move(just)); + using split_sender_type = decltype(split); + static_assert(beman::execution26::sender_in); +} + +void test_destroy_two_unused_split() { + auto just = beman::execution26::just(NonCopyable{}); + auto split = beman::execution26::split(std::move(just)); + auto copy = split; + using split_sender_type = decltype(copy); + static_assert(beman::execution26::sender_in); +} + +using beman::execution26::detail::type_list; +using beman::execution26::detail::meta::combine; + +template +using to_set_value_t = type_list; + +void test_completion_sigs_and_sync_wait_on_split() { + auto just = beman::execution26::just(NonCopyable{}); + auto split = beman::execution26::split(std::move(just)); + using split_sender = std::decay_t; + struct empty_env {}; + using expected_value_completions = type_list; + using value_completions = beman::execution26::value_types_of_t; + static_assert(std::same_as); + + auto eat_completion = beman::execution26::then(split, [&](const NonCopyable&) {}); + ASSERT(beman::execution26::sync_wait(eat_completion)); +} + +void test_two_sync_waits_on_one_split() { + auto just = beman::execution26::just(NonCopyable{}); + auto split = beman::execution26::split(std::move(just)); + const void* pointer = nullptr; + auto save_pointer = + beman::execution26::then(split, [&](const NonCopyable& obj) { pointer = static_cast(&obj); }); + auto assert_pointer = beman::execution26::then( + split, [&](const NonCopyable& obj) { ASSERT(pointer == static_cast(&obj)); }); + ASSERT(beman::execution26::sync_wait(save_pointer)); + ASSERT(beman::execution26::sync_wait(assert_pointer)); +} + +void test_completion_from_another_thread() { + using namespace std::chrono_literals; + auto context = some_thread_pool{}; + auto scheduler = some_thread_pool_scheduler{context}; + auto split = beman::execution26::split(scheduler.schedule_after(1ms)); + auto return_42 = beman::execution26::then(split, [] { return 42; }); + auto result = beman::execution26::sync_wait(return_42); + ASSERT(result.has_value()); + if (result.has_value()) { + auto [val] = *result; + ASSERT(val == 42); + } +} + +TEST(exec_split) { + test_destroy_unused_split(); + test_destroy_two_unused_split(); + test_completion_sigs_and_sync_wait_on_split(); + test_two_sync_waits_on_one_split(); + test_completion_from_another_thread(); +}