This repository was archived by the owner on Oct 6, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathunix_kqueue_single_thread.cpp
More file actions
88 lines (78 loc) · 2.4 KB
/
unix_kqueue_single_thread.cpp
File metadata and controls
88 lines (78 loc) · 2.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* @author github.com/luncliff (luncliff@gmail.com)
*/
#undef NDEBUG
#include <cassert>
#include <iostream>
#include <string>
#include <string_view>
#include <coroutine/return.h>
#include <coroutine/unix.h>
#include <unistd.h>
using namespace coro;
using namespace std;
/**
 * @brief Register a one-shot write-readiness request for `fd`, await it,
 *        and then write the text "hello" to `fd`.
 * @param kq  kqueue wrapper that receives the request via `submit`
 * @param fd  descriptor to write to
 * @return frame_t  the coroutine's return object
 * @throw std::system_error  when `write` reports a failure (carries `errno`)
 */
auto send_async(kqueue_owner& kq, uint64_t fd) -> frame_t {
    // fields that are not assigned below stay zero-initialized
    kevent64_s request{};
    request.ident = fd;
    request.filter = EVFILT_WRITE;
    request.flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
    co_await kq.submit(request);

    // execution continues here once the awaited request completes
    const std::string_view payload{"hello"};
    if (write(fd, payload.data(), payload.length()) < 0)
        throw std::system_error{errno, std::system_category(), "write"};
}
/**
 * @brief Register a one-shot read-readiness request for `fd`, await it,
 *        and then read up to 64 bytes from `fd` into `message`.
 * @param kq       kqueue wrapper that receives the request via `submit`
 * @param fd       descriptor to read from
 * @param message  output; replaced with exactly the bytes that were read
 * @return frame_t  the coroutine's return object
 * @throw std::system_error  when `read` reports a failure (carries `errno`)
 */
auto recv_async(kqueue_owner& kq, uint64_t fd, std::string& message)
    -> frame_t {
    // fields that are not assigned below stay zero-initialized
    kevent64_s request{};
    request.ident = fd;
    request.filter = EVFILT_READ;
    request.flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
    co_await kq.submit(request);

    // execution continues here once the awaited request completes
    std::array<char, 64> storage{};
    const auto received = read(fd, storage.data(), storage.size());
    if (received < 0)
        throw std::system_error{errno, std::system_category(), "read"};
    // copy only the received prefix of the buffer
    message.assign(storage.data(), static_cast<size_t>(received));
}
/**
 * @brief Create a pipe and return both of its descriptors.
 * @return tuple of (read end, write end), each widened to `uint64_t`
 * @throw std::system_error  when `pipe` fails (carries `errno`)
 */
auto open_pipe() -> std::tuple<uint64_t, uint64_t> {
    int ends[2]{};
    const int rc = pipe(ends);
    if (rc < 0)
        throw std::system_error{errno, std::system_category(), "pipe"};

    // pipe() stores the read end at index 0 and the write end at index 1
    const auto read_end = static_cast<uint64_t>(ends[0]);
    const auto write_end = static_cast<uint64_t>(ends[1]);
    return std::tuple<uint64_t, uint64_t>{read_end, write_end};
}
/**
 * @brief Single-threaded kqueue round trip over a pipe.
 *
 * Opens a pipe, starts a reader coroutine and a writer coroutine on its two
 * ends, then polls the kqueue and resumes whichever coroutine each returned
 * event refers to, until two events have been handled.
 *
 * @return EXIT_SUCCESS when the reader received "hello", else EXIT_FAILURE
 */
int main(int, char*[]) {
    uint64_t rfd{};
    uint64_t wfd{};
    std::tie(rfd, wfd) = open_pipe();
    // close both pipe ends on every exit path
    auto close_pipe = gsl::finally([rfd, wfd]() {
        close(rfd);
        close(wfd);
    });

    kqueue_owner kq{};

    // spawn the coroutines; note that the reader is started before the writer
    std::string message{};
    auto reader = recv_async(kq, rfd, message);
    auto writer = send_async(kq, wfd);
    // destroy both coroutine frames when leaving main
    auto destroy_frames = gsl::finally([&reader, &writer]() {
        reader.destroy();
        writer.destroy();
    });

    // poll the kqueue: one event is expected per coroutine
    std::array<kevent64_s, 2> events{};
    auto pending = events.size();
    while (pending != 0) {
        auto* const first = events.data();
        // each poll waits with a 1 second timeout
        auto* const last = first + kq.events(timespec{.tv_sec = 1}, events);
        for (auto* ev = first; ev != last; ++ev) {
            // the event's udata is treated as the address of a coroutine frame
            auto task = coroutine_handle<void>::from_address(
                reinterpret_cast<void*>(ev->udata));
            task.resume();
            --pending;
        }
    }

    // did the expected message arrive?
    return message == "hello" ? EXIT_SUCCESS : EXIT_FAILURE;
}