Skip to content

Commit b7fe364

Browse files
Loïc Hoguinmergify[bot]
authored andcommitted
Add support for LOCAL proxy header
This is what the proxy uses for health checks. In those cases we use the socket's IP/ports for the connection name as we have nothing else we can use. (cherry picked from commit 610af30) (cherry picked from commit 78a953a)
1 parent dcd6781 commit b7fe364

File tree

7 files changed

+153
-29
lines changed

7 files changed

+153
-29
lines changed

deps/rabbit/test/proxy_protocol_SUITE.erl

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ all() ->
2121

2222
groups() -> [
2323
{sequential_tests, [], [
24-
proxy_protocol,
25-
proxy_protocol_tls
24+
proxy_protocol_v1,
25+
proxy_protocol_v1_tls,
26+
proxy_protocol_v2_local
2627
]}
2728
].
2829

@@ -55,7 +56,7 @@ init_per_testcase(Testcase, Config) ->
5556
end_per_testcase(Testcase, Config) ->
5657
rabbit_ct_helpers:testcase_finished(Config, Testcase).
5758

58-
proxy_protocol(Config) ->
59+
proxy_protocol_v1(Config) ->
5960
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
6061
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
6162
[binary, {active, false}, {packet, raw}]),
@@ -68,7 +69,7 @@ proxy_protocol(Config) ->
6869
gen_tcp:close(Socket),
6970
ok.
7071

71-
proxy_protocol_tls(Config) ->
72+
proxy_protocol_v1_tls(Config) ->
7273
app_utils:start_applications([asn1, crypto, public_key, ssl]),
7374
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp_tls),
7475
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
@@ -83,6 +84,23 @@ proxy_protocol_tls(Config) ->
8384
gen_tcp:close(Socket),
8485
ok.
8586

87+
proxy_protocol_v2_local(Config) ->
88+
ProxyInfo = #{
89+
command => local,
90+
version => 2
91+
},
92+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
93+
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
94+
[binary, {active, false}, {packet, raw}]),
95+
ok = inet:send(Socket, ranch_proxy_header:header(ProxyInfo)),
96+
ok = inet:send(Socket, <<"AMQP", 0, 0, 9, 1>>),
97+
{ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
98+
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
99+
?MODULE, connection_name, []),
100+
match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]),
101+
gen_tcp:close(Socket),
102+
ok.
103+
86104
connection_name() ->
87105
Pids = pg_local:get_members(rabbit_connections),
88106
Pid = lists:nth(1, Pids),

deps/rabbit_common/src/rabbit_net.erl

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -269,15 +269,21 @@ socket_ends(Sock, Direction) when ?IS_SSL(Sock);
269269
{_, {error, _Reason} = Error} ->
270270
Error
271271
end;
272-
socket_ends({rabbit_proxy_socket, _, ProxyInfo}, _) ->
273-
#{
274-
src_address := FromAddress,
275-
src_port := FromPort,
276-
dest_address := ToAddress,
277-
dest_port := ToPort
278-
} = ProxyInfo,
279-
{ok, {rdns(FromAddress), FromPort,
280-
rdns(ToAddress), ToPort}}.
272+
socket_ends({rabbit_proxy_socket, Sock, ProxyInfo}, Direction) ->
273+
case ProxyInfo of
274+
%% LOCAL header: we take the IP/ports from the socket.
275+
#{command := local} ->
276+
socket_ends(Sock, Direction);
277+
%% PROXY header: use the IP/ports from the proxy header.
278+
#{
279+
src_address := FromAddress,
280+
src_port := FromPort,
281+
dest_address := ToAddress,
282+
dest_port := ToPort
283+
} ->
284+
{ok, {rdns(FromAddress), FromPort,
285+
rdns(ToAddress), ToPort}}
286+
end.
281287

282288
maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr);
283289
maybe_ntoab(Host) -> Host.

deps/rabbitmq_amqp1_0/test/proxy_protocol_SUITE.erl

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ all() ->
2020

2121
groups() -> [
2222
{sequential_tests, [], [
23-
proxy_protocol,
24-
proxy_protocol_tls
23+
proxy_protocol_v1,
24+
proxy_protocol_v1_tls,
25+
proxy_protocol_v2_local
2526
]}
2627
].
2728

@@ -54,7 +55,7 @@ init_per_testcase(Testcase, Config) ->
5455
end_per_testcase(Testcase, Config) ->
5556
rabbit_ct_helpers:testcase_finished(Config, Testcase).
5657

57-
proxy_protocol(Config) ->
58+
proxy_protocol_v1(Config) ->
5859
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
5960
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
6061
[binary, {active, false}, {packet, raw}]),
@@ -68,7 +69,7 @@ proxy_protocol(Config) ->
6869
gen_tcp:close(Socket),
6970
ok.
7071

