Skip to content

Commit a3287fa

Browse files
committed
added an example using a timer and fixed an issue with sync_wait
1 parent 0a113ea commit a3287fa

File tree

3 files changed

+152
-111
lines changed

3 files changed

+152
-111
lines changed

examples/intro-2-hello-async.cpp

Lines changed: 5 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -2,141 +2,35 @@
22
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
33

44
#include <beman/execution26/execution.hpp>
5-
#include <iostream>
5+
#include "intro-timer.hpp"
66
#include <chrono>
7+
#include <iostream>
78
#include <string>
89
#include <tuple>
910

1011
namespace ex = ::beman::execution26;
1112
using namespace std::string_literals;
1213
using namespace std::chrono_literals;
1314

14-
#include <queue>
15-
#include <tuple>
16-
#include <poll.h>
17-
18-
namespace demo {
19-
struct timer {
20-
struct state_base {
21-
virtual void complete() = 0;
22-
};
23-
template <typename Receiver>
24-
struct state : state_base {
25-
using operation_state_concept = ex::operation_state_t;
26-
timer* self;
27-
std::remove_cvref_t<Receiver> receiver;
28-
std::chrono::milliseconds ms;
29-
30-
template <typename R>
31-
state(timer* self, R&& receiver, std::chrono::milliseconds ms)
32-
: state_base(), self(self), receiver(std::forward<R>(receiver)), ms(ms) {}
33-
void start() & noexcept { self->add(ms, this); }
34-
void complete() override { ex::set_value(std::move(receiver)); }
35-
};
36-
struct sender {
37-
using sender_concept = ex::sender_t;
38-
using completion_signatures = ex::completion_signatures<ex::set_value_t()>;
39-
timer* self;
40-
std::chrono::milliseconds ms;
41-
template <typename R>
42-
state<R> connect(R&& r) {
43-
return state<R>(self, std::forward<R>(r), ms);
44-
}
45-
};
46-
using time_point = std::chrono::system_clock::time_point;
47-
using value_type = std::tuple<time_point, state_base*>;
48-
std::priority_queue<value_type, std::vector<value_type>, std::greater<>> outstanding;
49-
50-
void add(std::chrono::milliseconds ms, state_base* base) {
51-
outstanding.emplace(std::chrono::system_clock::now() + ms, base);
52-
}
53-
bool run_one() {
54-
if (outstanding.empty())
55-
return false;
56-
auto [time, base] = outstanding.top();
57-
outstanding.pop();
58-
auto now{std::chrono::system_clock::now()};
59-
if (now < time) {
60-
auto dur{time - now};
61-
auto ms{std::chrono::duration_cast<std::chrono::milliseconds>(dur)};
62-
poll(nullptr, {}, int(ms.count()));
63-
}
64-
base->complete();
65-
return true;
66-
}
67-
68-
template <typename Receiver>
69-
struct run_state
70-
{
71-
using operation_state_concept = ex::operation_state_t;
72-
using scheduler = decltype(ex::get_delegation_scheduler(ex::get_env(std::declval<Receiver&>())));
73-
74-
timer* self;
75-
Receiver receiver;
76-
void start() & noexcept {
77-
std::cout << "run start\n";
78-
}
79-
};
80-
struct run_sender
81-
{
82-
using sender_concept = ex::sender_t;
83-
using completion_signatures = ex::completion_signatures<ex::set_value_t()>;
84-
85-
timer* self;
86-
87-
template <typename Receiver>
88-
run_state<std::remove_cvref_t<Receiver>> connect(Receiver&& receiver)
89-
{
90-
return { self, std::forward<Receiver>(receiver) };
91-
}
92-
};
93-
94-
auto run() {
95-
#if 0
96-
return run_sender{this};
97-
#else
98-
return ex::then(ex::just(), [this] {
99-
while (this->run_one())
100-
;
101-
});
102-
#endif
103-
}
104-
105-
template <typename T>
106-
auto resume_after(std::chrono::duration<T> d) {
107-
auto ms(std::chrono::duration_cast<std::chrono::milliseconds>(d));
108-
return sender{this, ms};
109-
}
110-
};
111-
} // namespace demo
112-
11315
// ----------------------------------------------------------------------------
11416
// Please see the explanation in docs/intro-examples.md for an explanation.
11517

116-
struct receiver {
117-
using receiver_concept = ex::receiver_t;
118-
void set_value(auto&&...) noexcept {}
119-
void set_error(auto&&) noexcept {}
120-
void set_stopped() noexcept {}
121-
};
122-
static_assert(ex::receiver<receiver>);
123-
12418
int main() {
12519
std::cout << std::unitbuf;
126-
demo::timer timer;
20+
intro::timer timer;
12721

12822
// clang-format off
12923
auto [result] = ex::sync_wait(
13024
ex::when_all(
25+
timer.run(),
13126
ex::when_all(
13227
timer.resume_after(3s)
13328
| ex::then([] { std::cout << "h\n"; return std::string("hello"); }),
13429
timer.resume_after(1s)
13530
| ex::then([] { std::cout << ",\n"; return std::string(", "); }),
13631
timer.resume_after(2s)
13732
| ex::then([] { std::cout << "w\n"; return std::string("world"); })
138-
) | ex::then([](auto s1, auto s2, auto s3) { return s1 + s2 + s3; }),
139-
timer.run()
33+
) | ex::then([](auto s1, auto s2, auto s3) { return s1 + s2 + s3; })
14034
)
14135
).value_or(std::tuple(std::string("")));
14236
// clang-format on

