Skip to content

Commit dbafeb8

Browse files
committed
Add support for asynchronous socket:accept/2 API
Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent e8b18f7 commit dbafeb8

File tree

4 files changed

+113
-18
lines changed

4 files changed

+113
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2424
- Added `atomvm:subprocess/4` to perform pipe/fork/execve on POSIX platforms
2525
- Added `externalterm_to_term_with_roots` to efficiently preserve roots when allocating memory for external terms.
2626
- Added `erl_epmd` client implementation to epmd using `socket` module
27-
- Added support for socket asynchronous API for `recv` and `recvfrom`.
27+
- Added support for socket asynchronous API for `recv`, `recvfrom` and `accept`.
2828

2929
### Changed
3030

libs/estdlib/src/socket.erl

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -229,46 +229,77 @@ accept(Socket) ->
229229
%% be set to listen for connections.
230230
%%
231231
%% Note that this function will block until a connection is made
232-
%% from a client. Typically, users will spawn a call to `accept'
233-
%% in a separate process.
232+
%% from a client, unless `nowait' or a reference is passed as `Timeout'.
233+
%% Typically, users will spawn a call to `accept' in a separate process.
234234
%%
235235
%% Example:
236236
%%
237237
%% `{ok, ConnectedSocket} = socket:accept(ListeningSocket)'
238238
%% @end
239239
%%-----------------------------------------------------------------------------
240-
-spec accept(Socket :: socket(), Timeout :: timeout()) ->
241-
{ok, Connection :: socket()} | {error, Reason :: term()}.
240+
-spec accept(Socket :: socket(), Timeout :: timeout() | nowait | reference()) ->
241+
{ok, Connection :: socket()}
242+
| {select, {select_info, accept, reference()}}
243+
| {error, Reason :: term()}.
244+
accept(Socket, 0) ->
245+
accept0_noselect(Socket);
246+
accept(Socket, nowait) ->
247+
accept0_nowait(Socket, erlang:make_ref());
248+
accept(Socket, Ref) when is_reference(Ref) ->
249+
accept0_nowait(Socket, Ref);
242250
accept(Socket, Timeout) ->
251+
accept0(Socket, Timeout).
252+
253+
accept0_noselect(Socket) ->
254+
case ?MODULE:nif_accept(Socket) of
255+
{error, _} = E ->
256+
E;
257+
{ok, _Socket} = Reply ->
258+
Reply
259+
end.
260+
261+
accept0(Socket, Timeout) ->
243262
Ref = erlang:make_ref(),
244-
?TRACE("select read for accept. self=~p ref=~p~n", [self(), Ref]),
245263
case ?MODULE:nif_select_read(Socket, Ref) of
246264
ok ->
247265
receive
248266
{'$socket', Socket, select, Ref} ->
249267
case ?MODULE:nif_accept(Socket) of
250-
{error, closed} = E ->
268+
{error, _} = E ->
251269
?MODULE:nif_select_stop(Socket),
252270
E;
253-
R ->
254-
R
271+
{ok, _Socket} = Reply ->
272+
Reply
255273
end;
256274
{'$socket', Socket, abort, {Ref, closed}} ->
257275
% socket was closed by another process
258276
% TODO: we need to handle:
259277
% (a) SELECT_STOP being scheduled
260278
% (b) flush of messages as we can have both in the
261279
% queue
262-
{error, closed};
263-
Other ->
264-
{error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}}
280+
{error, closed}
265281
after Timeout ->
266282
{error, timeout}
267283
end;
268284
{error, _Reason} = Error ->
269285
Error
270286
end.
271287

288+
accept0_nowait(Socket, Ref) ->
289+
case ?MODULE:nif_accept(Socket) of
290+
{error, eagain} ->
291+
case ?MODULE:nif_select_read(Socket, Ref) of
292+
ok ->
293+
{select, {select_info, accept, Ref}};
294+
{error, _} = SelectError ->
295+
SelectError
296+
end;
297+
{error, _} = RecvError ->
298+
RecvError;
299+
{ok, _Socket} = Reply ->
300+
Reply
301+
end.
302+
272303
%%-----------------------------------------------------------------------------
273304
%% @equiv socket:recv(Socket, 0)
274305
%% @end

