Skip to content

Commit 57fb7d9

Browse files
authored
Merge pull request #25 from bemanproject/environment
Environment
2 parents 7237f41 + b09ba2e commit 57fb7d9

File tree

6 files changed

+277
-10
lines changed

6 files changed

+277
-10
lines changed

CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ FetchContent_Declare(
3838
GIT_TAG 9cd2e66
3939
)
4040
FetchContent_MakeAvailable(execution)
41+
FetchContent_Declare(
42+
net
43+
# for local development, use SOURCE_DIR <path-to>/execution
44+
GIT_REPOSITORY https://github.com/bemanproject/net
45+
GIT_TAG 193d043002143242731f9baf759efc6d624cace7
46+
)
47+
if(NOT WIN32)
48+
FetchContent_MakeAvailable(net)
49+
endif()
4150

4251
add_subdirectory(src/beman/task)
4352

examples/CMakeLists.txt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
set(ALL_EXAMPLES
44
task_scheduler
5+
environment
56
c++now-basic
67
c++now-result-types
78
c++now-errors
@@ -33,7 +34,15 @@ message("Examples to be built: ${ALL_EXAMPLES}")
3334
foreach(example ${ALL_EXAMPLES})
3435
add_executable(beman.task.examples.${example})
3536
target_sources(beman.task.examples.${example} PRIVATE ${example}.cpp)
36-
target_link_libraries(beman.task.examples.${example} beman::task)
37+
if(WIN32)
38+
target_link_libraries(beman.task.examples.${example} beman::task)
39+
else()
40+
target_link_libraries(
41+
beman.task.examples.${example}
42+
beman::task
43+
beman::net
44+
)
45+
endif()
3746
add_test(
3847
NAME beman.task.examples.${example}
3948
COMMAND $<TARGET_FILE:beman.task.examples.${example}>

examples/c++now-affinity.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class thread_context {
3333
std::mutex mutex;
3434
std::condition_variable condition;
3535
std::thread thread{[this] { this->run(this->source.get_token()); }};
36-
base* queue;
36+
base* queue{};
3737

3838
void run(auto token) {
3939
while (true) {

examples/demo-scope.hpp

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// examples/demo_scope.hpp -*-C++-*-
2+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
3+
4+
#ifndef INCLUDED_EXAMPLES_DEMO_SCOPE
5+
#define INCLUDED_EXAMPLES_DEMO_SCOPE
6+
7+
#include <beman/net/net.hpp>
8+
#include <atomic>
9+
#include <iostream>
10+
#include <utility>
11+
12+
// ----------------------------------------------------------------------------
13+
14+
namespace demo {
15+
namespace ex = ::beman::net::detail::ex;
16+
17+
class scope {
18+
private:
19+
static constexpr bool log_completions{false};
20+
struct env {
21+
scope* self;
22+
23+
auto query(ex::get_stop_token_t) const noexcept { return this->self->source.get_token(); }
24+
};
25+
26+
struct job_base {
27+
virtual ~job_base() = default;
28+
};
29+
30+
struct receiver {
31+
using receiver_concept = ex::receiver_t;
32+
scope* self;
33+
job_base* state{};
34+
35+
auto set_error(auto&&) noexcept -> void {
36+
::std::cerr << "ERROR: demo::scope::job in scope completed with error!\n";
37+
this->complete();
38+
}
39+
auto set_value() && noexcept -> void {
40+
if (log_completions)
41+
std::cout << "demo::scope::set_value()\n";
42+
this->complete();
43+
}
44+
auto set_stopped() && noexcept -> void {
45+
if (log_completions)
46+
std::cout << "demo::scope::set_stopped()\n";
47+
this->complete();
48+
}
49+
auto complete() -> void {
50+
scope* slf{this->self};
51+
delete this->state;
52+
if (0u == --slf->count) {
53+
slf->complete();
54+
}
55+
}
56+
auto get_env() const noexcept -> env { return {this->self}; }
57+
};
58+
59+
template <typename Sender>
60+
struct job : job_base {
61+
using state_t = decltype(ex::connect(std::declval<Sender&&>(), std::declval<receiver>()));
62+
state_t state;
63+
template <typename S>
64+
job(scope* self, S&& sender) : state(ex::connect(::std::forward<S>(sender), receiver{self, this})) {
65+
ex::start(this->state);
66+
}
67+
};
68+
69+
std::atomic<std::size_t> count{};
70+
ex::inplace_stop_source source;
71+
72+
auto complete() -> void {}
73+
74+
public:
75+
~scope() {
76+
if (0u < this->count)
77+
std::cerr << "ERROR: scope destroyed with live jobs: " << this->count << "\n";
78+
}
79+
template <ex::sender Sender>
80+
auto spawn(Sender&& sender) {
81+
++this->count;
82+
new job<Sender>(this, std::forward<Sender>(sender));
83+
}
84+
auto stop() -> void { this->source.request_stop(); }
85+
auto empty() const -> bool { return 0u == this->count; }
86+
};
87+
} // namespace demo
88+
89+
// ----------------------------------------------------------------------------
90+
91+
#endif

examples/demo-thread_pool.hpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
#include <thread>
1414

1515
namespace demo {
16-
namespace ex = beman::execution;
1716
struct thread_pool {
1817

1918
struct node {
@@ -53,18 +52,18 @@ struct thread_pool {
5352
}
5453

5554
struct scheduler {
56-
using scheduler_concept = ex::scheduler_t;
55+
using scheduler_concept = beman::execution::scheduler_t;
5756
struct env {
5857
thread_pool* pool;
5958

6059
template <typename T>
61-
scheduler query(const ex::get_completion_scheduler_t<T>&) const noexcept {
60+
scheduler query(const beman::execution::get_completion_scheduler_t<T>&) const noexcept {
6261
return {this->pool};
6362
}
6463
};
6564
template <typename Receiver>
6665
struct state final : thread_pool::node {
67-
using operation_state_concept = ex::operation_state_t;
66+
using operation_state_concept = beman::execution::operation_state_t;
6867
std::remove_cvref_t<Receiver> receiver;
6968
thread_pool* pool;
7069

@@ -77,11 +76,11 @@ struct thread_pool {
7776
}
7877
this->pool->condition.notify_one();
7978
}
80-
void run() override { ex::set_value(std::move(this->receiver)); }
79+
void run() override { beman::execution::set_value(std::move(this->receiver)); }
8180
};
8281
struct sender {
83-
using sender_concept = ex::sender_t;
84-
using completion_signatures = ex::completion_signatures<ex::set_value_t()>;
82+
using sender_concept = beman::execution::sender_t;
83+
using completion_signatures = beman::execution::completion_signatures<beman::execution::set_value_t()>;
8584
thread_pool* pool;
8685
template <typename Receiver>
8786
state<Receiver> connect(Receiver&& receiver) {
@@ -97,7 +96,7 @@ struct thread_pool {
9796
scheduler get_scheduler() { return {this}; }
9897
};
9998

100-
static_assert(ex::scheduler<thread_pool::scheduler>);
99+
static_assert(beman::execution::scheduler<thread_pool::scheduler>);
101100

102101
} // namespace demo
103102

examples/environment.cpp

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// examples/environment.cpp -*-C++-*-
2+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
3+
4+
#ifndef _MSC_VER
5+
#include <beman/task/task.hpp>
6+
#include <beman/execution/execution.hpp>
7+
#include <beman/net/net.hpp>
8+
#include <chrono>
9+
#include <string>
10+
#include <thread>
11+
#include <utility>
12+
#include <type_traits>
13+
#include "demo-scope.hpp"
14+
#include "demo-thread_pool.hpp"
15+
16+
namespace ex = beman::execution;
17+
namespace net = beman::net;
18+
using namespace std::chrono_literals;
19+
20+
// ----------------------------------------------------------------------------
21+
22+
template <ex::scheduler Sched, ex::sender Sender>
23+
void spawn(Sched&& sched, demo::scope& scope, Sender&& sender) {
24+
scope.spawn(ex::detail::write_env(std::forward<Sender>(sender),
25+
ex::detail::make_env(ex::get_scheduler, std::forward<Sched>(sched))));
26+
}
27+
28+
// ----------------------------------------------------------------------------
29+
30+
class environment {
31+
static thread_local std::string name;
32+
33+
public:
34+
static auto get() -> std::string { return name; }
35+
static auto set(const std::string& n) -> void { name = n; }
36+
};
37+
thread_local std::string environment::name{"<none>"};
38+
39+
struct env_scheduler {
40+
using scheduler_concept = ex::scheduler_t;
41+
42+
std::string name;
43+
ex::task_scheduler scheduler;
44+
45+
template <typename Sched>
46+
env_scheduler(std::string n, Sched&& sched) : name(std::move(n)), scheduler(std::forward<Sched>(sched)) {}
47+
48+
template <ex::receiver Rcvr>
49+
struct receiver {
50+
using receiver_concept = ex::receiver_t;
51+
52+
std::remove_cvref_t<Rcvr> rcvr;
53+
std::string name;
54+
55+
receiver(Rcvr&& r, std::string n) : rcvr(std::forward<Rcvr>(r)), name(std::move(n)) {}
56+
auto get_env() const noexcept { return ex::get_env(this->rcvr); }
57+
auto set_value() && noexcept {
58+
environment::set(std::move(this->name));
59+
ex::set_value(std::move(this->rcvr));
60+
}
61+
template <typename E>
62+
auto set_error(E&& e) && noexcept {
63+
environment::set(std::move(this->name));
64+
ex::set_error(std::move(this->rcvr), std::forward<E>(e));
65+
}
66+
auto set_stopped() && noexcept {
67+
environment::set(std::move(this->name));
68+
ex::set_stopped(std::move(this->rcvr));
69+
}
70+
};
71+
72+
struct env {
73+
std::string name;
74+
ex::task_scheduler scheduler;
75+
template <typename Tag>
76+
auto query(const ex::get_completion_scheduler_t<Tag>&) const noexcept {
77+
return env_scheduler(this->name, this->scheduler);
78+
}
79+
};
80+
81+
struct sender {
82+
using sender_concept = ex::sender_t;
83+
using task_sender = decltype(ex::schedule(std::declval<ex::task_scheduler>()));
84+
template <typename E>
85+
auto get_completion_signatures(const E& e) const noexcept {
86+
return ex::get_completion_signatures(this->sender, e);
87+
}
88+
89+
std::string name;
90+
task_sender sender;
91+
92+
auto get_env() const noexcept -> env {
93+
return env{this->name, ex::get_completion_scheduler<ex::set_value_t>(ex::get_env(this->sender))};
94+
}
95+
96+
template <ex::receiver Rcvr>
97+
auto connect(Rcvr&& rcvr) && {
98+
return ex::connect(std::move(this->sender),
99+
receiver<Rcvr>(std::forward<Rcvr>(rcvr), std::move(this->name)));
100+
}
101+
};
102+
103+
auto schedule() -> sender { return sender{this->name, ex::schedule(this->scheduler)}; }
104+
bool operator==(const env_scheduler&) const = default;
105+
};
106+
107+
struct with_env {
108+
using scheduler_type = env_scheduler;
109+
};
110+
111+
// ----------------------------------------------------------------------------
112+
113+
std::ostream& print_env(std::ostream& out) {
114+
return out << "tid=" << std::this_thread::get_id() << " "
115+
<< "env=" << environment::get();
116+
}
117+
118+
ex::task<void, with_env> run(auto scheduler, auto duration) {
119+
std::cout << print_env << " duration=" << duration << " start\n" << std::flush;
120+
for (int i = 0; i != 4; ++i) {
121+
co_await net::resume_after(scheduler, duration);
122+
std::cout << print_env << " duration=" << duration << "\n" << std::flush;
123+
}
124+
std::cout << print_env << " duration=" << duration << " done\n" << std::flush;
125+
}
126+
127+
const std::string text("####");
128+
[[maybe_unused]] const std::string black("\x1b[30m" + text + "\x1b[0m:");
129+
[[maybe_unused]] const std::string red("\x1b[31m" + text + "\x1b[0m:");
130+
[[maybe_unused]] const std::string green("\x1b[32m" + text + "\x1b[0m:");
131+
[[maybe_unused]] const std::string yellow("\x1b[33m" + text + "\x1b[0m:");
132+
[[maybe_unused]] const std::string blue("\x1b[34m" + text + "\x1b[0m:");
133+
[[maybe_unused]] const std::string magenta("\x1b[35m" + text + "\x1b[0m:");
134+
[[maybe_unused]] const std::string cyan("\x1b[36m" + text + "\x1b[0m:");
135+
[[maybe_unused]] const std::string white("\x1b[37m" + text + "\x1b[0m:");
136+
137+
int main() {
138+
demo::thread_pool pool1;
139+
demo::thread_pool pool2;
140+
net::io_context context;
141+
demo::scope scope;
142+
143+
environment::set("main");
144+
145+
ex::sync_wait(ex::schedule(pool1.get_scheduler()) | ex::then([] { environment::set("thread1"); }) |
146+
ex::then([] { std::cout << print_env << "\n"; }));
147+
std::cout << print_env << "\n";
148+
149+
spawn(env_scheduler(magenta, pool1.get_scheduler()), scope, run(context.get_scheduler(), 100ms));
150+
spawn(env_scheduler(green, pool1.get_scheduler()), scope, run(context.get_scheduler(), 150ms));
151+
spawn(env_scheduler(blue, pool1.get_scheduler()), scope, run(context.get_scheduler(), 250ms));
152+
153+
while (!scope.empty()) {
154+
context.run();
155+
}
156+
}
157+
#else
158+
int main() {}
159+
#endif

0 commit comments

Comments
 (0)