71-
proxy_protocol_tls(Config) ->
72+
proxy_protocol_v1_tls(Config) ->
7273
app_utils:start_applications([asn1, crypto, public_key, ssl]),
7374
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp_tls),
7475
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
@@ -85,6 +86,25 @@ proxy_protocol_tls(Config) ->
8586
gen_tcp:close(Socket),
8687
ok.
8788

89+
proxy_protocol_v2_local(Config) ->
90+
ProxyInfo = #{
91+
command => local,
92+
version => 2
93+
},
94+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
95+
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
96+
[binary, {active, false}, {packet, raw}]),
97+
ok = inet:send(Socket, ranch_proxy_header:header(ProxyInfo)),
98+
[ok = inet:send(Socket, amqp_1_0_frame(FrameType))
99+
|| FrameType <- [header_sasl, sasl_init, header_amqp, open, 'begin']],
100+
{ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
101+
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
102+
?MODULE, connection_name, []),
103+
match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]),
104+
gen_tcp:close(Socket),
105+
ok.
106+
107+
88108
%% hex frames to send to have the connection recorded in RabbitMQ
89109
%% use wireshark with one of the Java tests to record those
90110
amqp_1_0_frame(header_sasl) ->

deps/rabbitmq_mqtt/test/proxy_protocol_SUITE.erl

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ all() ->
2020
groups() ->
2121
[
2222
{non_parallel_tests, [], [
23-
proxy_protocol,
24-
proxy_protocol_tls
23+
proxy_protocol_v1,
24+
proxy_protocol_v1_tls,
25+
proxy_protocol_v2_local
2526
]}
2627
].
2728

@@ -59,7 +60,7 @@ init_per_testcase(Testcase, Config) ->
5960
end_per_testcase(Testcase, Config) ->
6061
rabbit_ct_helpers:testcase_finished(Config, Testcase).
6162

62-
proxy_protocol(Config) ->
63+
proxy_protocol_v1(Config) ->
6364
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
6465
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
6566
[binary, {active, false}, {packet, raw}]),
@@ -72,7 +73,7 @@ proxy_protocol(Config) ->
7273
gen_tcp:close(Socket),
7374
ok.
7475

75-
proxy_protocol_tls(Config) ->
76+
proxy_protocol_v1_tls(Config) ->
7677
app_utils:start_applications([asn1, crypto, public_key, ssl]),
7778
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls),
7879
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
@@ -87,6 +88,23 @@ proxy_protocol_tls(Config) ->
8788
gen_tcp:close(Socket),
8889
ok.
8990

91+
proxy_protocol_v2_local(Config) ->
92+
ProxyInfo = #{
93+
command => local,
94+
version => 2
95+
},
96+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
97+
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
98+
[binary, {active, false}, {packet, raw}]),
99+
ok = inet:send(Socket, ranch_proxy_header:header(ProxyInfo)),
100+
ok = inet:send(Socket, mqtt_3_1_1_connect_packet()),
101+
{ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
102+
timer:sleep(10),
103+
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []),
104+
match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]),
105+
gen_tcp:close(Socket),
106+
ok.
107+
90108
connection_name() ->
91109
Connections = ets:tab2list(connection_created),
92110
{_Key, Values} = lists:nth(1, Connections),

deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ all() ->
2121
groups() ->
2222
[
2323
{non_parallel_tests, [], [
24-
proxy_protocol,
25-
proxy_protocol_tls
24+
proxy_protocol_v1,
25+
proxy_protocol_v1_tls,
26+
proxy_protocol_v2_local
2627
]}
2728
].
2829

@@ -59,7 +60,7 @@ init_per_testcase(Testcase, Config) ->
5960
end_per_testcase(Testcase, Config) ->
6061
rabbit_ct_helpers:testcase_finished(Config, Testcase).
6162

62-
proxy_protocol(Config) ->
63+
proxy_protocol_v1(Config) ->
6364
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
6465
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
6566
[binary, {active, false}, {packet, raw}]),
@@ -72,7 +73,7 @@ proxy_protocol(Config) ->
7273
gen_tcp:close(Socket),
7374
ok.
7475

75-
proxy_protocol_tls(Config) ->
76+
proxy_protocol_v1_tls(Config) ->
7677
app_utils:start_applications([asn1, crypto, public_key, ssl]),
7778
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp_tls),
7879
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
@@ -87,6 +88,23 @@ proxy_protocol_tls(Config) ->
8788
gen_tcp:close(Socket),
8889
ok.
8990

