Skip to content

Commit a4c7c9f

Browse files
committed
Implement coroutine_wake_up()
1 parent 1a5187b commit a4c7c9f

File tree

4 files changed

+62
-25
lines changed

4 files changed

+62
-25
lines changed

coroutine.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
(da)->items[j] = (da)->items[--(da)->count]; \
3838
} while(0)
3939

40+
#define UNUSED(x) (void)(x)
4041
#define TODO(message) do { fprintf(stderr, "%s:%d: TODO: %s\n", __FILE__, __LINE__, message); abort(); } while(0)
4142
#define UNREACHABLE(message) do { fprintf(stderr, "%s:%d: UNREACHABLE: %s\n", __FILE__, __LINE__, message); abort(); } while(0)
4243

@@ -285,3 +286,16 @@ size_t coroutine_alive(void)
285286
{
286287
return active.count;
287288
}
289+
290+
void coroutine_wake_up(size_t id)
291+
{
292+
// @speed coroutine_wake_up is linear
293+
for (size_t i = 0; i < asleep.count; ++i) {
294+
if (asleep.items[i] == id) {
295+
da_remove_unordered(&asleep, id);
296+
da_remove_unordered(&polls, id);
297+
da_append(&active, id);
298+
return;
299+
}
300+
}
301+
}

coroutine.c3

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import std::net;
44

55
def CoroutineFn = fn void(void*);
66

7-
extern fn int sleep_read(int fd) @extern("coroutine_sleep_read");
8-
extern fn int sleep_write(int fd) @extern("coroutine_sleep_write");
7+
extern fn void sleep_read(int fd) @extern("coroutine_sleep_read");
8+
extern fn void sleep_write(int fd) @extern("coroutine_sleep_write");
9+
extern fn void wake_up(usz id) @extern("coroutine_wake_up");
910
extern fn void init() @extern("coroutine_init");
1011
extern fn void finish() @extern("coroutine_finish");
1112
extern fn void yield() @extern("coroutine_yield");

coroutine.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,22 @@ size_t coroutine_id(void);
3434
// to wait until all the "child" coroutines have died.
3535
size_t coroutine_alive(void);
3636

37-
// Put the current coroutine to sleep until the non-blocking socket `fd` has avaliable data to read.
38-
// Trying to read from fd after coroutine_sleep_read() should not cause EAGAIN.
37+
// Put the current coroutine to sleep until the non-blocking socket `fd` has
38+
// avaliable data to read. Trying to read from fd after coroutine_sleep_read()
39+
// may still cause EAGAIN, if the coroutine was woken up by coroutine_wake_up
40+
// before the socket became available for reading.
3941
void coroutine_sleep_read(int fd);
4042

41-
// Put the current coroutine to sleep until the non-blocking socket `fd` is ready to accept data to write.
42-
// Trying to write to fd after coroutine_sleep_write() should not cause EAGAIN.
43+
// Put the current coroutine to sleep until the non-blocking socket `fd` is
44+
// ready to accept data to write. Trying to write to fd after
45+
// coroutine_sleep_write() may still cause EAGAIN, if the coroutine was woken up
46+
// by coroutine_wake_up before the socket became available for writing.
4347
void coroutine_sleep_write(int fd);
4448

49+
// Wake up coroutine by id if it is currently sleeping due to
50+
// coroutine_sleep_read() or coroutine_sleep_write() calls.
51+
void coroutine_wake_up(size_t id);
52+
4553
// TODO: implement sleeping by timeout
4654
// TODO: add timeouts to coroutine_sleep_read() and coroutine_sleep_write()
4755

examples/echo.c3

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,51 +4,65 @@ import std::io;
44
import std::net;
55
import coroutine;
66

7+
bool quit = false;
8+
usz server_id = 0;
9+
710
fn void main() {
811
coroutine::init();
12+
defer coroutine::finish();
13+
14+
server_id = coroutine::id();
915

1016
const String HOST = "localhost";
1117
const uint PORT = 6969;
12-
TcpServerSocket server_sock = tcp::listen(HOST, PORT, 69, REUSEADDR)!!;
13-
server_sock.sock.set_non_blocking(true)!!;
14-
15-
io::printfn("[%d] Listening to %s:%d", coroutine::id(), HOST, PORT);
16-
while (true) {
17-
coroutine::sleep_read(server_sock.sock);
18-
TcpSocket client_sock = tcp::accept(&server_sock)!!;
19-
client_sock.sock.set_non_blocking(true)!!;
18+
TcpServerSocket server = tcp::listen(HOST, PORT, 69, REUSEADDR)!!;
19+
server.sock.set_non_blocking(true)!!;
20+
21+
io::printfn("[%d] Server listening to %s:%d", coroutine::id(), HOST, PORT);
22+
while SERVER: (true) {
23+
coroutine::sleep_read(server.sock);
24+
if (quit) break SERVER;
25+
TcpSocket client = tcp::accept(&server)!!;
26+
client.sock.set_non_blocking(true)!!;
2027
coroutine::go(fn void(void *arg) {
2128
io::printfn("[%d] Client connected!", coroutine::id());
2229

23-
TcpSocket* client_sock = (TcpSocket*)arg;
30+
TcpSocket* client = (TcpSocket*)arg;
2431
char[] buf = mem::new_array(char, 1024);
2532
defer {
26-
client_sock.close()!!;
27-
free(client_sock);
33+
client.close()!!;
34+
free(client);
2835
free(buf.ptr);
2936
}
3037

3138
while OUTER: (true) {
32-
coroutine::sleep_read(client_sock.sock);
33-
usz n = client_sock.read(buf)!!;
39+
coroutine::sleep_read(client.sock);
40+
usz n = client.read(buf)!!;
3441
if (n == 0) break OUTER;
3542
char[] chunk = buf[0:n];
3643

37-
if (((String)chunk).trim() == "quit") {
38-
io::printfn("[%d] Client requested to quit", coroutine::id(), chunk.len);
39-
return;
44+
switch (((String)chunk).trim()) {
45+
case "quit":
46+
io::printfn("[%d] Client requested to quit", coroutine::id(), chunk.len);
47+
return;
48+
case "shutdown":
49+
io::printfn("[%d] Client requested to shutdown the server", coroutine::id());
50+
quit = true;
51+
coroutine::wake_up(server_id);
52+
return;
4053
}
4154

4255
io::printfn("[%d] Client sent %d bytes", coroutine::id(), chunk.len);
4356

4457
while (chunk.len > 0) {
45-
coroutine::sleep_write(client_sock.sock);
46-
usz m = client_sock.write(chunk)!!;
58+
coroutine::sleep_write(client.sock);
59+
usz m = client.write(chunk)!!;
4760
if (m == 0) break OUTER;
4861
chunk = chunk[m..];
4962
}
5063
}
5164
io::printfn("[%d] Client disconnected", coroutine::id());
52-
}, @clone(client_sock));
65+
}, @clone(client));
5366
}
67+
io::printfn("[%d] Server has been shutdown", coroutine::id());
5468
}

0 commit comments

Comments
 (0)