Skip to content

Commit e4d3c20

Browse files
committed
Add support for asynchronous socket:accept/2 API
Signed-off-by: Paul Guyot <pguyot@kallisys.net>
1 parent 7d155b8 commit e4d3c20

File tree

3 files changed

+98
-17
lines changed

3 files changed

+98
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2323
- Added `supervisor:terminate_child/2`, `supervisor:restart_child/2` and `supervisor:delete_child/2`
2424
- Added `atomvm:subprocess/4` to perform pipe/fork/execve on POSIX platforms
2525
- Added `externalterm_to_term_with_roots` to efficiently preserve roots when allocating memory for external terms.
26-
- Added support for socket asynchronous API for `recv` and `recvfrom`.
26+
- Added support for socket asynchronous API for `recv`, `recvfrom` and `accept`.
2727

2828
### Changed
2929

libs/estdlib/src/socket.erl

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -229,46 +229,78 @@ accept(Socket) ->
229229
%% be set to listen for connections.
230230
%%
231231
%% Note that this function will block until a connection is made
232-
%% from a client. Typically, users will spawn a call to `accept'
233-
%% in a separate process.
232+
%% from a client, unless `nowait' or a reference is passed as `Timeout'.
233+
%% Typically, users will spawn a call to `accept' in a separate process.
234234
%%
235235
%% Example:
236236
%%
237237
%% `{ok, ConnectedSocket} = socket:accept(ListeningSocket)'
238238
%% @end
239239
%%-----------------------------------------------------------------------------
240-
-spec accept(Socket :: socket(), Timeout :: timeout()) ->
241-
{ok, Connection :: socket()} | {error, Reason :: term()}.
240+
-spec accept(Socket :: socket(), Timeout :: timeout() | nowait | reference()) ->
241+
{ok, Connection :: socket()}
242+
| {select, {select_info, accept, reference()}}
243+
| {error, Reason :: term()}.
244+
accept(Socket, 0) ->
245+
accept0_noselect(Socket);
246+
accept(Socket, nowait) ->
247+
accept0_nowait(Socket, erlang:make_ref());
248+
accept(Socket, Ref) when is_reference(Ref) ->
249+
accept0_nowait(Socket, Ref);
242250
accept(Socket, Timeout) ->
251+
accept0(Socket, Timeout).
252+
253+
accept0_noselect(Socket) ->
254+
case ?MODULE:nif_accept(Socket) of
255+
{error, _} = E ->
256+
E;
257+
{ok, _Socket} = Reply ->
258+
Reply
259+
end.
260+
261+
accept0(Socket, Timeout) ->
243262
Ref = erlang:make_ref(),
244-
?TRACE("select read for accept. self=~p ref=~p~n", [self(), Ref]),
245263
case ?MODULE:nif_select_read(Socket, Ref) of
246264
ok ->
247265
receive
248266
{'$socket', Socket, select, Ref} ->
249267
case ?MODULE:nif_accept(Socket) of
250-
{error, closed} = E ->
268+
{error, _} = E ->
251269
?MODULE:nif_select_stop(Socket),
252270
E;
253-
R ->
254-
R
271+
{ok, _Socket} = Reply ->
272+
Reply
255273
end;
256274
{'$socket', Socket, abort, {Ref, closed}} ->
257275
% socket was closed by another process
258276
% TODO: we need to handle:
259277
% (a) SELECT_STOP being scheduled
260278
% (b) flush of messages as we can have both in the
261279
% queue
262-
{error, closed};
263-
Other ->
264-
{error, {accept, unexpected, Other, {'$socket', Socket, select, Ref}}}
280+
{error, closed}
265281
after Timeout ->
266282
{error, timeout}
267283
end;
268284
{error, _Reason} = Error ->
269285
Error
270286
end.
271287

