Skip to content

Commit 5a46793

Browse files
committed
Support AMQP over WebSocket in Erlang client
## What? Implement the AMQP over WebSocket Binding Committee Specification 01 in the AMQP 1.0 Erlang client: https://docs.oasis-open.org/amqp-bindmap/amqp-wsb/v1.0/cs01/amqp-wsb-v1.0-cs01.html ## Why? 1. This allows writing integration tests for the server implementation of AMQP over WebSocket. 2. Erlang and Elixir clients can use AMQP over WebSocket in environments where firewalls prohibit access to the AMQP port. ## How? Use gun as WebSocket client. The new module `amqp10_client_socket` handles socket operations (open, close, send) for: * TCP sockets * SSL sockets * WebSockets Prior to this commit, the amqp10_client_connection process closed only the write end of the socket after it sent the AMQP close performative. This commit removed premature socket closure because: 1. There is no equivalent feature provided in Gun since sending a WebSocket close frame causes Gun to cleanly close the connection for both writing and reading. 2. It's unnecessary and can result in unexpected and confusing behaviour on the server. 3. It's better practive to keep the TCP connection fully open until the AMQP closing handshake completes. 4. When amqp10_client_frame_reader terminates, it will cleanly close the socket for both writing and reading.
1 parent 75c4c39 commit 5a46793

File tree

5 files changed

+226
-148
lines changed

5 files changed

+226
-148
lines changed

deps/amqp10_client/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ endef
2929
PACKAGES_DIR ?= $(abspath PACKAGES)
3030

3131
BUILD_DEPS = rabbit_common elvis_mk
32-
DEPS = amqp10_common credentials_obfuscation
32+
DEPS = amqp10_common credentials_obfuscation gun
3333
TEST_DEPS = rabbit rabbitmq_ct_helpers
3434
LOCAL_DEPS = ssl inets crypto public_key
3535

deps/amqp10_client/src/amqp10_client_connection.erl

Lines changed: 51 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@
4343

4444
-export([format_status/1]).
4545

46-
-type amqp10_socket() :: {tcp, gen_tcp:socket()} | {ssl, ssl:sslsocket()}.
47-
4846
-type milliseconds() :: non_neg_integer().
4947

5048
-type address() :: inet:socket_address() | inet:hostname().
@@ -60,6 +58,8 @@
6058
address => address(),
6159
port => inet:port_number(),
6260
tls_opts => {secure_port, [ssl:tls_option()]},
61+
ws_path => string(),
62+
ws_opts => gun:opts(),
6363
notify => pid() | none, % the pid to send connection events to
6464
notify_when_opened => pid() | none,
6565
notify_when_closed => pid() | none,
@@ -83,14 +83,13 @@
8383
sessions_sup :: pid() | undefined,
8484
pending_session_reqs = [] :: [term()],
8585
reader :: pid() | undefined,
86-
socket :: amqp10_socket() | undefined,
86+
socket :: amqp10_client_socket:socket() | undefined,
8787
idle_time_out :: non_neg_integer() | undefined,
8888
heartbeat_timer :: timer:tref() | undefined,
8989
config :: connection_config()
9090
}).
9191

92-
-export_type([connection_config/0,
93-
amqp10_socket/0]).
92+
-export_type([connection_config/0]).
9493

9594
%% -------------------------------------------------------------------
9695
%% Public API.
@@ -152,7 +151,7 @@ start_link(Sup, Config) ->
152151
set_other_procs(Pid, OtherProcs) ->
153152
gen_statem:cast(Pid, {set_other_procs, OtherProcs}).
154153

155-
-spec socket_ready(pid(), amqp10_socket()) -> ok.
154+
-spec socket_ready(pid(), amqp10_client_socket:socket()) -> ok.
156155
socket_ready(Pid, Socket) ->
157156
gen_statem:cast(Pid, {socket_ready, Socket}).
158157

