Skip to content

Commit 224cae0

Browse files
elbenomjcaisse-intel
authored andcommitted
✨ Use senders to achieve callback request-response semantics
1 parent 5b8cb6e commit 224cae0

File tree

4 files changed

+211
-4
lines changed

4 files changed

+211
-4
lines changed

CMakeLists.txt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@ include(cmake/string_catalog.cmake)
2424

2525
add_versioned_package("gh:boostorg/mp11#boost-1.83.0")
2626
fmt_recipe(10.2.1)
27-
add_versioned_package("gh:intel/cpp-std-extensions#6c4fb17")
28-
add_versioned_package("gh:intel/cpp-baremetal-concurrency#8d49b6d")
27+
add_versioned_package("gh:intel/cpp-std-extensions#646bdbe")
28+
add_versioned_package("gh:intel/cpp-baremetal-concurrency#fef18ca")
29+
add_versioned_package("gh:intel/cpp-baremetal-senders-and-receivers#113eeff")
2930

3031
add_library(cib INTERFACE)
3132
target_compile_features(cib INTERFACE cxx_std_20)
3233
target_include_directories(cib INTERFACE include)
33-
target_link_libraries_system(cib INTERFACE concurrency fmt::fmt-header-only
34-
stdx)
34+
target_link_libraries_system(cib INTERFACE async concurrency
35+
fmt::fmt-header-only stdx)
3536