288+
accept0_nowait(Socket, Ref) ->
289+
case ?MODULE:nif_accept(Socket) of
290+
{error, eagain} ->
291+
case ?MODULE:nif_select_read(Socket, Ref) of
292+
ok ->
293+
{select, {select_info, accept, Ref}};
294+
{error, _} = SelectError ->
295+
SelectError
296+
end;
297+
{error, _} = RecvError ->
298+
RecvError;
299+
{ok, _Socket} = Reply ->
300+
Reply
301+
end.
302+
303+
272304
%%-----------------------------------------------------------------------------
273305
%% @equiv socket:recv(Socket, 0)
274306
%% @end

tests/libs/estdlib/test_tcp_socket.erl

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ test() ->
2828
ok = test_close_by_another_process(),
2929
ok = test_buf_size(),
3030
ok = test_timeout(),
31-
ok = test_nowait(),
31+
ok = test_recv_nowait(),
32+
ok = test_accept_nowait(),
3233
ok = test_setopt_getopt(),
3334
case get_otp_version() of
3435
atomvm ->
@@ -430,12 +431,12 @@ test_timeout() ->
430431
ok = close_client_socket(Socket),
431432
ok = close_listen_socket(ListenSocket).
432433

433-
test_nowait() ->
434-
ok = test_nowait(fun receive_loop_nowait/2),
435-
ok = test_nowait(fun receive_loop_nowait_ref/2),
434+
test_recv_nowait() ->
435+
ok = test_recv_nowait(fun receive_loop_nowait/2),
436+
ok = test_recv_nowait(fun receive_loop_nowait_ref/2),
436437
ok.
437438

438-
test_nowait(ReceiveFun) ->
439+
test_recv_nowait(ReceiveFun) ->
439440
etest:flush_msg_queue(),
440441

441442
Port = 44404,
@@ -460,6 +461,54 @@ test_nowait(ReceiveFun) ->
460461

461462
ok = close_listen_socket(ListenSocket).
462463

464+
test_accept_nowait() ->
465+
ok = test_accept_nowait(nowait),
466+
ok = test_accept_nowait(make_ref()),
467+
ok.
468+
469+
test_accept_nowait(NoWaitRef) ->
470+
etest:flush_msg_queue(),
471+
472+
Port = 44404,
473+
{ok, Socket} = socket:open(inet, stream, tcp),
474+
ok = socket:setopt(Socket, {socket, reuseaddr}, true),
475+
ok = socket:setopt(Socket, {socket, linger}, #{onoff => true, linger => 0}),
476+
477+
ok = socket:bind(Socket, #{
478+
family => inet, addr => loopback, port => Port
479+
}),
480+
481+
ok = socket:listen(Socket),
482+
483+
{Child, MonitorRef} = spawn_opt(
484+
fun() ->
485+
{select, {select_info, accept, Ref}} = socket:accept(Socket, NoWaitRef),
486+
receive
487+
{'$socket', Socket, select, Ref} ->
488+
{ok, ConnSocket} = socket:accept(Socket, 0),
489+
socket:send(ConnSocket, <<"hello">>),
490+
socket:close(ConnSocket)
491+
after 5000 ->
492+
exit(timeout)
493+
end
494+
end,
495+
[link, monitor]
496+
),
497+
498+
{ok, ClientSocket} = socket:open(inet, stream, tcp),
499+
ok = socket:connect(ClientSocket, #{family => inet, addr => loopback, port => Port}),
500+
{ok, <<"hello">>} = socket:recv(ClientSocket, 5),
501+
502+
socket:close(ClientSocket),
503+
ok =
504+
receive
505+
{'DOWN', MonitorRef, process, Child, normal} -> ok
506+
after 5000 ->
507+
timeout
508+
end,
509+
socket:close(Socket),
510+
ok.
511+
463512
test_setopt_getopt() ->
464513
{ok, Socket} = socket:open(inet, stream, tcp),
465514
{ok, stream} = socket:getopt(Socket, {socket, type}),

0 commit comments

Comments
 (0)