examples/intro-timer.hpp

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// examples/intro-timer.hpp -*-C++-*-
2+
// ----------------------------------------------------------------------------
3+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
4+
// ----------------------------------------------------------------------------
5+
6+
#ifndef INCLUDED_EXAMPLES_INTRO_TIMER
7+
#define INCLUDED_EXAMPLES_INTRO_TIMER
8+
9+
#include <beman/execution26/execution.hpp>
10+
#include <queue>
11+
#include <thread>
12+
#include <tuple>
13+
14+
// ----------------------------------------------------------------------------
15+
16+
namespace intro {
17+
namespace ex = beman::execution26;
18+
struct timer;
19+
}
20+
21+
// ----------------------------------------------------------------------------
22+
23+
struct intro::timer {
24+
struct state_base {
25+
virtual void complete() = 0;
26+
};
27+
template <typename Receiver>
28+
struct state : state_base {
29+
using operation_state_concept = ex::operation_state_t;
30+
timer* self;
31+
std::remove_cvref_t<Receiver> receiver;
32+
std::chrono::milliseconds ms;
33+
34+
template <typename R>
35+
state(timer* self, R&& receiver, std::chrono::milliseconds ms)
36+
: state_base(), self(self), receiver(std::forward<R>(receiver)), ms(ms) {}
37+
void start() & noexcept { self->add(ms, this); }
38+
void complete() override { ex::set_value(std::move(receiver)); }
39+
};
40+
struct sender {
41+
using sender_concept = ex::sender_t;
42+
using completion_signatures = ex::completion_signatures<ex::set_value_t()>;
43+
timer* self;
44+
std::chrono::milliseconds ms;
45+
template <typename R>
46+
state<R> connect(R&& r) {
47+
return state<R>(self, std::forward<R>(r), ms);
48+
}
49+
};
50+
using time_point = std::chrono::system_clock::time_point;
51+
using value_type = std::tuple<time_point, state_base*>;
52+
std::priority_queue<value_type, std::vector<value_type>, std::greater<>> outstanding;
53+
54+
void add(std::chrono::milliseconds ms, state_base* base) {
55+
outstanding.emplace(std::chrono::system_clock::now() + ms, base);
56+
}
57+
bool run_one() {
58+
if (outstanding.empty())
59+
return false;
60+
auto [time, base] = outstanding.top();
61+
outstanding.pop();
62+
auto now{std::chrono::system_clock::now()};
63+
if (now < time) {
64+
std::this_thread::sleep_for(time - now);
65+
}
66+
base->complete();
67+
return true;
68+
}
69+
70+
template <typename Receiver>
71+
struct run_state
72+
{
73+
struct recv {
74+
using receiver_concept = ex::receiver_t;
75+
run_state* self;
76+
77+
auto set_value(auto&&...) noexcept -> void { this->self->run_one(); }
78+
auto set_error(auto&&) noexcept -> void { this->self->run_one(); }
79+
auto set_stopped() noexcept -> void { this->self->run_one(); }
80+
};
81+
using operation_state_concept = ex::operation_state_t;
82+
using scheduler_t = decltype(ex::get_delegation_scheduler(ex::get_env(std::declval<Receiver&>())));
83+
static_assert(ex::receiver<recv>);
84+
static_assert(ex::scheduler<scheduler_t>);
85+
static_assert(ex::sender<decltype(ex::schedule(std::declval<scheduler_t>()))>);
86+
using state_t = decltype(ex::connect(ex::schedule(std::declval<scheduler_t>()), std::declval<recv>()));
87+
struct state_ctor {
88+
state_t state;
89+
template <typename S, typename R>
90+
state_ctor(S&& sender, R&& receiver)
91+
: state(ex::connect(std::forward<S>(sender), std::forward<R>(receiver)))
92+
{
93+
}
94+
};
95+
96+
timer* self;
97+
Receiver receiver;
98+
std::optional<state_ctor> state{};
99+
100+
auto schedule_one() {
101+
this->state.emplace(
102+
ex::schedule(ex::get_delegation_scheduler(ex::get_env(this->receiver))),
103+
recv{this}
104+
);
105+
ex::start(this->state->state);
106+
}
107+
auto run_one() {
108+
this->state.reset();
109+
if (this->self->run_one())
110+
this->schedule_one();
111+
else
112+
ex::set_value(std::move(this->receiver));
113+
}
114+
auto start() & noexcept -> void {
115+
this->schedule_one();
116+
}
117+
};
118+
struct run_sender
119+
{
120+
using sender_concept = ex::sender_t;
121+
using completion_signatures = ex::completion_signatures<ex::set_value_t()>;
122+
123+
timer* self;
124+
125+
template <typename Receiver>
126+
run_state<std::remove_cvref_t<Receiver>> connect(Receiver&& receiver)
127+
{
128+
return { self, std::forward<Receiver>(receiver) };
129+
}
130+
};
131+
132+
auto run() { return run_sender{this}; }
133+
134+
template <typename T>
135+
auto resume_after(std::chrono::duration<T> d) {
136+
auto ms(std::chrono::duration_cast<std::chrono::milliseconds>(d));
137+
return sender{this, ms};
138+
}
139+
};
140+
141+
// ----------------------------------------------------------------------------
142+
143+
#endif

include/beman/execution26/detail/sync_wait.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ struct sync_wait_receiver {
5353

5454
::beman::execution26::detail::sync_wait_state<Sender>* state{};
5555

56+
auto get_env() const noexcept -> ::beman::execution26::detail::sync_wait_env {
57+
return sync_wait_env{ &this->state->loop };
58+
}
59+
5660
template <typename Error>
5761
auto set_error(Error&& error) && noexcept -> void {
5862
this->state->error = ::beman::execution26::detail::as_except_ptr(::std::forward<Error>(error));

0 commit comments

Comments
 (0)