@@ -186,10 +185,10 @@ expecting_socket(_EvtType, {socket_ready, Socket},
186185
Sasl = credentials_obfuscation:decrypt(maps:get(sasl, Cfg)),
187186
case Sasl of
188187
none ->
189-
ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER),
188+
ok = amqp10_client_socket:send(Socket, ?AMQP_PROTOCOL_HEADER),
190189
{next_state, hdr_sent, State1};
191190
_ ->
192-
ok = socket_send(Socket, ?SASL_PROTOCOL_HEADER),
191+
ok = amqp10_client_socket:send(Socket, ?SASL_PROTOCOL_HEADER),
193192
{next_state, sasl_hdr_sent, State1}
194193
end;
195194
expecting_socket(_EvtType, {set_other_procs, OtherProcs}, State) ->
@@ -205,9 +204,12 @@ expecting_socket({call, From}, begin_session,
205204
sasl_hdr_sent(_EvtType, {protocol_header_received, 3, 1, 0, 0}, State) ->
206205
{next_state, sasl_hdr_rcvds, State};
207206
sasl_hdr_sent({call, From}, begin_session,
208-
#state{pending_session_reqs = PendingSessionReqs} = State) ->
207+
#state{pending_session_reqs = PendingSessionReqs} = State) ->
209208
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
210-
{keep_state, State1}.
209+
{keep_state, State1};
210+
sasl_hdr_sent(info, {'DOWN', MRef, process, _Pid, _},
211+
#state{reader_m_ref = MRef}) ->
212+
{stop, {shutdown, reader_down}}.
211213

212214
sasl_hdr_rcvds(_EvtType, #'v1_0.sasl_mechanisms'{
213215
sasl_server_mechanisms = {array, symbol, AvailableMechs}},
@@ -228,7 +230,7 @@ sasl_hdr_rcvds({call, From}, begin_session,
228230

229231
sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, 0}},
230232
#state{socket = Socket} = State) ->
231-
ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER),
233+
ok = amqp10_client_socket:send(Socket, ?AMQP_PROTOCOL_HEADER),
232234
{next_state, hdr_sent, State};
233235
sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, C}},
234236
#state{} = State) when C==1;C==2;C==3;C==4 ->
@@ -285,7 +287,7 @@ open_sent({call, From}, begin_session,
285287
#state{pending_session_reqs = PendingSessionReqs} = State) ->
286288
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
287289
{keep_state, State1};
288-
open_sent(info, {'DOWN', MRef, _, _, _},
290+
open_sent(info, {'DOWN', MRef, process, _, _},
289291
#state{reader_m_ref = MRef}) ->
290292
{stop, {shutdown, reader_down}}.
291293

@@ -294,46 +296,56 @@ opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) ->
294296
{ok, Tmr} = start_heartbeat_timer(T),
295297
{keep_state, State#state{heartbeat_timer = Tmr}};
296298
opened(_EvtType, {close, Reason}, State) ->
297-
%% We send the first close frame and wait for the reply.
298299
%% TODO: stop all sessions writing
299300
%% We could still accept incoming frames (See: 2.4.6)
300301
case send_close(State, Reason) of
301-
ok -> {next_state, close_sent, State};
302-
{error, closed} -> {stop, normal, State};
303-
Error -> {stop, Error, State}
302+
ok ->
303+
%% "After writing this frame the peer SHOULD continue to read from the connection
304+
%% until it receives the partner's close frame (in order to guard against
305+
%% erroneously or maliciously implemented partners, a peer SHOULD implement a
306+
%% timeout to give its partner a reasonable time to receive and process the close
307+
%% before giving up and simply closing the underlying transport mechanism)." [§2.4.3]
308+
{next_state, close_sent, State, {state_timeout, ?TIMEOUT, received_no_close_frame}};
309+
{error, closed} ->
310+
{stop, normal, State};
311+
Error ->
312+
{stop, Error, State}
304313
end;
305314
opened(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) ->
306315
%% We receive the first close frame, reply and terminate.
307316
ok = notify_closed(Config, Close),
308-
_ = send_close(State, none),
309-
{stop, normal, State};
317+
case send_close(State, none) of
318+
ok -> {stop, normal, State};
319+
{error, closed} -> {stop, normal, State};
320+
Error -> {stop, Error, State}
321+
end;
310322
opened({call, From}, begin_session, State) ->
311323
{Ret, State1} = handle_begin_session(From, State),
312324
{keep_state, State1, [{reply, From, Ret}]};
313-
opened(info, {'DOWN', MRef, _, _, _Info},
314-
State = #state{reader_m_ref = MRef, config = Config}) ->
325+
opened(info, {'DOWN', MRef, process, _, _Info},
326+
#state{reader_m_ref = MRef, config = Config}) ->
315327
%% reader has gone down and we are not already shutting down
316328
ok = notify_closed(Config, shutdown),
317-
{stop, normal, State};
329+
{stop, normal};
318330
opened(_EvtType, Frame, State) ->
319331
logger:warning("Unexpected connection frame ~tp when in state ~tp ",
320-
[Frame, State]),
321-
{keep_state, State}.
332+
[Frame, State]),
333+
keep_state_and_data.
322334

323-
close_sent(_EvtType, heartbeat, State) ->
324-
{next_state, close_sent, State};
325-
close_sent(_EvtType, {'EXIT', _Pid, shutdown}, State) ->
335+
close_sent(_EvtType, heartbeat, _Data) ->
336+
keep_state_and_data;
337+
close_sent(_EvtType, {'EXIT', _Pid, shutdown}, _Data) ->
326338
%% monitored processes may exit during closure
327-
{next_state, close_sent, State};
328-
close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _},
329-
#state{reader = ReaderPid} = State) ->
330-
%% if the reader exits we probably wont receive a close frame
331-
{stop, normal, State};
332-
close_sent(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) ->
339+
keep_state_and_data;
340+
close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _Reason},
341+
#state{reader = ReaderPid}) ->
342+
%% if the reader exits we probably won't receive a close frame
343+
{stop, normal};
344+
close_sent(_EvtType, #'v1_0.close'{} = Close, #state{config = Config}) ->
333345
ok = notify_closed(Config, Close),
334-
%% TODO: we should probably set up a timer before this to ensure
335-
%% we close down event if no reply is received
336-
{stop, normal, State}.
346+
{stop, normal};
347+
close_sent(state_timeout, received_no_close_frame, _Data) ->
348+
{stop, normal}.
337349

338350
set_other_procs0(OtherProcs, State) ->
339351
#{sessions_sup := SessionsSup,
@@ -435,22 +447,15 @@ send_open(#state{socket = Socket, config = Config0}) ->
435447
Encoded = amqp10_framing:encode_bin(Open),
436448
Frame = amqp10_binary_generator:build_frame(0, Encoded),
437449
?DBG("CONN <- ~tp", [Open]),
438-
socket_send(Socket, Frame).
450+
amqp10_client_socket:send(Socket, Frame).
439451

440452

441453
send_close(#state{socket = Socket}, _Reason) ->
442454
Close = #'v1_0.close'{},
443455
Encoded = amqp10_framing:encode_bin(Close),
444456
Frame = amqp10_binary_generator:build_frame(0, Encoded),
445457
?DBG("CONN <- ~tp", [Close]),
446-
Ret = socket_send(Socket, Frame),
447-
case Ret of
448-
ok -> _ =
449-
socket_shutdown(Socket, write),
450-
ok;
451-
_ -> ok
452-
end,
453-
Ret.
458+
amqp10_client_socket:send(Socket, Frame).
454459

455460
send_sasl_init(State, anon) ->
456461
Frame = #'v1_0.sasl_init'{mechanism = {symbol, <<"ANONYMOUS">>}},
@@ -474,21 +479,11 @@ send(Record, FrameType, #state{socket = Socket}) ->
474479
Encoded = amqp10_framing:encode_bin(Record),
475480
Frame = amqp10_binary_generator:build_frame(0, FrameType, Encoded),
476481
?DBG("CONN <- ~tp", [Record]),
477-
socket_send(Socket, Frame).
482+
amqp10_client_socket:send(Socket, Frame).
478483

479484
send_heartbeat(#state{socket = Socket}) ->
480485
Frame = amqp10_binary_generator:build_heartbeat_frame(),
481-
socket_send(Socket, Frame).
482-
483-
socket_send({tcp, Socket}, Data) ->
484-
gen_tcp:send(Socket, Data);
485-
socket_send({ssl, Socket}, Data) ->
486-
ssl:send(Socket, Data).
487-
488-
socket_shutdown({tcp, Socket}, How) ->
489-
gen_tcp:shutdown(Socket, How);
490-
socket_shutdown({ssl, Socket}, How) ->
491-
ssl:shutdown(Socket, How).
486+
amqp10_client_socket:send(Socket, Frame).
492487

493488
notify_opened(#{notify_when_opened := none}, _) ->
494489
ok;

0 commit comments

Comments
 (0)