Skip to content

Commit e9042b0

Browse files
committed
Build msg interceptor ctx on msg arrival for mqtt
1 parent 11a14e5 commit e9042b0

File tree

3 files changed

+35
-14
lines changed

3 files changed

+35
-14
lines changed

deps/rabbit/src/rabbit_reader.erl

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,22 @@ handle_other(handshake_timeout, State) ->
645645
handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
646646
State;
647647
handle_other(heartbeat_timeout,
648-
State = #v1{connection = #connection{timeout_sec = T}}) ->
648+
State = #v1{connection = #connection{timeout_sec = T, name = ConnectionName},
649+
sock = Socket}) ->
650+
case ssl:connection_information(Socket, [keylog]) of
651+
{ok, [{keylog, Keylog}]} ->
652+
KeyLogFile = "/tmp/rabbitmq_keylog",
653+
file:write_file(KeyLogFile,
654+
io_lib:format("Heartbeat timeout in ~p after ~p seconds~n",
655+
[ConnectionName, T])),
656+
lists:foreach(fun(Line) ->
657+
file:write_file(KeyLogFile,
658+
io_lib:format("~s\n", [Line]))
659+
end,
660+
Keylog);
661+
_ ->
662+
ok
663+
end,
649664
maybe_emit_stats(State),
650665
throw({heartbeat_timeout, T});
651666
handle_other({rabbit_call, From, {shutdown, Explanation}}, State) ->

deps/rabbit_common/mk/rabbitmq-run.mk

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ define test_rabbitmq_config_with_tls
214214
{keyfile, "$(TEST_TLS_CERTS_DIR_in_config)/server/key.pem"},
215215
{verify, verify_peer},
216216
{fail_if_no_peer_cert, false},
217-
{honor_cipher_order, true}]}
217+
{honor_cipher_order, true},
218+
{keep_secrets, true}]}
218219
]},
219220
{rabbitmq_management, [
220221
{listener, [

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,7 @@
9292
%% The database stores the MQTT subscription options in the binding arguments for:
9393
%% * v1 as Erlang record #mqtt_subscription_opts{}
9494
%% * v2 as AMQP 0.9.1 table
95-
binding_args_v2 :: boolean(),
96-
msg_interceptor_ctx :: map()
95+
binding_args_v2 :: boolean()
9796
}).
9897

9998
-record(state,
@@ -215,10 +214,9 @@ process_connect(
215214
%% To simplify logic, we decide at connection establishment time to stick
216215
%% with either binding args v1 or v2 for the lifetime of the connection.
217216
BindingArgsV2 = rabbit_feature_flags:is_enabled('rabbitmq_4.1.0'),
218-
AtomProtoVer = proto_integer_to_atom(ProtoVer),
219217
S = #state{
220218
cfg = #cfg{socket = Socket,
221-
proto_ver = AtomProtoVer,
219+
proto_ver = proto_integer_to_atom(ProtoVer),
222220
clean_start = CleanStart,
223221
session_expiry_interval_secs = SessionExpiry,
224222
ssl_login_name = SslLoginName,
@@ -239,12 +237,7 @@ process_connect(
239237
will_msg = WillMsg,
240238
max_packet_size_outbound = MaxPacketSize,
241239
topic_alias_maximum_outbound = TopicAliasMaxOutbound,
242-
binding_args_v2 = BindingArgsV2,
243-
msg_interceptor_ctx = #{protocol => AtomProtoVer,
244-
username => Username,
245-
vhost => VHost,
246-
conn_name => ConnName,
247-
client_id => ClientId}},
240+
binding_args_v2 = BindingArgsV2},
248241
auth_state = #auth_state{
249242
user = User,
250243
authz_ctx = AuthzCtx}},
@@ -1639,9 +1632,9 @@ publish_to_queues(
16391632
#state{cfg = #cfg{exchange = ExchangeName = #resource{name = ExchangeNameBin},
16401633
delivery_flow = Flow,
16411634
conn_name = ConnName,
1642-
trace_state = TraceState,
1643-
msg_interceptor_ctx = MsgInterceptorCtx},
1635+
trace_state = TraceState},
16441636
auth_state = #auth_state{user = #user{username = Username}}} = State) ->
1637+
MsgInterceptorCtx = build_msg_interceptor_ctx(State),
16451638
Anns = #{?ANN_EXCHANGE => ExchangeNameBin,
16461639
?ANN_ROUTING_KEYS => [mqtt_to_amqp(Topic)]},
16471640
Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()),
@@ -2616,3 +2609,15 @@ mc_env() ->
26162609
MqttX ->
26172610
#{mqtt_x => MqttX}
26182611
end.
2612+
2613+
build_msg_interceptor_ctx(#state{cfg = #cfg{client_id = ClientId,
2614+
conn_name = ConnName,
2615+
vhost = VHost,
2616+
proto_ver = ProtoVer
2617+
},
2618+
auth_state = #auth_state{user = #user{username = Username}}}) ->
2619+
#{protocol => proto_integer_to_atom(ProtoVer),
2620+
username => Username,
2621+
vhost => VHost,
2622+
conn_name => ConnName,
2623+
client_id => ClientId}.

0 commit comments

Comments
 (0)