91+
proxy_protocol_v2_local(Config) ->
92+
ProxyInfo = #{
93+
command => local,
94+
version => 2
95+
},
96+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
97+
{ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
98+
[binary, {active, false}, {packet, raw}]),
99+
ok = inet:send(Socket, ranch_proxy_header:header(ProxyInfo)),
100+
ok = inet:send(Socket, stomp_connect_frame()),
101+
{ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
102+
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
103+
?MODULE, connection_name, []),
104+
match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]),
105+
gen_tcp:close(Socket),
106+
ok.
107+
90108
connection_name() ->
91109
Connections = ets:tab2list(connection_created),
92110
{_Key, Values} = lists:nth(1, Connections),

deps/rabbitmq_web_mqtt/test/proxy_protocol_SUITE.erl

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ all() ->
2626

2727
groups() ->
2828
Tests = [
29-
proxy_protocol
29+
proxy_protocol_v1,
30+
proxy_protocol_v2_local
3031
],
3132
[%{https_tests, [], Tests},
3233
{http_tests, [], Tests}].
@@ -78,7 +79,7 @@ init_per_testcase(Testcase, Config) ->
7879
end_per_testcase(Testcase, Config) ->
7980
rabbit_ct_helpers:testcase_finished(Config, Testcase).
8081

81-
proxy_protocol(Config) ->
82+
proxy_protocol_v1(Config) ->
8283
PortStr = rabbit_ws_test_util:get_web_mqtt_port_str(Config),
8384

8485
Protocol = ?config(protocol, Config),
@@ -98,6 +99,26 @@ proxy_protocol(Config) ->
9899
{close, _} = rfc6455_client:close(WS),
99100
ok.
100101

102+
proxy_protocol_v2_local(Config) ->
103+
ProxyInfo = #{
104+
command => local,
105+
version => 2
106+
},
107+
108+
PortStr = rabbit_ws_test_util:get_web_mqtt_port_str(Config),
109+
110+
Protocol = ?config(protocol, Config),
111+
WS = rfc6455_client:new(Protocol ++ "://127.0.0.1:" ++ PortStr ++ "/ws", self(),
112+
undefined, ["mqtt"], ranch_proxy_header:header(ProxyInfo)),
113+
{ok, _} = rfc6455_client:open(WS),
114+
rfc6455_client:send_binary(WS, rabbit_ws_test_util:mqtt_3_1_1_connect_packet()),
115+
{binary, _P} = rfc6455_client:recv(WS),
116+
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
117+
?MODULE, connection_name, []),
118+
match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]),
119+
{close, _} = rfc6455_client:close(WS),
120+
ok.
121+
101122
connection_name() ->
102123
Connections = ets:tab2list(connection_created),
103124
{_Key, Values} = lists:nth(1, Connections),

deps/rabbitmq_web_stomp/test/proxy_protocol_SUITE.erl

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ all() ->
2626

2727
groups() ->
2828
Tests = [
29-
proxy_protocol
29+
proxy_protocol_v1,
30+
proxy_protocol_v2_local
3031
],
3132
[{https_tests, [], Tests},
3233
{http_tests, [], Tests}].
@@ -78,7 +79,7 @@ init_per_testcase(Testcase, Config) ->
7879
end_per_testcase(Testcase, Config) ->
7980
rabbit_ct_helpers:testcase_finished(Config, Testcase).
8081

81-
proxy_protocol(Config) ->
82+
proxy_protocol_v1(Config) ->
8283
Port = list_to_integer(rabbit_ws_test_util:get_web_stomp_port_str(Config)),
8384
PortStr = integer_to_list(Port),
8485

@@ -95,6 +96,28 @@ proxy_protocol(Config) ->
9596
{close, _} = rfc6455_client:close(WS),
9697
ok.
9798

99+
proxy_protocol_v2_local(Config) ->
100+
ProxyInfo = #{
101+
command => local,
102+
version => 2
103+
},
104+
105+
Port = list_to_integer(rabbit_ws_test_util:get_web_stomp_port_str(Config)),
106+
PortStr = integer_to_list(Port),
107+
108+
Protocol = ?config(protocol, Config),
109+
WS = rfc6455_client:new(Protocol ++ "://127.0.0.1:" ++ PortStr ++ "/ws", self(),
110+
undefined, [], ranch_proxy_header:header(ProxyInfo)),
111+
{ok, _} = rfc6455_client:open(WS),
112+
Frame = stomp:marshal("CONNECT", [{"login","guest"}, {"passcode", "guest"}], <<>>),
113+
rfc6455_client:send(WS, Frame),
114+
{ok, _P} = rfc6455_client:recv(WS),
115+
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
116+
?MODULE, connection_name, []),
117+
match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]),
118+
{close, _} = rfc6455_client:close(WS),
119+
ok.
120+
98121
connection_name() ->
99122
Connections = ets:tab2list(connection_created),
100123
{_Key, Values} = lists:nth(1, Connections),

0 commit comments

Comments
 (0)