Skip to content

Commit 3f68d78

Browse files
author
Dmitry Malakhov
committed
Implement ability to sleep
1 parent f9d13e0 commit 3f68d78

File tree

6 files changed

+160
-1
lines changed

6 files changed

+160
-1
lines changed

include/corosig/Coro.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "corosig/Result.hpp"
66
#include "corosig/reactor/CoroList.hpp"
77
#include "corosig/reactor/Reactor.hpp"
8+
#include "corosig/reactor/SleepList.hpp"
89
#include "corosig/util/SetDefaultOnMove.hpp"
910

1011
#include <cassert>
@@ -80,6 +81,11 @@ struct CoroutinePromiseType : CoroListNode {
8081
m_reactor.schedule(*this);
8182
}
8283

84+
/// @brief Add this SleepListNode into reactor to be executed later, when time comes
85+
void queue_to_reactor(SleepListNode &node) noexcept {
86+
m_reactor.schedule_when_time_passes(node);
87+
}
88+
8389
/// @brief Add this PollListNode into reactor to be executed later, when event becomes awailable
8490
void poll_to_reactor(PollListNode &node) noexcept {
8591
m_reactor.schedule_when_ready(node);

include/corosig/Sleep.hpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#ifndef COROSIG_SLEEP_HPP
2+
#define COROSIG_SLEEP_HPP
3+
4+
#include "corosig/Clock.hpp"
5+
#include "corosig/reactor/SleepList.hpp"
6+
7+
#include <csignal>
8+
#include <cstdlib>
9+
10+
namespace corosig {
11+
12+
/// @brief Break the execution of a coroutine until specified amount of time passes
13+
struct Sleep : SleepListNode {
14+
Sleep(Clock::time_point until) {
15+
this->awake_time = until;
16+
}
17+
18+
template <typename REP, typename PERIOD>
19+
Sleep(std::chrono::duration<REP, PERIOD> sleep_for)
20+
: Sleep{Clock::now() + sleep_for} {
21+
}
22+
23+
[[nodiscard]] static bool await_ready() noexcept {
24+
return false;
25+
}
26+
27+
template <typename PROMISE>
28+
void await_suspend(std::coroutine_handle<PROMISE> h) noexcept {
29+
this->waiting_coro = h;
30+
h.promise().queue_to_reactor(*this);
31+
}
32+
33+
void await_resume() const noexcept {
34+
}
35+
};
36+
37+
} // namespace corosig
38+
39+
#endif

include/corosig/reactor/Reactor.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "corosig/container/Allocator.hpp"
77
#include "corosig/reactor/CoroList.hpp"
88
#include "corosig/reactor/PollList.hpp"
9+
#include "corosig/reactor/SleepList.hpp"
910

1011
#include <cstddef>
1112

@@ -34,6 +35,10 @@ struct Reactor {
3435
/// @brief Schedule a coroutine to be executed when handle recieves specified event
3536
void schedule_when_ready(PollListNode &) noexcept;
3637

38+
/// @brief Schedule a coroutine to be executed when specified amount of time passes
39+
/// @warning UB if given node is ready
40+
void schedule_when_time_passes(SleepListNode &) noexcept;
41+
3742
/// @brief Tell if there are any tasks scheduled
3843
[[nodiscard]] bool has_active_tasks() const noexcept;
3944

@@ -49,6 +54,7 @@ struct Reactor {
4954
private:
5055
PollList m_polled;
5156
CoroList m_ready;
57+
SleepList m_sleeping;
5258
Allocator m_alloc;
5359
};
5460

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#ifndef COROSIG_REACTOR_SLEEP_LIST_HPP
2+
#define COROSIG_REACTOR_SLEEP_LIST_HPP
3+
4+
#include "boost/intrusive/intrusive_fwd.hpp"
5+
#include "boost/intrusive/link_mode.hpp"
6+
#include "corosig/Clock.hpp"
7+
8+
#include <boost/intrusive/avl_set.hpp>
9+
#include <boost/intrusive/avl_set_hook.hpp>
10+
#include <boost/intrusive/options.hpp>
11+
#include <coroutine>
12+
13+
namespace corosig {
14+
15+
/// @brief A node type for sleep task which may be pending or ready
16+
struct SleepListNode : boost::intrusive::avl_set_base_hook<
17+
boost::intrusive::link_mode<boost::intrusive::link_mode_type::safe_link>,
18+
boost::intrusive::optimize_size<true>> {
19+
auto operator<=>(SleepListNode const &rhs) const noexcept {
20+
return awake_time <=> rhs.awake_time;
21+
}
22+
23+
/// @brief Coro to be resumed
24+
std::coroutine_handle<> waiting_coro = nullptr;
25+
26+
/// @brief When to resume a coro
27+
Clock::time_point awake_time;
28+
};
29+
30+
/// @brief A list of objects which are castable to coroutines via coro_from_this virtual method
31+
using SleepList =
32+
boost::intrusive::avl_multiset<SleepListNode, boost::intrusive::constant_time_size<false>>;
33+
34+
} // namespace corosig
35+
36+
#endif

src/reactor/Reactor.cpp

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
#include "corosig/reactor/Reactor.hpp"
22

3+
#include "corosig/Clock.hpp"
34
#include "corosig/ErrorTypes.hpp"
45
#include "corosig/Result.hpp"
56
#include "corosig/reactor/CoroList.hpp"
67
#include "corosig/reactor/PollList.hpp"
8+
#include "corosig/reactor/SleepList.hpp"
79

810
#include <chrono>
911
#include <coroutine>
@@ -15,6 +17,21 @@ namespace {
1517

1618
using namespace corosig;
1719

20+
void resume_ready_sleepers(SleepList &sleeping) noexcept {
21+
auto now = Clock::now();
22+
23+
while (!sleeping.empty()) {
24+
SleepListNode &node = *sleeping.begin();
25+
if (node.awake_time > now) {
26+
break;
27+
}
28+
sleeping.erase(node);
29+
assert(node.waiting_coro != nullptr);
30+
assert(!node.waiting_coro.done());
31+
node.waiting_coro.resume();
32+
}
33+
}
34+
1835
Result<void, SyscallError>
1936
poll_and_resume(PollList &polled, std::chrono::duration<int, std::milli> timeout) noexcept {
2037
if (polled.empty()) {
@@ -59,11 +76,16 @@ void resume(CoroList &ready) noexcept {
5976
auto &node = ready.front();
6077
ready.pop_front();
6178
auto coro = node.coro_from_this();
79+
assert(coro != nullptr);
6280
assert(!coro.done());
6381
coro.resume();
6482
}
6583
}
6684

85+
std::chrono::milliseconds ceil_to_millis(std::chrono::nanoseconds nanos) noexcept {
86+
return std::chrono::milliseconds{nanos.count() / 1000 + nanos.count() % 1000};
87+
}
88+
6789
} // namespace
6890

6991
namespace corosig {
@@ -80,6 +102,10 @@ void Reactor::schedule_when_ready(PollListNode &node) noexcept {
80102
m_polled.push_back(node);
81103
}
82104

105+
void Reactor::schedule_when_time_passes(SleepListNode &node) noexcept {
106+
m_sleeping.insert(node);
107+
}
108+
83109
bool Reactor::has_active_tasks() const noexcept {
84110
return !m_polled.empty() && !m_ready.empty();
85111
}
@@ -93,13 +119,17 @@ size_t Reactor::current_memory() const noexcept {
93119
}
94120

95121
Result<void, SyscallError> Reactor::do_event_loop_iteration() noexcept {
96-
assert((!m_ready.empty() || !m_polled.empty()) && "Nothing to process. Deadlock will happen");
122+
assert((!m_sleeping.empty() || !m_ready.empty() || !m_polled.empty()) &&
123+
"Nothing to process. Deadlock will happen");
124+
resume_ready_sleepers(m_sleeping);
97125
resume(m_ready);
98126

99127
using namespace std::chrono_literals;
100128
auto poll_timeout = -1ms;
101129
if (!m_ready.empty()) {
102130
poll_timeout = 0ms;
131+
} else if (!m_sleeping.empty()) {
132+
poll_timeout = std::max(0ms, ceil_to_millis(m_sleeping.begin()->awake_time - Clock::now()));
103133
}
104134

105135
return poll_and_resume(m_polled, poll_timeout);

test/cases/Sleep.cpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#include "corosig/Sleep.hpp"
2+
3+
#include "corosig/Clock.hpp"
4+
#include "corosig/Coro.hpp"
5+
#include "corosig/Parallel.hpp"
6+
#include "corosig/Result.hpp"
7+
#include "corosig/reactor/Reactor.hpp"
8+
#include "corosig/testing/Signals.hpp"
9+
10+
namespace {
11+
12+
using namespace corosig;
13+
using namespace std::chrono_literals;
14+
15+
} // namespace
16+
17+
COROSIG_SIGHANDLER_TEST_CASE("Sleep is ok") {
18+
auto start = Clock::now();
19+
auto foo = [](Reactor &) -> Fut<int> {
20+
co_await Sleep{10ms};
21+
co_return 20;
22+
};
23+
auto res = foo(reactor).block_on();
24+
COROSIG_REQUIRE(res);
25+
COROSIG_REQUIRE(res.value() == 20);
26+
COROSIG_REQUIRE(Clock::now() - start >= 10ms);
27+
}
28+
29+
COROSIG_SIGHANDLER_TEST_CASE("Parallel sleep is ok") {
30+
constexpr static auto FOO = [](Reactor &) -> Fut<void> {
31+
co_await Sleep{10ms};
32+
co_return Ok{};
33+
};
34+
35+
constexpr static auto FOO2 = [](Reactor &r) -> Fut<void> {
36+
COROSIG_CO_TRYV(co_await when_all_succeed(r, FOO(r), FOO(r), FOO(r), FOO(r)));
37+
co_return Ok{};
38+
};
39+
40+
auto res = FOO2(reactor).block_on();
41+
COROSIG_REQUIRE(res);
42+
}

0 commit comments

Comments
 (0)