Skip to content

Commit 83f55c7

Browse files
committed
HTTP/2 Web-MQTT
1 parent 4af7faf commit 83f55c7

File tree

4 files changed

+41
-4
lines changed

4 files changed

+41
-4
lines changed

deps/rabbit_common/src/rabbit_net.erl

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ getstat(Sock, Stats) when is_port(Sock) ->
117117
getstat({rabbit_proxy_socket, Sock, _}, Stats) when ?IS_SSL(Sock) ->
118118
ssl:getstat(Sock, Stats);
119119
getstat({rabbit_proxy_socket, Sock, _}, Stats) when is_port(Sock) ->
120-
inet:getstat(Sock, Stats).
120+
inet:getstat(Sock, Stats);
121+
%% Virtual sockets; we do not have access to the underlying socket.
122+
getstat({rabbit_virtual_socket, _}, _) ->
123+
{error, enotsup}.
121124

122125
recv(Sock) when ?IS_SSL(Sock) ->
123126
recv(Sock, {ssl, ssl_closed, ssl_error});
@@ -180,7 +183,9 @@ peername(Sock) when ?IS_SSL(Sock) -> ssl:peername(Sock);
180183
peername(Sock) when is_port(Sock) -> inet:peername(Sock).
181184

182185
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock);
183-
peercert(Sock) when is_port(Sock) -> nossl.
186+
peercert(Sock) when is_port(Sock) -> nossl;
187+
peercert({rabbit_virtual_socket, #{cert := undefined}}) -> nossl;
188+
peercert({rabbit_virtual_socket, #{cert := Cert}}) -> Cert.
184189

185190
connection_string(Sock, Direction) ->
186191
case socket_ends(Sock, Direction) of
@@ -219,7 +224,15 @@ socket_ends({rabbit_proxy_socket, Sock, ProxyInfo}, Direction) ->
219224
} ->
220225
{ok, {rdns(FromAddress), FromPort,
221226
rdns(ToAddress), ToPort}}
222-
end.
227+
end;
228+
socket_ends({rabbit_virtual_socket, SocketInfo}, _) ->
229+
#{
230+
src_address := FromAddress,
231+
src_port := FromPort,
232+
dest_address := ToAddress,
233+
dest_port := ToPort
234+
} = SocketInfo,
235+
{ok, {rdns(FromAddress), FromPort, rdns(ToAddress), ToPort}}.
223236

224237
maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr);
225238
maybe_ntoab(Host) -> Host.

deps/rabbitmq_mqtt/src/rabbit_mqtt_keepalive.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ handle({init, IntervalSecs, Sock}, _State) ->
4141
socket = Sock,
4242
recv_oct = RecvOct,
4343
received = true}};
44+
%% Virtual sockets (such as Websocket/WebTransport)
45+
%% do not support getstat.
46+
{error, enotsup} ->
47+
{ok, #state{interval_secs = IntervalSecs,
48+
timer = undefined,
49+
socket = Sock,
50+
recv_oct = 0,
51+
received = true}};
4452
{error, _} = Err ->
4553
Err
4654
end;

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ start_tls_listener(TLSConf0, CowboyOpts) ->
135135
num_acceptors => get_env(num_ssl_acceptors, 10),
136136
num_conns_sups => get_env(num_conns_sup, 1)
137137
},
138-
case cowboy:start_tls(RanchRef, RanchTransportOpts, CowboyOpts) of
138+
case cowboy:start_tls(RanchRef, RanchTransportOpts, CowboyOpts#{enable_connect_protocol => true}) of
139139
{ok, _} ->
140140
ok;
141141
{error, {already_started, _}} ->

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,25 @@
5656
upgrade(Req, Env, Handler, HandlerState) ->
5757
upgrade(Req, Env, Handler, HandlerState, #{}).
5858

59+
%% We simulate PROXY headers when HTTP/2 is used to have src/dest.
60+
upgrade(Req=#{version := 'HTTP/2', peer := Peer, sock := Sock, cert := Cert},
61+
Env, Handler, HandlerState, Opts) ->
62+
logger:error("~p ~p ~p ~p ~p", [Req, Env, Handler, HandlerState, Opts]),
63+
{SrcAddr, SrcPort} = Peer,
64+
{DestAddr, DestPort} = Sock,
65+
SocketInfo = #{
66+
src_address => SrcAddr,
67+
src_port => SrcPort,
68+
dest_address => DestAddr,
69+
dest_port => DestPort,
70+
cert => Cert
71+
},
72+
VirtualSocket = {rabbit_virtual_socket, SocketInfo},
73+
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState#state{socket = VirtualSocket}, Opts);
5974
upgrade(Req, Env, Handler, HandlerState, Opts) ->
6075
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).
6176

77+
%% @todo This is only called for HTTP/1.1.
6278
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
6379
Sock = case HandlerState#state.socket of
6480
undefined ->

0 commit comments

Comments
 (0)