src/libAtomVM/otp_socket.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1743,8 +1743,10 @@ static term nif_socket_accept(Context *ctx, int argc, term argv[])
17431743
int fd = accept(rsrc_obj->fd, (struct sockaddr *) &clientaddr, &clientlen);
17441744
SMP_RWLOCK_UNLOCK(rsrc_obj->socket_lock);
17451745
if (UNLIKELY(fd == -1 || fd == CLOSED_FD)) {
1746-
AVM_LOGE(TAG, "Unable to accept on socket %i.", rsrc_obj->fd);
17471746
int err = errno;
1747+
if (err != EAGAIN) {
1748+
AVM_LOGI(TAG, "Unable to accept on socket %i. errno=%i", rsrc_obj->fd, (int) err);
1749+
}
17481750
term reason = (err == ECONNABORTED) ? CLOSED_ATOM : posix_errno_to_term(err, global);
17491751
return make_error_tuple(reason, ctx);
17501752
} else {

tests/libs/estdlib/test_tcp_socket.erl

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ test() ->
2828
ok = test_close_by_another_process(),
2929
ok = test_buf_size(),
3030
ok = test_timeout(),
31-
ok = test_nowait(),
31+
ok = test_recv_nowait(),
32+
ok = test_accept_nowait(),
3233
ok = test_setopt_getopt(),
3334
case get_otp_version() of
3435
atomvm ->
@@ -430,12 +431,12 @@ test_timeout() ->
430431
ok = close_client_socket(Socket),
431432
ok = close_listen_socket(ListenSocket).
432433

433-
test_nowait() ->
434-
ok = test_nowait(fun receive_loop_nowait/2),
435-
ok = test_nowait(fun receive_loop_nowait_ref/2),
434+
test_recv_nowait() ->
435+
ok = test_recv_nowait(fun receive_loop_nowait/2),
436+
ok = test_recv_nowait(fun receive_loop_nowait_ref/2),
436437
ok.
437438

438-
test_nowait(ReceiveFun) ->
439+
test_recv_nowait(ReceiveFun) ->
439440
etest:flush_msg_queue(),
440441

441442
Port = 44404,
@@ -460,6 +461,67 @@ test_nowait(ReceiveFun) ->
460461

461462
ok = close_listen_socket(ListenSocket).
462463

464+
test_accept_nowait() ->
465+
OTPVersion = get_otp_version(),
466+
ok = test_accept_nowait(nowait, OTPVersion),
467+
ok = test_accept_nowait(make_ref(), OTPVersion),
468+
ok.
469+
470+
% actually since 22.1, but let's simplify here.
471+
test_accept_nowait(_NoWaitRef, Version) when Version =/= atomvm andalso Version < 23 -> ok;
472+
test_accept_nowait(Ref, Version) when
473+
is_reference(Ref) andalso Version =/= atomvm andalso Version < 24
474+
->
475+
ok;
476+
test_accept_nowait(NoWaitRef, _Version) ->
477+
etest:flush_msg_queue(),
478+
479+
Port = 44404,
480+
{ok, Socket} = socket:open(inet, stream, tcp),
481+
ok = socket:setopt(Socket, {socket, reuseaddr}, true),
482+
ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}),
483+
484+
ok = socket:bind(Socket, #{
485+
family => inet, addr => loopback, port => Port
486+
}),
487+
488+
ok = socket:listen(Socket),
489+
490+
Parent = self(),
491+
{Child, MonitorRef} = spawn_opt(
492+
fun() ->
493+
{select, {select_info, accept, Ref}} = socket:accept(Socket, NoWaitRef),
494+
Parent ! {self(), got_nowait},
495+
receive
496+
{'$socket', Socket, select, Ref} ->
497+
{ok, ConnSocket} = socket:accept(Socket, 0),
498+
socket:send(ConnSocket, <<"hello">>),
499+
socket:close(ConnSocket)
500+
after 5000 ->
501+
exit(timeout)
502+
end
503+
end,
504+
[link, monitor]
505+
),
506+
ok =
507+
receive
508+
{Child, got_nowait} -> ok
509+
after 5000 -> timeout
510+
end,
511+
{ok, ClientSocket} = socket:open(inet, stream, tcp),
512+
ok = socket:connect(ClientSocket, #{family => inet, addr => loopback, port => Port}),
513+
{ok, <<"hello">>} = socket:recv(ClientSocket, 5),
514+
515+
socket:close(ClientSocket),
516+
ok =
517+
receive
518+
{'DOWN', MonitorRef, process, Child, normal} -> ok
519+
after 5000 ->
520+
timeout
521+
end,
522+
socket:close(Socket),
523+
ok.
524+
463525
test_setopt_getopt() ->
464526
{ok, Socket} = socket:open(inet, stream, tcp),
465527
{ok, stream} = socket:getopt(Socket, {socket, type}),

0 commit comments

Comments
 (0)