diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 3217409b3bfc..fec41ecae79b 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -697,6 +697,12 @@ open(info, {OK, S, Data}, {next_state, close_sent, StatemData#statem_data{connection = Connection1, connection_state = State1}}; + failure -> + _ = demonitor_all_streams(Connection), + ?LOG_INFO("Force closing stream connection ~tp because of " + "transition to invalid state", + [self()]), + {stop, {shutdown, <<"Invalid state">>}}; _ -> State2 = case Blocked of @@ -1586,6 +1592,7 @@ handle_frame_post_auth(Transport, stream), auth_fail(NewUsername, Msg, Args, C1, S1), ?LOG_WARNING(Msg, Args), + silent_close_delay(), {C1#stream_connection{connection_step = failure}, {sasl_authenticate, ?RESPONSE_AUTHENTICATION_FAILURE, <<>>}}; @@ -1631,6 +1638,7 @@ handle_frame_post_auth(Transport, stream), ?LOG_WARNING("Not allowed to change username '~ts'. Only password", [Username]), + silent_close_delay(), {C1#stream_connection{connection_step = failure}, {sasl_authenticate, @@ -1652,6 +1660,7 @@ handle_frame_post_auth(Transport, {OtherMechanism, _} -> ?LOG_WARNING("User '~ts' cannot change initial auth mechanism '~ts' for '~ts'", [Username, NewMechanism, OtherMechanism]), + silent_close_delay(), CmdBody = {sasl_authenticate, ?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM, <<>>}, Frame = rabbit_stream_core:frame({response, CorrelationId, CmdBody}), diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl b/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl index 0c1bc2dcc683..0f5723221898 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl @@ -20,6 +20,10 @@ -type publisher_reference() :: binary(). -type subscription_id() :: byte(). -type internal_id() :: integer(). +-type connection_step() :: tcp_connected | peer_properties_exchanged | + authenticating | authenticated | tuning | + tuned | opened | failure | + closing | close_sent | closing_done. -record(publisher, {publisher_id :: publisher_id(), @@ -75,8 +79,7 @@ credits :: atomics:atomics_ref(), user :: undefined | #user{}, virtual_host :: undefined | binary(), - connection_step :: - atom(), % tcp_connected, peer_properties_exchanged, authenticating, authenticated, tuning, tuned, opened, failure, closing, closing_done + connection_step :: connection_step(), frame_max :: integer(), heartbeat :: undefined | integer(), heartbeater :: any(), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 651fd7ec89dd..20c5f762212f 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -48,6 +48,7 @@ groups() -> test_update_secret, cannot_update_username_after_authenticated, cannot_use_another_authmechanism_when_updating_secret, + update_secret_should_close_connection_if_wrong_secret, unauthenticated_client_rejected_tcp_connected, timeout_tcp_connected, unauthenticated_client_rejected_peer_properties_exchanged, @@ -292,22 +293,30 @@ test_update_secret(Config) -> cannot_update_username_after_authenticated(Config) -> {S, C0} = connect_and_authenticate(gen_tcp, Config), - C1 = expect_unsuccessful_authentication( - try_authenticate(gen_tcp, S, C0, <<"PLAIN">>, <<"other">>, <<"other">>), - ?RESPONSE_SASL_CANNOT_CHANGE_USERNAME), - _C2 = test_close(gen_tcp, S, C1), + _C1 = expect_unsuccessful_authentication( + try_authenticate(gen_tcp, S, C0, <<"PLAIN">>, <<"other">>, <<"other">>), + ?RESPONSE_SASL_CANNOT_CHANGE_USERNAME), closed = wait_for_socket_close(gen_tcp, S, 10), ok. cannot_use_another_authmechanism_when_updating_secret(Config) -> {S, C0} = connect_and_authenticate(gen_tcp, Config), - C1 = expect_unsuccessful_authentication( - try_authenticate(gen_tcp, S, C0, <<"EXTERNAL">>, <<"guest">>, <<"new_password">>), - ?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM), - _C2 = test_close(gen_tcp, S, C1), + _C1 = expect_unsuccessful_authentication( + try_authenticate(gen_tcp, S, C0, <<"EXTERNAL">>, <<"guest">>, <<"new_password">>), + ?RESPONSE_SASL_CANNOT_CHANGE_MECHANISM), closed = wait_for_socket_close(gen_tcp, S, 10), ok. +update_secret_should_close_connection_if_wrong_secret(Config) -> + Transport = gen_tcp, + {S, C0} = connect_and_authenticate(Transport, Config), + Pwd = rand:bytes(20), + _C1 = expect_unsuccessful_authentication( + try_authenticate(Transport, S, C0, <<"PLAIN">>, <<"guest">>, Pwd), + ?RESPONSE_AUTHENTICATION_FAILURE), + closed = wait_for_socket_close(Transport, S, 10), + ok. + test_stream_tls(Config) -> Stream = atom_to_binary(?FUNCTION_NAME, utf8), test_server(ssl, Stream, Config),