@@ -26,7 +26,8 @@ defmodule Loqui.CowboyProtocol do
2626 encoding: nil ,
2727 compression: nil ,
2828 monitor_refs: % { } ,
29- ping_timeout_ref: nil
29+ pong_received: true ,
30+ next_seq: 1
3031
3132 def upgrade ( req , env , handler , handler_opts ) do
3233 :ranch . remove_connection ( env [ :listener ] )
@@ -42,6 +43,9 @@ defmodule Loqui.CowboyProtocol do
4243 worker_pool: Loqui . pool_name ,
4344 }
4445
46+ { host , _ } = :cowboy_req . host ( req )
47+ Logger . info "[loqui] upgrade. host=#{ inspect host } socket_pid=#{ inspect socket_pid } "
48+
4549 handler_init ( state )
4650 end
4751
@@ -62,25 +66,24 @@ defmodule Loqui.CowboyProtocol do
6266 0 -> :ok
6367 end
6468
65- refresh_ping_timeout ( state ) |> handler_loop ( << >> )
69+ ping ( state ) |> handler_loop ( << >> )
6670 end
6771
6872 @ spec handler_loop ( state , binary ) :: { :ok , req , env }
69- def handler_loop ( % { socket_pid: socket_pid , transport: transport , ping_timeout_ref: ping_timeout_ref } = state , so_far ) do
73+ def handler_loop ( % { socket_pid: socket_pid , transport: transport } = state , so_far ) do
7074 transport . setopts ( socket_pid , [ active: :once ] )
7175 receive do
76+ :send_ping ->
77+ ping ( state ) |> handler_loop ( so_far )
7278 { :response , seq , response } ->
73- handle_response ( state , seq , response , [ ] ) |> socket_data ( so_far )
79+ handle_response ( state , seq , response , [ ] ) |> handle_socket_data ( so_far )
7480 { :tcp , ^ socket_pid , data } ->
75- socket_data ( state , << so_far :: binary , data :: binary >> )
76- { :tcp_closed , ^ socket_pid } -> close ( state , :tcp_closed )
77- { :tcp_error , ^ socket_pid , reason } -> goaway ( state , reason )
78- :ping_timeout ->
79- if Process . read_timer ( ping_timeout_ref ) == false do
80- goaway ( state , :ping_timeout )
81- else
82- handler_loop ( state , so_far )
83- end
81+ handle_socket_data ( state , << so_far :: binary , data :: binary >> )
82+ { :tcp_closed , ^ socket_pid } ->
83+ Logger . info "[loqui] tcp_closed. socket_pid=#{ inspect socket_pid } "
84+ close ( state , :tcp_closed )
85+ { :tcp_error , ^ socket_pid , reason } ->
86+ goaway ( state , reason )
8487 { :DOWN , ref , :process , _pid , reason } ->
8588 handle_down ( state , ref , reason ) |> handler_loop ( so_far )
8689 other ->
@@ -89,6 +92,20 @@ defmodule Loqui.CowboyProtocol do
8992 end
9093 end
9194
95+ @ spec ping ( state ) :: state | { :ok , req , env }
96+ defp ping ( % { pong_recieved: false } = state ) , do: goaway ( state , :ping_timeout )
97+ defp ping ( % { ping_interval: ping_interval } = state ) do
98+ { seq , state } = next_seq ( state )
99+ do_send ( state , Frames . ping ( 0 , seq ) )
100+ Process . send_after ( self , :send_ping , ping_interval )
101+ % { state | pong_received: false }
102+ end
103+
104+ @ spec next_seq ( state ) :: { integer , state }
105+ defp next_seq ( % { next_seq: next_seq } = state ) do
106+ { next_seq , % { state | next_seq: next_seq + 1 } }
107+ end
108+
92109 @ spec handle_response ( state , integer , any , [ ] ) :: state
93110 defp handle_response ( % { monitor_refs: monitor_refs } = state , seq , response , responses ) do
94111 % { state | monitor_refs: Map . delete ( monitor_refs , seq ) }
@@ -110,12 +127,12 @@ defmodule Loqui.CowboyProtocol do
110127 defp response_frame ( { :compressed , payload } , seq ) , do: Frames . response ( @ flag_compressed , seq , payload )
111128 defp response_frame ( payload , seq ) , do: Frames . response ( @ empty_flags , seq , payload )
112129
113- @ spec socket_data ( state , binary ) :: { :ok , req , env }
114- defp socket_data ( state , data ) do
130+ @ spec handle_socket_data ( state , binary ) :: { :ok , req , env }
131+ defp handle_socket_data ( state , data ) do
115132 case Protocol . handle_data ( data ) do
116133 { :ok , request , extra } ->
117134 case handle_request ( request , state ) do
118- { :ok , state } -> socket_data ( state , extra )
135+ { :ok , state } -> handle_socket_data ( state , extra )
119136 { :shutdown , reason } -> close ( state , reason )
120137 end
121138 { :continue , extra } -> handler_loop ( state , extra )
@@ -142,10 +159,12 @@ defmodule Loqui.CowboyProtocol do
142159 end
143160 end
144161 defp handle_request ( { :ping , _flags , seq } , state ) do
145- state = refresh_ping_timeout ( state )
146162 do_send ( state , Frames . pong ( @ empty_flags , seq ) )
147163 { :ok , state }
148164 end
165+ defp handle_request ( { :pong , _flags , _seq } , state ) do
166+ { :ok , % { state | pong_received: true } }
167+ end
149168 defp handle_request ( { :request , _flags , seq , request } , state ) do
150169 { :ok , handler_request ( state , seq , request ) }
151170 end
@@ -187,7 +206,9 @@ defmodule Loqui.CowboyProtocol do
187206 def goaway ( state , :not_enough_options ) , do: goaway ( state , 8 , "NotEnoughOptions" )
188207
189208 @ spec goaway ( state , integer , atom ) :: { :ok , req , env }
190- def goaway ( state , code , reason ) do
209+ def goaway ( % { socket_pid: socket_pid , req: req } = state , code , reason ) do
210+ { host , _ } = :cowboy_req . host ( req )
211+ Logger . info "[loqui] goaway. host=#{ inspect host } socket_pid=#{ inspect socket_pid } code=#{ inspect code } reason=#{ inspect reason } "
191212 do_send ( state , Frames . goaway ( @ empty_flags , code , reason ) )
192213 close ( state , reason )
193214 end
@@ -255,13 +276,4 @@ defmodule Loqui.CowboyProtocol do
255276 defp choose_compression ( supported_compressions , [ compression | compressions ] ) do
256277 if Enum . member? ( supported_compressions , compression ) , do: compression , else: choose_compression ( supported_compressions , compressions )
257278 end
258-
259- @ spec refresh_ping_timeout ( state ) :: state
260- defp refresh_ping_timeout ( % { ping_interval: ping_interval , ping_timeout_ref: prev_ref } = state ) do
261- if prev_ref do
262- Process . cancel_timer ( prev_ref )
263- end
264- ref = Process . send_after ( self , :ping_timeout , ping_interval )
265- % { state | ping_timeout_ref: ref }
266- end
267279end
0 commit comments