Skip to content

Commit b6eccfc

Browse files
authored
Add example for use with GenServer (#20)
1 parent 6281c05 commit b6eccfc

File tree

1 file changed

+184
-0
lines changed

1 file changed

+184
-0
lines changed

examples/genserver.exs

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
Mix.install([:mint_web_socket, :castore])
2+
3+
# Also see https://github.com/phoenixframework/phoenix/blob/4da71906da970a162c88e165cdd2fdfaf9083ac3/test/support/websocket_client.exs
4+
5+
defmodule Ws do
6+
use GenServer
7+
8+
require Logger
9+
require Mint.HTTP
10+
11+
defstruct [:conn, :websocket, :request_ref, :caller, :status, :resp_headers, :closing?]
12+
13+
def connect(url) do
14+
with {:ok, socket} <- GenServer.start_link(__MODULE__, []),
15+
{:ok, :connected} <- GenServer.call(socket, {:connect, url}) do
16+
{:ok, socket}
17+
end
18+
end
19+
20+
def send_message(pid, text) do
21+
GenServer.call(pid, {:send_text, text})
22+
end
23+
24+
@impl GenServer
25+
def init([]) do
26+
{:ok, %__MODULE__{}}
27+
end
28+
29+
@impl GenServer
30+
def handle_call({:send_text, text}, _from, state) do
31+
{:ok, state} = send_frame(state, {:text, text})
32+
{:reply, :ok, state}
33+
end
34+
35+
@impl GenServer
36+
def handle_call({:connect, url}, from, state) do
37+
uri = URI.parse(url)
38+
39+
http_scheme =
40+
case uri.scheme do
41+
"ws" -> :http
42+
"wss" -> :https
43+
end
44+
45+
ws_scheme =
46+
case uri.scheme do
47+
"ws" -> :ws
48+
"wss" -> :wss
49+
end
50+
51+
path =
52+
case uri.query do
53+
nil -> uri.path
54+
query -> uri.path <> "?" <> query
55+
end
56+
57+
with {:ok, conn} <- Mint.HTTP.connect(http_scheme, uri.host, uri.port),
58+
{:ok, conn, ref} <- Mint.WebSocket.upgrade(ws_scheme, conn, path, []) do
59+
state = %{state | conn: conn, request_ref: ref, caller: from}
60+
{:noreply, state}
61+
else
62+
{:error, reason} ->
63+
{:reply, {:error, reason}, state}
64+
65+
{:error, conn, reason} ->
66+
{:reply, {:error, reason}, put_in(state.conn, conn)}
67+
end
68+
end
69+
70+
@impl GenServer
71+
def handle_info(message, state) do
72+
case Mint.WebSocket.stream(state.conn, message) do
73+
{:ok, conn, responses} ->
74+
state = put_in(state.conn, conn) |> handle_responses(responses)
75+
if state.closing?, do: do_close(state), else: {:noreply, state}
76+
77+
{:error, conn, reason, _responses} ->
78+
state = put_in(state.conn, conn) |> reply({:error, reason})
79+
{:noreply, state}
80+
81+
:unknown ->
82+
{:noreply, state}
83+
end
84+
end
85+
86+
defp handle_responses(state, responses)
87+
88+
defp handle_responses(%{request_ref: ref} = state, [{:status, ref, status} | rest]) do
89+
put_in(state.status, status)
90+
|> handle_responses(rest)
91+
end
92+
93+
defp handle_responses(%{request_ref: ref} = state, [{:headers, ref, resp_headers} | rest]) do
94+
put_in(state.resp_headers, resp_headers)
95+
|> handle_responses(rest)
96+
end
97+
98+
defp handle_responses(%{request_ref: ref} = state, [{:done, ref} | rest]) do
99+
case Mint.WebSocket.new(state.conn, ref, state.status, state.resp_headers) do
100+
{:ok, conn, websocket} ->
101+
%{state | conn: conn, websocket: websocket, status: nil, resp_headers: nil}
102+
|> reply({:ok, :connected})
103+
|> handle_responses(rest)
104+
105+
{:error, conn, reason} ->
106+
put_in(state.conn, conn)
107+
|> reply({:error, reason})
108+
end
109+
end
110+
111+
defp handle_responses(%{request_ref: ref, websocket: websocket} = state, [
112+
{:data, ref, data} | rest
113+
])
114+
when websocket != nil do
115+
case Mint.WebSocket.decode(websocket, data) do
116+
{:ok, websocket, frames} ->
117+
put_in(state.websocket, websocket)
118+
|> handle_frames(frames)
119+
|> handle_responses(rest)
120+
121+
{:error, websocket, reason} ->
122+
put_in(state.websocket, websocket)
123+
|> reply({:error, reason})
124+
end
125+
end
126+
127+
defp handle_responses(state, [_response | rest]) do
128+
handle_responses(state, rest)
129+
end
130+
131+
defp handle_responses(state, []), do: state
132+
133+
defp send_frame(state, frame) do
134+
with {:ok, websocket, data} <- Mint.WebSocket.encode(state.websocket, frame),
135+
state = put_in(state.websocket, websocket),
136+
{:ok, conn} <- Mint.WebSocket.stream_request_body(state.conn, state.request_ref, data) do
137+
{:ok, put_in(state.conn, conn)}
138+
else
139+
{:error, %Mint.WebSocket{} = websocket, reason} ->
140+
{:error, put_in(state.websocket, websocket), reason}
141+
142+
{:error, conn, reason} ->
143+
{:error, put_in(state.conn, conn), reason}
144+
end
145+
end
146+
147+
def handle_frames(state, frames) do
148+
Enum.reduce(frames, state, fn
149+
# reply to pings with pongs
150+
{:ping, data}, state ->
151+
{:ok, state} = send_frame(state, {:pong, data})
152+
state
153+
154+
{:close, _code, reason}, state ->
155+
Logger.debug("Closing connection: #{inspect(reason)}")
156+
%{state | closing?: true}
157+
158+
{:text, text}, state ->
159+
Logger.debug("Received: #{inspect(text)}, sending back the reverse")
160+
{:ok, state} = send_frame(state, {:text, String.reverse(text)})
161+
state
162+
163+
frame, state ->
164+
Logger.debug("Unexpected frame received: #{inspect(frame)}")
165+
state
166+
end)
167+
end
168+
169+
defp do_close(state) do
170+
# Streaming a close frame may fail if the server has already closed
171+
# for writing.
172+
_ = send_frame(state, :close)
173+
Mint.HTTP.close(state.conn)
174+
{:stop, :normal, state}
175+
end
176+
177+
defp reply(state, response) do
178+
if state.caller, do: GenServer.reply(state.caller, response)
179+
put_in(state.caller, nil)
180+
end
181+
end
182+
183+
{:ok, pid} = Ws.connect("ws://localhost:1234/")
184+
Ws.send_message(pid, "Hello from WS client")

0 commit comments

Comments
 (0)