3637
target_compile_options(
3738
cib

include/msg/send.hpp

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#pragma once
2+
3+
#include <async/completion_tags.hpp>
4+
#include <async/concepts.hpp>
5+
#include <async/connect.hpp>
6+
#include <async/schedulers/trigger_scheduler.hpp>
7+
#include <async/start.hpp>
8+
#include <async/then.hpp>
9+
10+
#include <stdx/concepts.hpp>
11+
#include <stdx/ct_string.hpp>
12+
13+
#include <type_traits>
14+
#include <utility>
15+
16+
namespace msg {
17+
namespace _send_recv {
18+
template <typename Sched>
19+
using scheduler_sender = decltype(std::declval<Sched>().schedule());
20+
21+
template <typename S>
22+
concept valid_send_action =
23+
requires { typename std::remove_cvref_t<S>::is_send_action; };
24+
25+
template <valid_send_action SA, typename Sched, typename Rcvr>
26+
// NOLINTNEXTLINE(cppcoreguidelines-special-member-functions)
27+
struct op_state {
28+
template <stdx::same_as_unqualified<SA> S, typename Sch, typename R>
29+
constexpr op_state(S &&s, Sch &&sch, R &&r)
30+
: send(s), ops{async::connect(std::forward<Sch>(sch).schedule(),
31+
std::forward<R>(r))} {}
32+
constexpr op_state(op_state &&) = delete;
33+
34+
using ops_t = async::connect_result_t<scheduler_sender<Sched>, Rcvr>;
35+
36+
constexpr auto start() & -> void {
37+
async::start(ops);
38+
send();
39+
}
40+
41+
SA send;
42+
ops_t ops;
43+
};
44+
45+
template <valid_send_action SA, typename Sched> struct sender {
46+
using is_sender = void;
47+
48+
[[no_unique_address]] SA send;
49+
[[no_unique_address]] Sched sched;
50+
51+
public:
52+
template <async::receiver R>
53+
[[nodiscard]] constexpr auto
54+
connect(R &&r) && -> op_state<SA, Sched, std::remove_cvref_t<R>> {
55+
async::check_connect<sender &&, R>();
56+
return {std::move(send), std::move(sched), std::forward<R>(r)};
57+
}
58+
59+
template <async::receiver R>
60+
[[nodiscard]] constexpr auto
61+
connect(R &&r) const & -> op_state<SA, Sched, std::remove_cvref_t<R>> {
62+
async::check_connect<sender, R>();
63+
return {send, sched, std::forward<R>(r)};
64+
}
65+
66+
template <typename Env>
67+
[[nodiscard]] constexpr static auto get_completion_signatures(Env const &)
68+
-> async::completion_signatures_of_t<scheduler_sender<Sched>, Env> {
69+
return {};
70+
}
71+
};
72+
73+
template <typename Sndr, typename Sched>
74+
sender(Sndr, Sched) -> sender<Sndr, Sched>;
75+
76+
template <typename F> struct send_action : F {
77+
using is_send_action = void;
78+
};
79+
template <typename F> send_action(F) -> send_action<F>;
80+
81+
template <stdx::ct_string Name, typename... Args> struct pipeable {
82+
template <typename Adaptor> struct type {
83+
[[no_unique_address]] Adaptor a;
84+
85+
private:
86+
template <valid_send_action S, stdx::same_as_unqualified<type> Self>
87+
friend constexpr auto operator|(S &&s, Self &&self) -> async::sender
88+
auto {
89+
return _send_recv::sender{
90+
std::forward<S>(s),
91+
async::trigger_scheduler<Name, Args...>{}} |
92+
std::forward<Self>(self).a;
93+
}
94+
};
95+
96+
template <typename T> type(T) -> type<T>;
97+
};
98+
} // namespace _send_recv
99+
100+
template <typename F, typename... Args>
101+
constexpr auto send(F &&f, Args &&...args) {
102+
return _send_recv::send_action{
103+
[f = std::forward<F>(f), ... args = std::forward<Args>(args)]() {
104+
return f(args...);
105+
}};
106+
}
107+
108+
template <stdx::ct_string Name, typename... RecvArgs, typename F,
109+
typename... Args>
110+
[[nodiscard]] constexpr auto then_receive(F &&f, Args &&...args) {
111+
return typename _send_recv::pipeable<Name, RecvArgs...>::type{async::then(
112+
[f = std::forward<F>(f), ... args = std::forward<Args>(args)](
113+
RecvArgs const &...as) { return f(as..., args...); })};
114+
}
115+
116+
template <stdx::ct_string Name, _send_recv::valid_send_action S, typename F,
117+
typename... Args>
118+
[[nodiscard]] constexpr auto then_receive(S &&s, F &&f,
119+
Args &&...args) -> async::sender
120+
auto {
121+
return std::forward<S>(s) |
122+
then_receive<Name>(std::forward<F>(f), std::forward<Args>(args)...);
123+
}
124+
} // namespace msg

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ add_tests(
6767
msg/indexed_callback
6868
msg/indexed_handler
6969
msg/message
70+
msg/send
7071
sc/format
7172
sc/string_constant
7273
seq/sequencer)

test/msg/send.cpp

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#include <async/schedulers/trigger_manager.hpp>
2+
#include <async/start_detached.hpp>
3+
#include <cib/cib.hpp>
4+
#include <msg/callback.hpp>
5+
#include <msg/field.hpp>
6+
#include <msg/message.hpp>
7+
#include <msg/send.hpp>
8+
#include <msg/service.hpp>
9+
10+
#include <stdx/ct_conversions.hpp>
11+
#include <stdx/ct_string.hpp>
12+
13+
#include <catch2/catch_template_test_macros.hpp>
14+
#include <catch2/catch_test_macros.hpp>
15+
16+
namespace {
17+
template <typename T>
18+
constexpr auto type_string =
19+
stdx::ct_string<stdx::type_as_string<T>().size() + 1>{
20+
stdx::type_as_string<T>()};
21+
} // namespace
22+
23+
TEMPLATE_TEST_CASE("request-response", "[send]", decltype([] {})) {
24+
constexpr auto name = type_string<TestType>;
25+
int var{};
26+
27+
auto s = msg::send([&](auto i) { async::run_triggers<name>(i); }, 42) |
28+
msg::then_receive<name, int>(
29+
[&](auto recvd, auto x) { var = recvd + x; }, 17);
30+
CHECK(var == 0);
31+
CHECK(async::start_detached_unstoppable(s));
32+
CHECK(var == 59);
33+
}
34+
35+
namespace {
36+
using msg::at;
37+
using msg::operator""_dw;
38+
using msg::operator""_msb;
39+
using msg::operator""_lsb;
40+
using msg::operator""_f;
41+
42+
using id_field =
43+
msg::field<"id", std::uint32_t>::located<at{0_dw, 31_msb, 24_lsb}>;
44+
using field1 =
45+
msg::field<"f1", std::uint32_t>::located<at{0_dw, 15_msb, 0_lsb}>;
46+
using field2 =
47+
msg::field<"f2", std::uint32_t>::located<at{1_dw, 23_msb, 16_lsb}>;
48+
using field3 =
49+
msg::field<"f3", std::uint32_t>::located<at{1_dw, 15_msb, 0_lsb}>;
50+
51+
using msg_defn = msg::message<"msg", id_field, field1, field2, field3>;
52+
using test_msg_t = msg::owning<msg_defn>;
53+
using msg_view_t = msg::const_view<msg_defn>;
54+
55+
constexpr auto test_callback = msg::callback<"cb", msg_defn>(
56+
"id"_f == msg::constant<0x80>,
57+
[](msg_view_t v) { async::run_triggers<"cb">(v); });
58+
59+
struct test_service : msg::service<msg_view_t> {};
60+
struct test_project {
61+
constexpr static auto config = cib::config(
62+
cib::exports<test_service>, cib::extend<test_service>(test_callback));
63+
};
64+
} // namespace
65+
66+
TEST_CASE("request-response through handler", "[send]") {
67+
cib::nexus<test_project> test_nexus{};
68+
test_nexus.init();
69+
70+
std::uint32_t var{};
71+
auto s =
72+
msg::send(
73+
[&](auto id) {
74+
cib::service<test_service>->handle(test_msg_t{"id"_f = id});
75+
},
76+
0x80) |
77+
msg::then_receive<"cb", msg_view_t>(
78+
[&](auto v) { var = v.get("id"_f); });
79+
CHECK(async::start_detached_unstoppable(s));
80+
CHECK(var == 0x80);
81+
}

0 commit comments

Comments
 (0)