@@ -46,10 +46,23 @@ defmodule SseUser do
46
46
end )
47
47
48
48
headers = build_headers ( context , topic )
49
- http_request_opts = [ ]
50
49
51
- { :ok , request_id } =
52
- :httpc . request ( :get , { url , headers } , http_request_opts , [ { :sync , false } , { :stream , :self } ] )
50
+ parsed_url = URI . parse ( url )
51
+
52
+ opts = % {
53
+ tls_opts: [
54
+ { :customize_hostname_check ,
55
+ [ { :match_fun , :public_key . pkix_verify_hostname_match_fun ( :https ) } ] }
56
+ ] ,
57
+ http_opts: % {
58
+ closing_timeout: :infinity
59
+ }
60
+ }
61
+
62
+ { :ok , conn_pid } = :gun . open ( String . to_atom ( parsed_url . host ) , parsed_url . port , opts )
63
+ { :ok , proto } = :gun . await_up ( conn_pid )
64
+ Logger . debug ( fn -> "Connection established with proto #{ inspect ( proto ) } " end )
65
+ stream_ref = :gun . get ( conn_pid , parsed_url . path , headers )
53
66
54
67
state = % SseState {
55
68
user_name: user_name ,
@@ -63,65 +76,54 @@ defmodule SseUser do
63
76
end
64
77
}
65
78
66
- wait_for_messages ( state , request_id , expected_messages )
79
+ wait_for_messages ( state , conn_pid , stream_ref , expected_messages )
67
80
end
68
81
69
- defp wait_for_messages ( state , request_id , [ first_message | remaining_messages ] ) do
82
+ defp wait_for_messages ( state , conn_pid , stream_ref , [ first_message | remaining_messages ] ) do
70
83
Logger . debug ( fn -> "#{ header ( state ) } Waiting for message: #{ first_message } " end )
71
84
72
- receive do
73
- { :http , { _ , { :error , msg } } } ->
74
- Logger . error ( "#{ header ( state ) } Http error: #{ inspect ( msg ) } " )
75
- :ok = :httpc . cancel_request ( request_id )
85
+ result = :gun . await ( conn_pid , stream_ref , state . sse_timeout )
86
+
87
+ case result do
88
+ { :response , _ , code , _ } when code == 200 ->
89
+ Logger . debug (
90
+ "#{ header ( state ) } Connected, waiting: #{ length ( remaining_messages ) + 1 } messages, url #{ state . url } "
91
+ )
92
+
93
+ state . start_publisher_callback . ( )
94
+ wait_for_messages ( state , conn_pid , stream_ref , [ first_message | remaining_messages ] )
95
+
96
+ { :response , _ , code , _ } ->
97
+ Logger . error ( "#{ header ( state ) } Error: #{ inspect ( code ) } " )
98
+ :ok = :gun . close ( conn_pid )
76
99
Stats . inc_msg_received_http_error ( )
77
- raise ( "#{ header ( state ) } Http error " )
100
+ raise ( "#{ header ( state ) } Error " )
78
101
79
- { :http , { _ , :stream , msg } } ->
102
+ { :data , _ , msg } ->
80
103
msg = String . trim ( msg )
81
104
Logger . debug ( fn -> "#{ header ( state ) } Received message: #{ inspect ( msg ) } " end )
82
105
83
106
if msg =~ "event: ping" do
84
- wait_for_messages ( state , request_id , [ first_message | remaining_messages ] )
107
+ wait_for_messages ( state , conn_pid , stream_ref , [ first_message | remaining_messages ] )
85
108
else
86
109
if check_message ( state , msg , first_message ) == :error do
87
- :ok = :httpc . cancel_request ( request_id )
110
+ :ok = :gun . close ( conn_pid )
88
111
raise ( "#{ header ( state ) } Message check error" )
89
112
end
90
113
91
114
state = Map . put ( state , :current_message , state . current_message + 1 )
92
- wait_for_messages ( state , request_id , remaining_messages )
115
+ wait_for_messages ( state , conn_pid , stream_ref , remaining_messages )
93
116
end
94
117
95
- { :http , { _ , :stream_start , headers } } ->
96
- { ~c" x-sse-server" , server } = List . keyfind ( headers , ~c" x-sse-server" , 0 )
97
-
98
- Logger . debug ( fn ->
99
- "#{ header ( state ) } Connected, waiting: #{ length ( remaining_messages ) + 1 } messages, url #{ state . url } , remote server: #{ server } "
100
- end )
101
-
102
- state . start_publisher_callback . ( )
103
-
104
- wait_for_messages ( state , request_id , [ first_message | remaining_messages ] )
105
-
106
118
msg ->
107
119
Logger . error ( "#{ header ( state ) } Unexpected message #{ inspect ( msg ) } " )
108
- :ok = :httpc . cancel_request ( request_id )
120
+ :ok = :gun . close ( conn_pid )
109
121
raise ( "#{ header ( state ) } Unexpected message" )
110
- after
111
- state . sse_timeout ->
112
- Logger . error (
113
- "#{ header ( state ) } Timeout waiting for message (timeout=#{ state . sse_timeout } ms), remaining: #{ length ( remaining_messages ) + 1 } messages, url #{ state . url } "
114
- )
115
-
116
- Stats . inc_msg_received_timeout ( )
117
-
118
- :ok = :httpc . cancel_request ( request_id )
119
- raise ( "#{ header ( state ) } Timeout waiting for message" )
120
122
end
121
123
end
122
124
123
- defp wait_for_messages ( state , request_id , [ ] ) do
124
- :ok = :httpc . cancel_request ( request_id )
125
+ defp wait_for_messages ( state , conn_pid , _ , [ ] ) do
126
+ :ok = :gun . close ( conn_pid )
125
127
Logger . info ( "#{ header ( state ) } All messages received, url #{ state . url } " )
126
128
end
127
129
0 commit comments