This repository was archived by the owner on Oct 6, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathchannel_sample_wrap.cpp
More file actions
98 lines (83 loc) · 2.57 KB
/
channel_sample_wrap.cpp
File metadata and controls
98 lines (83 loc) · 2.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* @author github.com/luncliff (luncliff@gmail.com)
*/
#undef NDEBUG
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <initializer_list>
using namespace std;
//
// interface which doesn't require coroutine
//
// Payload type carried by every message.
using message_t = uint64_t;
// Handler invoked once per received message. It gets the current context
// pointer and the message; the pointer it returns is used as the context
// argument of the next invocation.
using callback_t = void* (*)(void* context, message_t msg);
// Opaque session handle. It is only ever used through pointers; the
// implementation in this file casts it to/from its channel type.
struct opaque_t;
// Starts a session that invokes `on_message` for each message sent to it,
// beginning with `context`. Returns nullptr if the session can't be allocated.
opaque_t* start_messaging(void* context, callback_t on_message);
// Ends the session and releases the handle returned by `start_messaging`.
void stop_messaging(opaque_t*);
// Sends `msg` to the session's receiver.
void send_message(opaque_t*, message_t msg);
//
// user code to use the interface
//
void* update_location(void* ptr, message_t msg) {
auto* pmsg = static_cast<message_t*>(ptr);
*pmsg = msg; // update the reference and return it
return pmsg;
}
int main(int, char*[]) {
message_t target = 0;
auto session = start_messaging(&target, //
&update_location);
assert(session != nullptr);
for (auto m : {1u, 2u, 3u}) {
send_message(session, m); // the callback (update)
assert(target == m); // must have changed the memory location
}
stop_messaging(session);
return 0;
}
//
// The implementation uses coroutine
//
#include <coroutine/channel.hpp>
#include <coroutine/return.h>
// Channel that transports `message_t` values between the sender and receiver
// coroutines below.
using channel_t = coro::channel<message_t>;
// Return type of the fire-and-forget coroutines in this file. It is selected
// per compiler: `coro::null_frame_t` when `__GNUC__` is defined,
// `std::nullptr_t` otherwise.
// NOTE(review): presumably the coroutine library supplies the promise type for
// each of these on the respective toolchain - confirm against its headers.
#if defined(__GNUC__)
using no_return_t = coro::null_frame_t;
#else
using no_return_t = std::nullptr_t;
#endif
// Creates a channel and attaches a receiver coroutine to it.
//
// The receiver repeatedly reads from the channel; for every message read
// successfully it calls `on_message` with the current context and replaces the
// context with the callback's return value. It ends when a read reports
// failure.
//
// Returns the channel disguised as an opaque handle, or nullptr when the
// channel can't be allocated.
opaque_t* start_messaging(void* ctx, callback_t on_message) {
    auto* channel = new (std::nothrow) channel_t{};
    if (channel == nullptr)
        return nullptr;

    // Capture-less on purpose: everything the coroutine needs is passed as a
    // parameter, so it never refers back to the closure object.
    auto receiver = [](channel_t* ch, void* context,
                       callback_t callback) -> no_return_t {
        puts("start receiving ...");
        for (;;) {
            auto [msg, ok] = co_await ch->read();
            if (!ok)
                break; // no more messages will arrive
            // received. hand the message to the user callback and keep the
            // pointer it returns as the context for the next message
            puts("received");
            context = callback(context, msg);
        }
        puts("stopped ...");
        co_return;
    };
    receiver(channel, ctx, on_message);

    return reinterpret_cast<opaque_t*>(channel);
}
// Ends a session: converts the opaque handle back to the channel created by
// `start_messaging` and deletes it. A null handle is a no-op.
void stop_messaging(opaque_t* ptr) {
    delete reinterpret_cast<channel_t*>(ptr);
}
// Sends one message through the session's channel.
//
// Spawns a short-lived sender coroutine that writes `m` to the channel behind
// `ptr`. It prints "sent" when the write succeeds and "can't send anymore"
// when the write reports failure.
//
// `ptr` must be a handle returned by `start_messaging`.
void send_message(opaque_t* ptr, message_t m) {
    // spawn a sender coroutine
    [](channel_t* ch, message_t msg) mutable -> no_return_t {
        // `msg` is the coroutine's own copy of the message (parameters are
        // copied into the coroutine frame), so it stays valid while the write
        // is pending. The closure captures nothing; `mutable` is kept only to
        // leave the coroutine's signature as it was.
        if (co_await ch->write(msg) == false) {
            // the channel is going to destruct ...
            puts("can't send anymore");
            co_return; // a failed write must not be reported as "sent"
        }
        puts("sent");
    }(reinterpret_cast<channel_t*>(ptr), m);
}