Skip to content

Commit 0260862

Browse files
committed
Return error if stream consumer reference is longer than 255 characters
1 parent 4e8fb46 commit 0260862

File tree

2 files changed

+51
-3
lines changed

2 files changed

+51
-3
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,10 @@
8080
peer_cert_validity]).
8181
-define(UNKNOWN_FIELD, unknown_field).
8282
-define(SILENT_CLOSE_DELAY, 3_000).
83+
-define(MAX_REFERENCE_SIZE, 255).
8384

84-
-import(rabbit_stream_utils, [check_write_permitted/2]).
85+
-import(rabbit_stream_utils, [check_write_permitted/2,
86+
check_read_permitted/3]).
8587

8688
%% client API
8789
-export([start_link/4,
@@ -1663,7 +1665,7 @@ handle_frame_post_auth(Transport,
16631665
State,
16641666
{request, CorrelationId,
16651667
{declare_publisher, _PublisherId, WriterRef, S}})
1666-
when is_binary(WriterRef), byte_size(WriterRef) > 255 ->
1668+
when is_binary(WriterRef), byte_size(WriterRef) > ?MAX_REFERENCE_SIZE ->
16671669
{Code, Counter} = case check_write_permitted(stream_r(S, C), User) of
16681670
ok ->
16691671
{?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED};
@@ -1917,6 +1919,19 @@ handle_frame_post_auth(Transport, #stream_connection{} = Connection, State,
19171919
{subscribe,
19181920
_, _, _, _, _}} = Request) ->
19191921
handle_frame_post_auth(Transport, {ok, Connection}, State, Request);
1922+
handle_frame_post_auth(Transport, {ok, #stream_connection{user = User} = C}, State,
1923+
{request, CorrelationId,
1924+
{subscribe, _, S, _, _, #{ <<"name">> := N}}})
1925+
when is_binary(N), byte_size(N) > ?MAX_REFERENCE_SIZE ->
1926+
{Code, Counter} = case check_read_permitted(stream_r(S, C), User,#{}) of
1927+
ok ->
1928+
{?RESPONSE_CODE_PRECONDITION_FAILED, ?PRECONDITION_FAILED};
1929+
error ->
1930+
{?RESPONSE_CODE_ACCESS_REFUSED, ?ACCESS_REFUSED}
1931+
end,
1932+
response(Transport, C, subscribe, CorrelationId, Code),
1933+
rabbit_global_counters:increase_protocol_counter(stream, Counter, 1),
1934+
{C, State};
19201935
handle_frame_post_auth(Transport,
19211936
{ok, #stream_connection{
19221937
name = ConnName,

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ groups() ->
6565
authentication_error_should_close_with_delay,
6666
unauthorized_vhost_access_should_close_with_delay,
6767
sasl_anonymous,
68-
test_publisher_with_too_long_reference_errors
68+
test_publisher_with_too_long_reference_errors,
69+
test_consumer_with_too_long_reference_errors
6970
]},
7071
%% Run `test_global_counters` on its own so the global metrics are
7172
%% initialised to 0 for each testcase
@@ -978,6 +979,38 @@ test_publisher_with_too_long_reference_errors(Config) ->
978979
test_close(T, S, C),
979980
ok.
980981

982+
test_consumer_with_too_long_reference_errors(Config) ->
983+
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
984+
T = gen_tcp,
985+
Port = get_port(T, Config),
986+
Opts = get_opts(T),
987+
{ok, S} = T:connect("localhost", Port, Opts),
988+
C = rabbit_stream_core:init(0),
989+
ConnectionName = FunctionName,
990+
test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
991+
test_authenticate(T, S, C),
992+
993+
Stream = FunctionName,
994+
test_create_stream(T, S, Stream, C),
995+
996+
MaxSize = 255,
997+
ReferenceOK = iolist_to_binary(lists:duplicate(MaxSize, <<"a">>)),
998+
ReferenceKO = iolist_to_binary(lists:duplicate(MaxSize + 1, <<"a">>)),
999+
1000+
Tests = [{1, ReferenceOK, ?RESPONSE_CODE_OK},
1001+
{2, ReferenceKO, ?RESPONSE_CODE_PRECONDITION_FAILED}],
1002+
1003+
[begin
1004+
F = request({subscribe, SubId, Stream, first, 1, #{<<"name">> => Ref}}),
1005+
ok = T:send(S, F),
1006+
{Cmd, C} = receive_commands(T, S, C),
1007+
?assertMatch({response, 1, {subscribe, ExpectedResponseCode}}, Cmd)
1008+
end || {SubId, Ref, ExpectedResponseCode} <- Tests],
1009+
1010+
test_delete_stream(T, S, Stream, C),
1011+
test_close(T, S, C),
1012+
ok.
1013+
9811014
consumer_offset_info(Config, ConnectionName) ->
9821015
[[{offset, Offset},
9831016
{offset_lag, Lag}]] = rpc(Config, 0, ?MODULE,

0 commit comments

Comments
 (0)