@@ -245,22 +245,23 @@ accept(Socket, Timeout) ->
245245 case ? MODULE :nif_select_read (Socket , Ref ) of
246246 ok ->
247247 receive
248- {select , _AcceptedSocket , Ref , ready_input } ->
248+ {'$socket' , Socket , select , Ref } ->
249249 case ? MODULE :nif_accept (Socket ) of
250250 {error , closed } = E ->
251251 ? MODULE :nif_select_stop (Socket ),
252252 E ;
253253 R ->
254254 R
255255 end ;
256- {closed , Ref } ->
256+ {'$socket' , Socket , abort , { Ref , closed } } ->
257257 % socket was closed by another process
258258 % TODO: we need to handle:
259259 % (a) SELECT_STOP being scheduled
260- % (b) flush of messages as we can have both
261- % {closed, Ref} and {select, _, Ref, _} in the
260+ % (b) flush of messages as we can have both in the
262261 % queue
263- {error , closed }
262+ {error , closed };
263+ Other ->
264+ {error , {accept , unexpected , Other , {'$socket' , Socket , select , Ref }}}
264265 after Timeout ->
265266 {error , timeout }
266267 end ;
@@ -299,25 +300,60 @@ recv(Socket, Length) ->
299300% % `{ok, Data} = socket:recv(ConnectedSocket)'
300301% % @end
301302% %-----------------------------------------------------------------------------
302- -spec recv (Socket :: socket (), Length :: non_neg_integer (), Timeout :: timeout ()) ->
303- {ok , Data :: binary ()} | {error , Reason :: term ()}.
303+ -spec recv (
304+ Socket :: socket (), Length :: non_neg_integer (), Timeout :: timeout () | nowait | reference ()
305+ ) ->
306+ {ok , Data :: binary ()}
307+ | {select , {select_info , recvfrom , reference ()}}
308+ | {select , {{select_info , recvfrom , reference ()}, Data :: binary ()}}
309+ | {error , Reason :: term ()}.
310+ recv (Socket , Length , 0 ) ->
311+ recv0_noselect (Socket , Length );
312+ recv (Socket , 0 , Timeout ) when is_integer (Timeout ) orelse Timeout =:= infinity ->
313+ recv0 (Socket , 0 , Timeout );
314+ recv (Socket , Length , nowait ) ->
315+ recv0_nowait (Socket , Length , erlang :make_ref ());
316+ recv (Socket , Length , Ref ) when is_reference (Ref ) ->
317+ recv0_nowait (Socket , Length , Ref );
304318recv (Socket , Length , Timeout ) ->
319+ case ? MODULE :getopt (Socket , {socket , type }) of
320+ {ok , stream } when Timeout =/= infinity ->
321+ recv0_r (Socket , Length , Timeout , erlang :system_time (millisecond ) + Timeout , []);
322+ {ok , stream } when Timeout =:= infinity ->
323+ recv0_r (Socket , Length , Timeout , undefined , []);
324+ _ ->
325+ recv0 (Socket , Length , Timeout )
326+ end .
327+
328+ recv0_noselect (Socket , Length ) ->
329+ case ? MODULE :nif_recv (Socket , Length ) of
330+ {error , _ } = E ->
331+ E ;
332+ {ok , Data } when Length =:= 0 orelse byte_size (Data ) =:= Length ->
333+ {ok , Data };
334+ {ok , Data } ->
335+ case ? MODULE :getopt (Socket , {socket , type }) of
336+ {ok , stream } ->
337+ {error , {timeout , Data }};
338+ {ok , dgram } ->
339+ {ok , Data }
340+ end
341+ end .
342+
343+ recv0 (Socket , Length , Timeout ) ->
305344 Ref = erlang :make_ref (),
306- ? TRACE (" select read for recv. self=~p ref=~p~n " , [self (), Ref ]),
307345 case ? MODULE :nif_select_read (Socket , Ref ) of
308346 ok ->
309347 receive
310- {select , _AcceptedSocket , Ref , ready_input } ->
348+ {'$socket' , Socket , select , Ref } ->
311349 case ? MODULE :nif_recv (Socket , Length ) of
312350 {error , _ } = E ->
313351 ? MODULE :nif_select_stop (Socket ),
314352 E ;
315- % TODO: Assemble data to have more if Length > byte_size(Data)
316- % as long as timeout did not expire
317353 {ok , Data } ->
318354 {ok , Data }
319355 end ;
320- {closed , Ref } ->
356+ {'$socket' , Socket , abort , { Ref , closed } } ->
321357 % socket was closed by another process
322358 % TODO: see above in accept/2
323359 {error , closed }
@@ -328,6 +364,72 @@ recv(Socket, Length, Timeout) ->
328364 Error
329365 end .
330366
367+ recv0_nowait (Socket , Length , Ref ) ->
368+ case ? MODULE :nif_recv (Socket , Length ) of
369+ {error , timeout } ->
370+ case ? MODULE :nif_select_read (Socket , Ref ) of
371+ ok ->
372+ {select , {select_info , recv , Ref }};
373+ {error , _ } = Error1 ->
374+ Error1
375+ end ;
376+ {error , _ } = E ->
377+ E ;
378+ {ok , Data } when byte_size (Data ) < Length ->
379+ case ? MODULE :getopt (Socket , {socket , type }) of
380+ {ok , stream } ->
381+ case ? MODULE :nif_select_read (Socket , Ref ) of
382+ ok ->
383+ {select , {{select_info , recv , Ref }, Data }};
384+ {error , _ } = Error1 ->
385+ Error1
386+ end ;
387+ {ok , dgram } ->
388+ {ok , Data }
389+ end ;
390+ {ok , Data } ->
391+ {ok , Data }
392+ end .
393+
394+ recv0_r (Socket , Length , Timeout , EndQuery , Acc ) ->
395+ Ref = erlang :make_ref (),
396+ case ? MODULE :nif_select_read (Socket , Ref ) of
397+ ok ->
398+ receive
399+ {'$socket' , Socket , select , Ref } ->
400+ case ? MODULE :nif_recv (Socket , Length ) of
401+ {error , _ } = E ->
402+ ? MODULE :nif_select_stop (Socket ),
403+ E ;
404+ {ok , Data } ->
405+ NewAcc = [Data | Acc ],
406+ Remaining = Length - byte_size (Data ),
407+ case Remaining of
408+ 0 ->
409+ {ok , list_to_binary (lists :reverse (NewAcc ))};
410+ _ ->
411+ NewTimeout =
412+ case Timeout of
413+ infinity -> infinity ;
414+ _ -> EndQuery - erlang :system_time (millisecond )
415+ end ,
416+ recv0_r (Socket , Remaining , NewTimeout , EndQuery , NewAcc )
417+ end
418+ end ;
419+ {'$socket' , Socket , abort , {Ref , closed }} ->
420+ % socket was closed by another process
421+ % TODO: see above in accept/2
422+ {error , closed }
423+ after Timeout ->
424+ case Acc of
425+ [] -> {error , timeout };
426+ _ -> {error , {timeout , list_to_binary (lists :reverse (Acc ))}}
427+ end
428+ end ;
429+ {error , _Reason } = Error ->
430+ Error
431+ end .
432+
331433% %-----------------------------------------------------------------------------
332434% % @equiv socket:recvfrom(Socket, 0)
333435% % @end
@@ -370,25 +472,43 @@ recvfrom(Socket, Length) ->
370472% % bytes are available and return these bytes.
371473% % @end
372474% %-----------------------------------------------------------------------------
373- -spec recvfrom (Socket :: socket (), Length :: non_neg_integer (), Timeout :: timeout ()) ->
374- {ok , {Address :: sockaddr (), Data :: binary ()}} | {error , Reason :: term ()}.
475+ -spec recvfrom (
476+ Socket :: socket (), Length :: non_neg_integer (), Timeout :: timeout () | nowait | reference ()
477+ ) ->
478+ {ok , {Address :: sockaddr (), Data :: binary ()}}
479+ | {select , {select_info , recvfrom , reference ()}}
480+ | {error , Reason :: term ()}.
481+ recvfrom (Socket , Length , 0 ) ->
482+ recvfrom0_noselect (Socket , Length );
483+ recvfrom (Socket , Length , nowait ) ->
484+ recvfrom0_nowait (Socket , Length , erlang :make_ref ());
485+ recvfrom (Socket , Length , Ref ) when is_reference (Ref ) ->
486+ recvfrom0_nowait (Socket , Length , Ref );
375487recvfrom (Socket , Length , Timeout ) ->
488+ recvfrom0 (Socket , Length , Timeout ).
489+
490+ recvfrom0_noselect (Socket , Length ) ->
491+ case ? MODULE :nif_recvfrom (Socket , Length ) of
492+ {error , _ } = E ->
493+ E ;
494+ {ok , {_Address , _Data }} = Reply ->
495+ Reply
496+ end .
497+
498+ recvfrom0 (Socket , Length , Timeout ) ->
376499 Ref = erlang :make_ref (),
377- ? TRACE (" select read for recvfrom. self=~p ref=~p " , [self (), Ref ]),
378500 case ? MODULE :nif_select_read (Socket , Ref ) of
379501 ok ->
380502 receive
381- {select , _AcceptedSocket , Ref , ready_input } ->
503+ {'$socket' , Socket , select , Ref } ->
382504 case ? MODULE :nif_recvfrom (Socket , Length ) of
383505 {error , _ } = E ->
384506 ? MODULE :nif_select_stop (Socket ),
385507 E ;
386- % TODO: Assemble data to have more if Length > byte_size(Data)
387- % as long as timeout did not expire
388- {ok , {Address , Data }} ->
389- {ok , {Address , Data }}
508+ {ok , {_Address , _Data }} = Reply ->
509+ Reply
390510 end ;
391- {closed , Ref } ->
511+ {'$socket' , Socket , abort , { Ref , closed } } ->
392512 % socket was closed by another process
393513 % TODO: see above in accept/2
394514 {error , closed }
@@ -399,6 +519,21 @@ recvfrom(Socket, Length, Timeout) ->
399519 Error
400520 end .
401521
522+ recvfrom0_nowait (Socket , Length , Ref ) ->
523+ case ? MODULE :nif_recvfrom (Socket , Length ) of
524+ {error , timeout } ->
525+ case ? MODULE :nif_select_read (Socket , Ref ) of
526+ ok ->
527+ {select , {select_info , recvfrom , Ref }};
528+ {error , _ } = SelectError ->
529+ SelectError
530+ end ;
531+ {error , _ } = RecvError ->
532+ RecvError ;
533+ {ok , {_Address , _Data }} = Reply ->
534+ Reply
535+ end .
536+
402537% %-----------------------------------------------------------------------------
403538% % @param Socket the socket
404539% % @param Data the data to send
0 commit comments