Skip to content

Commit 7b34aa6

Browse files
committed
Accept callback in accept
1 parent 507da3d commit 7b34aa6

File tree

3 files changed

+62
-27
lines changed

3 files changed

+62
-27
lines changed

libraries/common/io/network.effekt

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ type Listener = Int
2727
def listen(host: String, port: Int, backlog: Int): Listener / Exception[IOError] =
2828
internal::checkResult(internal::listen(host, port, backlog))
2929

30-
def accept(listener: Listener): Connection / Exception[IOError] =
31-
internal::checkResult(internal::accept(listener))
30+
def accept(listener: Listener, handler: Connection => Unit at {io, async, global}): Unit / Exception[IOError] = {
31+
internal::checkResult(internal::accept(listener, handler)); ()
32+
}
3233

33-
def close_listener(listener: Listener): Unit / Exception[IOError] = {
34-
internal::checkResult(internal::close_listener(listener)); ()
34+
def shutdown(listener: Listener): Unit / Exception[IOError] = {
35+
internal::checkResult(internal::shutdown(listener)); ()
3536
}
3637

3738
namespace internal {
@@ -42,8 +43,8 @@ namespace internal {
4243
declare void @c_tcp_write(%Int, %Pos, %Int, %Int, %Stack)
4344
declare void @c_tcp_close(%Int, %Stack)
4445
declare void @c_tcp_listen(%Pos, %Int, %Int, %Stack)
45-
declare void @c_tcp_accept(%Int, %Stack)
46-
declare void @c_tcp_close_listener(%Int, %Stack)
46+
declare void @c_tcp_accept(%Int, %Pos, %Stack)
47+
declare void @c_tcp_shutdown(%Int, %Stack)
4748
"""
4849

4950
extern async def connect(host: String, port: Int): Int =
@@ -76,15 +77,15 @@ namespace internal {
7677
ret void
7778
"""
7879

79-
extern async def accept(listener: Int): Int =
80+
extern async def accept(listener: Int, handler: Int => Unit at {io, async, global}): Int =
8081
llvm """
81-
call void @c_tcp_accept(%Int ${listener}, %Stack %stack)
82+
call void @c_tcp_accept(%Int ${listener}, %Pos ${handler}, %Stack %stack)
8283
ret void
8384
"""
8485

85-
extern async def close_listener(handle: Int): Int =
86+
extern async def shutdown(handle: Int): Int =
8687
llvm """
87-
call void @c_tcp_close_listener(%Int ${handle}, %Stack %stack)
88+
call void @c_tcp_shutdown(%Int ${handle}, %Stack %stack)
8889
ret void
8990
"""
9091

libraries/llvm/io.c

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,12 @@ void c_tcp_close(Int handle, Stack stack) {
315315
uv_close(uv_handle, c_tcp_close_cb);
316316
}
317317

318+
319+
typedef struct {
320+
Stack stack;
321+
struct Pos handler;
322+
} tcp_accept_closure_t;
323+
318324
void c_tcp_listen(String host, Int port, Int backlog, Stack stack) {
319325
// TODO make non-async
320326
char* host_str = c_bytearray_into_nullterminated_string(host);
@@ -358,57 +364,82 @@ void c_tcp_listen(String host, Int port, Int backlog, Stack stack) {
358364
}
359365

360366
void c_tcp_accept_cb(uv_stream_t* server, int status) {
361-
Stack stack = (Stack)server->data;
367+
tcp_accept_closure_t* accept_closure = (tcp_accept_closure_t*)server->data;
362368

363369
if (status < 0) {
364-
resume_Int(stack, status);
370+
// TODO resume last
371+
erasePositive(accept_closure->handler);
372+
resume_Int(accept_closure->stack, status);
373+
free(accept_closure);
374+
server->data = NULL;
365375
return;
366376
}
367377

368378
uv_tcp_t* client = malloc(sizeof(uv_tcp_t));
369379
int result = uv_tcp_init(uv_default_loop(), client);
370380

371381
if (result < 0) {
382+
// TODO resume last
372383
free(client);
373-
// TODO share stack?
374-
resume_Int(stack, result);
384+
erasePositive(accept_closure->handler);
385+
resume_Int(accept_closure->stack, result);
386+
free(accept_closure);
387+
server->data = NULL;
375388
return;
376389
}
377390

378391
result = uv_accept(server, (uv_stream_t*)client);
379392
if (result < 0) {
393+
// TODO resume last
380394
uv_close((uv_handle_t*)client, NULL);
381395
free(client);
382-
// TODO share stack?
383-
resume_Int(stack, result);
396+
erasePositive(accept_closure->handler);
397+
resume_Int(accept_closure->stack, result);
398+
free(accept_closure);
399+
server->data = NULL;
384400
return;
385401
}
386402

387-
// TODO share resumption
388-
// shareStack(stack);
389-
resume_Int(stack, (int64_t)client);
403+
// Call the handler with the new client connection
404+
run_Int(unbox(accept_closure->handler), (int64_t)client);
390405
}
391406

392-
void c_tcp_accept(Int server_handle, Stack stack) {
393-
uv_stream_t* server = (uv_stream_t*)server_handle;
394-
server->data = stack; // Store stack for multiple resumes
407+
void c_tcp_accept(Int listener, struct Pos handler, Stack stack) {
408+
uv_stream_t* server = (uv_stream_t*)listener;
409+
410+
tcp_accept_closure_t* accept_closure = malloc(sizeof(tcp_accept_closure_t));
411+
accept_closure->stack = stack;
412+
accept_closure->handler = handler;
413+
server->data = accept_closure;
395414

396415
int result = uv_listen(server, 0, c_tcp_accept_cb);
397416
if (result < 0) {
417+
free(accept_closure);
418+
erasePositive(handler);
398419
resume_Int(stack, result);
420+
return;
399421
}
400422
}
401423

402-
void c_tcp_close_listener(Int handle, Stack stack) {
424+
void c_tcp_shutdown_cb(uv_handle_t* handle) {
425+
Stack stack = (Stack)handle->data;
426+
free(handle);
427+
// TODO resume_Pos Unit
428+
resume_Int(stack, 0);
429+
}
430+
431+
void c_tcp_shutdown(Int handle, Stack stack) {
403432
uv_handle_t* uv_handle = (uv_handle_t*)handle;
404433

405-
Stack registered_stack = (Stack)uv_handle->data;
406-
if (registered_stack) {
407-
eraseStack(registered_stack);
434+
tcp_accept_closure_t* accept_closure = (tcp_accept_closure_t*)uv_handle->data;
435+
if (accept_closure) {
436+
eraseStack(accept_closure->stack);
437+
erasePositive(accept_closure->handler);
438+
free(accept_closure);
408439
}
409440

410441
uv_handle->data = stack;
411-
uv_close(uv_handle, c_tcp_close_cb);
442+
uv_close(uv_handle, c_tcp_shutdown_cb);
412443
}
413444

414445

libraries/llvm/types.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ typedef struct StackValue* Stack;
3434

3535
// Defined in rts.ll
3636

37+
extern struct Pos box(struct Neg);
38+
extern struct Neg unbox(struct Pos);
39+
3740
extern void resume_Int(Stack, Int);
3841
extern void resume_Pos(Stack, struct Pos);
3942

0 commit comments

Comments
 (0)