diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 8bc7bc2bcd85..054657cfccec 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -550,6 +550,9 @@ increase_messages_confirmed(Counters, Count) -> rabbit_global_counters:messages_confirmed(stream, Count), atomics:add(Counters, 2, Count). +increase_protocol_counter(Counter) -> + rabbit_global_counters:increase_protocol_counter(stream, Counter, 1). + messages_consumed(Counters) -> atomics:get(Counters, 1). @@ -833,9 +836,7 @@ open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason}, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE}, Frame = rabbit_stream_core:frame(Command), send(Transport, S, Frame), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_NOT_AVAILABLE, - 1), + increase_protocol_counter(?STREAM_NOT_AVAILABLE), {NewConnection, NewState}; {not_cleaned, SameConnection, SameState} -> {SameConnection, SameState} @@ -1559,8 +1560,7 @@ handle_frame_post_auth(Transport, declare_publisher, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, 1), + increase_protocol_counter(?PRECONDITION_FAILED), {Connection0, State}; handle_frame_post_auth(Transport, @@ -1677,7 +1677,7 @@ handle_frame_post_auth(Transport, declare_publisher, CorrelationId, Code), - rabbit_global_counters:increase_protocol_counter(stream, Counter, 1), + increase_protocol_counter(Counter), {C, State}; handle_frame_post_auth(Transport, #stream_connection{user = User, @@ -1704,9 +1704,7 @@ handle_frame_post_auth(Transport, declare_publisher, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?STREAM_DOES_NOT_EXIST), {Connection0, State}; {error, not_available} -> response(Transport, @@ -1714,9 +1712,7 @@ handle_frame_post_auth(Transport, declare_publisher, CorrelationId, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_NOT_AVAILABLE, - 1), + increase_protocol_counter(?STREAM_NOT_AVAILABLE), {Connection0, State}; {ClusterLeader, #stream_connection{publishers = Publishers0, @@ -1768,9 +1764,7 @@ handle_frame_post_auth(Transport, declare_publisher, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, - 1), + increase_protocol_counter(?PRECONDITION_FAILED), {Connection0, State} end; error -> @@ -1779,9 +1773,7 @@ handle_frame_post_auth(Transport, declare_publisher, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), - rabbit_global_counters:increase_protocol_counter(stream, - ?ACCESS_REFUSED, - 1), + increase_protocol_counter(?ACCESS_REFUSED), {Connection0, State} end; handle_frame_post_auth(Transport, @@ -1827,9 +1819,7 @@ handle_frame_post_auth(Transport, PublishingIds}, Frame = rabbit_stream_core:frame(Command), send(Transport, S, Frame), - rabbit_global_counters:increase_protocol_counter(stream, - ?PUBLISHER_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?PUBLISHER_DOES_NOT_EXIST), {Connection, State} end; handle_frame_post_auth(Transport, @@ -1850,9 +1840,7 @@ handle_frame_post_auth(Transport, ok -> case rabbit_stream_manager:lookup_leader(VirtualHost, Stream) of {error, not_found} -> - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?STREAM_DOES_NOT_EXIST), {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0}; {ok, LeaderPid} -> {?RESPONSE_CODE_OK, @@ -1864,9 +1852,7 @@ handle_frame_post_auth(Transport, end} end; error -> - rabbit_global_counters:increase_protocol_counter(stream, - ?ACCESS_REFUSED, - 1), + increase_protocol_counter(?ACCESS_REFUSED), {?RESPONSE_CODE_ACCESS_REFUSED, 0} end, Frame = @@ -1909,9 +1895,7 @@ handle_frame_post_auth(Transport, delete_publisher, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST), - rabbit_global_counters:increase_protocol_counter(stream, - ?PUBLISHER_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?PUBLISHER_DOES_NOT_EXIST), {Connection0, State} end; handle_frame_post_auth(Transport, #stream_connection{} = Connection, State, @@ -1930,7 +1914,7 @@ handle_frame_post_auth(Transport, {ok, #stream_connection{user = User} = C}, Sta {?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED} end, response(Transport, C, subscribe, CorrelationId, Code), - rabbit_global_counters:increase_protocol_counter(stream, Counter, 1), + increase_protocol_counter(Counter), {C, State}; handle_frame_post_auth(Transport, {ok, #stream_connection{ @@ -1966,9 +1950,7 @@ handle_frame_post_auth(Transport, subscribe, CorrelationId, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_NOT_AVAILABLE, - 1), + increase_protocol_counter(?STREAM_NOT_AVAILABLE), {Connection, State}; {error, not_found} -> response(Transport, @@ -1976,9 +1958,7 @@ handle_frame_post_auth(Transport, subscribe, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?STREAM_DOES_NOT_EXIST), {Connection, State}; {ok, LocalMemberPid} -> case subscription_exists(StreamSubscriptions, @@ -1990,9 +1970,7 @@ handle_frame_post_auth(Transport, subscribe, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS), - rabbit_global_counters:increase_protocol_counter(stream, - ?SUBSCRIPTION_ID_ALREADY_EXISTS, - 1), + increase_protocol_counter(?SUBSCRIPTION_ID_ALREADY_EXISTS), {Connection, State}; false -> rabbit_log:debug("Creating subscription ~tp to ~tp, with offset " @@ -2014,9 +1992,7 @@ handle_frame_post_auth(Transport, subscribe, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, - 1), + increase_protocol_counter(?PRECONDITION_FAILED), {Connection, State}; _ -> Log = case Sac of @@ -2111,9 +2087,7 @@ handle_frame_post_auth(Transport, subscribe, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), - rabbit_global_counters:increase_protocol_counter(stream, - ?ACCESS_REFUSED, - 1), + increase_protocol_counter(?ACCESS_REFUSED), {Connection, State} end; handle_frame_post_auth(Transport, @@ -2138,9 +2112,7 @@ handle_frame_post_auth(Transport, rabbit_stream_core:frame({response, 1, {credit, Code, SubscriptionId}}), send(Transport, S, Frame), - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, - 1), + increase_protocol_counter(?PRECONDITION_FAILED), {Connection, State#stream_connection_state{consumers = Consumers#{SubscriptionId => Consumer1}}}; @@ -2175,9 +2147,7 @@ handle_frame_post_auth(Transport, rabbit_stream_core:frame({response, 1, {credit, Code, SubscriptionId}}), send(Transport, S, Frame), - rabbit_global_counters:increase_protocol_counter(stream, - ?SUBSCRIPTION_ID_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?SUBSCRIPTION_ID_DOES_NOT_EXIST), {Connection, State} end; handle_frame_post_auth(_Transport, @@ -2218,14 +2188,10 @@ handle_frame_post_auth(Transport, ok -> case lookup_leader(Stream, Connection0) of {error, not_found} -> - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?STREAM_DOES_NOT_EXIST), {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0, Connection0}; {error, not_available} -> - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_NOT_AVAILABLE, - 1), + increase_protocol_counter(?STREAM_NOT_AVAILABLE), {?RESPONSE_CODE_STREAM_NOT_AVAILABLE, 0, Connection0}; {LeaderPid, C} -> {RC, O} = @@ -2238,9 +2204,7 @@ handle_frame_post_auth(Transport, {RC, O, C} end; error -> - rabbit_global_counters:increase_protocol_counter(stream, - ?ACCESS_REFUSED, - 1), + increase_protocol_counter(?ACCESS_REFUSED), {?RESPONSE_CODE_ACCESS_REFUSED, 0, Connection0} end, Frame = @@ -2262,9 +2226,7 @@ handle_frame_post_auth(Transport, unsubscribe, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST), - rabbit_global_counters:increase_protocol_counter(stream, - ?SUBSCRIPTION_ID_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?SUBSCRIPTION_ID_DOES_NOT_EXIST), {Connection, State}; true -> {Connection1, State1} = @@ -2304,9 +2266,7 @@ handle_frame_post_auth(Transport, create_stream, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, - 1), + increase_protocol_counter(?PRECONDITION_FAILED), {Connection, State}; {error, reference_already_exists} -> response(Transport, @@ -2314,9 +2274,7 @@ handle_frame_post_auth(Transport, create_stream, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_ALREADY_EXISTS, - 1), + increase_protocol_counter(?STREAM_ALREADY_EXISTS), {Connection, State}; {error, _} -> response(Transport, @@ -2324,9 +2282,7 @@ handle_frame_post_auth(Transport, create_stream, CorrelationId, ?RESPONSE_CODE_INTERNAL_ERROR), - rabbit_global_counters:increase_protocol_counter(stream, - ?INTERNAL_ERROR, - 1), + increase_protocol_counter(?INTERNAL_ERROR), {Connection, State} end; error -> @@ -2335,9 +2291,7 @@ handle_frame_post_auth(Transport, create_stream, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), - rabbit_global_counters:increase_protocol_counter(stream, - ?ACCESS_REFUSED, - 1), + increase_protocol_counter(?ACCESS_REFUSED), {Connection, State} end; _ -> @@ -2346,9 +2300,7 @@ handle_frame_post_auth(Transport, create_stream, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, - 1), + increase_protocol_counter(?PRECONDITION_FAILED), {Connection, State} end; handle_frame_post_auth(Transport, @@ -2377,9 +2329,7 @@ handle_frame_post_auth(Transport, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE}, Frame = rabbit_stream_core:frame(Command), send(Transport, S, Frame), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_NOT_AVAILABLE, - 1), + increase_protocol_counter(?STREAM_NOT_AVAILABLE), {NewConnection, NewState}; {not_cleaned, SameConnection, SameState} -> {SameConnection, SameState} @@ -2391,9 +2341,7 @@ handle_frame_post_auth(Transport, delete_stream, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?STREAM_DOES_NOT_EXIST), {Connection, State} end; error -> @@ -2402,9 +2350,7 @@ handle_frame_post_auth(Transport, delete_stream, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), - rabbit_global_counters:increase_protocol_counter(stream, - ?ACCESS_REFUSED, - 1), + increase_protocol_counter(?ACCESS_REFUSED), {Connection, State} end; handle_frame_post_auth(Transport, @@ -2525,9 +2471,7 @@ handle_frame_post_auth(Transport, {ok, Strs} -> {?RESPONSE_CODE_OK, Strs}; {error, _} -> - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?STREAM_DOES_NOT_EXIST), {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, []} end, @@ -2548,9 +2492,7 @@ handle_frame_post_auth(Transport, {ok, Streams} -> {?RESPONSE_CODE_OK, Streams}; {error, _} -> - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?STREAM_DOES_NOT_EXIST), {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, []} end, @@ -2745,15 +2687,11 @@ handle_frame_post_auth(Transport, ok -> case rabbit_stream_manager:lookup_member(VirtualHost, Stream) of {error, not_available} -> - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_NOT_AVAILABLE, - 1), + increase_protocol_counter(?STREAM_NOT_AVAILABLE), {stream_stats, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE, #{}}; {error, not_found} -> - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?STREAM_DOES_NOT_EXIST), {stream_stats, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, #{}}; {ok, MemberPid} -> @@ -2765,9 +2703,7 @@ handle_frame_post_auth(Transport, {stream_stats, ?RESPONSE_CODE_OK, StreamStats} end; error -> - rabbit_global_counters:increase_protocol_counter(stream, - ?ACCESS_REFUSED, - 1), + increase_protocol_counter(?ACCESS_REFUSED), {stream_stats, ?RESPONSE_CODE_ACCESS_REFUSED, #{}} end, Frame = rabbit_stream_core:frame({response, CorrelationId, Response}), @@ -2807,9 +2743,7 @@ handle_frame_post_auth(Transport, create_super_stream, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, - 1), + increase_protocol_counter(?PRECONDITION_FAILED), {Connection, State}; {error, {reference_already_exists, Msg}} -> rabbit_log:warning("Error while trying to create super stream ~tp: ~tp", @@ -2819,9 +2753,7 @@ handle_frame_post_auth(Transport, create_super_stream, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_ALREADY_EXISTS, - 1), + increase_protocol_counter(?STREAM_ALREADY_EXISTS), {Connection, State}; {error, Error} -> rabbit_log:warning("Error while trying to create super stream ~tp: ~tp", @@ -2831,9 +2763,7 @@ handle_frame_post_auth(Transport, create_super_stream, CorrelationId, ?RESPONSE_CODE_INTERNAL_ERROR), - rabbit_global_counters:increase_protocol_counter(stream, - ?INTERNAL_ERROR, - 1), + increase_protocol_counter(?INTERNAL_ERROR), {Connection, State} end; error -> @@ -2842,9 +2772,7 @@ handle_frame_post_auth(Transport, create_super_stream, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), - rabbit_global_counters:increase_protocol_counter(stream, - ?ACCESS_REFUSED, - 1), + increase_protocol_counter(?ACCESS_REFUSED), {Connection, State} end; _ -> @@ -2853,9 +2781,7 @@ handle_frame_post_auth(Transport, create_super_stream, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, - 1), + increase_protocol_counter(?PRECONDITION_FAILED), {Connection, State} end; handle_frame_post_auth(Transport, @@ -2892,9 +2818,7 @@ handle_frame_post_auth(Transport, delete_super_stream, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_DOES_NOT_EXIST, - 1), + increase_protocol_counter(?STREAM_DOES_NOT_EXIST), {Connection, State}; {error, Error} -> rabbit_log:warning("Error while trying to delete super stream ~tp: ~tp", @@ -2904,9 +2828,7 @@ handle_frame_post_auth(Transport, delete_super_stream, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, - 1), + increase_protocol_counter(?PRECONDITION_FAILED), {Connection, State} end; @@ -2916,9 +2838,7 @@ handle_frame_post_auth(Transport, delete_super_stream, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), - rabbit_global_counters:increase_protocol_counter(stream, - ?ACCESS_REFUSED, - 1), + increase_protocol_counter(?ACCESS_REFUSED), {Connection, State} end; handle_frame_post_auth(Transport, @@ -2950,8 +2870,7 @@ handle_frame_post_auth(Transport, {close, ?RESPONSE_CODE_UNKNOWN_FRAME, CloseReason}}), send(Transport, S, Frame), - rabbit_global_counters:increase_protocol_counter(stream, - ?UNKNOWN_FRAME, 1), + increase_protocol_counter(?UNKNOWN_FRAME), {Connection#stream_connection{connection_step = close_sent}, State}. process_client_command_versions(C, []) -> @@ -3172,9 +3091,7 @@ evaluate_state_after_secret_update(Transport, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE}, Frame = rabbit_stream_core:frame(Command), send(Transport, Socket, Frame), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_NOT_AVAILABLE, - 1), + increase_protocol_counter(?STREAM_NOT_AVAILABLE), {C1, S1} end, {Conn2, State1}, Streams) end, @@ -3341,9 +3258,7 @@ clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport ?RESPONSE_CODE_STREAM_NOT_AVAILABLE}, Frame = rabbit_stream_core:frame(Command), send(Transport, S, Frame), - rabbit_global_counters:increase_protocol_counter(stream, - ?STREAM_NOT_AVAILABLE, - 1), + increase_protocol_counter(?STREAM_NOT_AVAILABLE), {NewConnection, NewState}; {not_cleaned, SameConnection, SameState} -> {SameConnection, SameState}