Skip to content

Commit e82570d

Browse files
committed
Fix partition index conflict in stream SAC coordinator
Consumers with a same name, consuming from the same stream should have the same partition index. This commit adds a check to enforce this rule and make the subscription fail if it does not comply. Fixes #13835
1 parent 99a9237 commit e82570d

File tree

3 files changed

+146
-111
lines changed

3 files changed

+146
-111
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -198,21 +198,23 @@ apply(#command_register_consumer{vhost = VirtualHost,
198198
owner = Owner,
199199
subscription_id = SubscriptionId},
200200
#?MODULE{groups = StreamGroups0} = State) ->
201-
StreamGroups1 =
202-
maybe_create_group(VirtualHost,
201+
case maybe_create_group(VirtualHost,
203202
Stream,
204203
PartitionIndex,
205204
ConsumerName,
206-
StreamGroups0),
207-
208-
do_register_consumer(VirtualHost,
209-
Stream,
210-
PartitionIndex,
211-
ConsumerName,
212-
ConnectionPid,
213-
Owner,
214-
SubscriptionId,
215-
State#?MODULE{groups = StreamGroups1});
205+
StreamGroups0) of
206+
{ok, StreamGroups1} ->
207+
do_register_consumer(VirtualHost,
208+
Stream,
209+
PartitionIndex,
210+
ConsumerName,
211+
ConnectionPid,
212+
Owner,
213+
SubscriptionId,
214+
State#?MODULE{groups = StreamGroups1});
215+
{error, Error} ->
216+
{State, {error, Error}, []}
217+
end;
216218
apply(#command_unregister_consumer{vhost = VirtualHost,
217219
stream = Stream,
218220
consumer_name = ConsumerName,
@@ -644,12 +646,15 @@ maybe_create_group(VirtualHost,
644646
ConsumerName,
645647
StreamGroups) ->
646648
case StreamGroups of
647-
#{{VirtualHost, Stream, ConsumerName} := _Group} ->
648-
StreamGroups;
649+
#{{VirtualHost, Stream, ConsumerName} := #group{partition_index = PI}}
650+
when PI =/= PartitionIndex ->
651+
{error, partition_index_conflict};
652+
#{{VirtualHost, Stream, ConsumerName} := _} ->
653+
{ok, StreamGroups};
649654
SGS ->
650-
maps:put({VirtualHost, Stream, ConsumerName},
651-
#group{consumers = [], partition_index = PartitionIndex},
652-
SGS)
655+
{ok, maps:put({VirtualHost, Stream, ConsumerName},
656+
#group{consumers = [], partition_index = PartitionIndex},
657+
SGS)}
653658
end.
654659

655660
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,20 @@ handle_connection_down_super_stream_no_active_removed_or_present_test(_) ->
503503
Groups),
504504
ok.
505505

506+
register_consumer_with_different_partition_index_should_return_error_test(_) ->
507+
Stream = <<"stream">>,
508+
ConsumerName = <<"app">>,
509+
ConnectionPid = self(),
510+
Command0 =
511+
register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 0),
512+
State0 = state(),
513+
{State1, {ok, true}, _} =
514+
rabbit_stream_sac_coordinator:apply(Command0, State0),
515+
Command1 =
516+
register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1),
517+
{_, {error, partition_index_conflict}, []} =
518+
rabbit_stream_sac_coordinator:apply(Command1, State1).
519+
506520
assertSize(Expected, []) ->
507521
?assertEqual(Expected, 0);
508522
assertSize(Expected, Map) when is_map(Map) ->

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 110 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1941,7 +1941,7 @@ handle_frame_post_auth(Transport,
19411941
Stream,
19421942
OffsetSpec,
19431943
Credit,
1944-
Properties}}) ->
1944+
Properties}} = Request) ->
19451945
QueueResource =
19461946
#resource{name = Stream,
19471947
kind = queue,
@@ -2004,89 +2004,9 @@ handle_frame_post_auth(Transport,
20042004
increase_protocol_counter(?PRECONDITION_FAILED),
20052005
{Connection, State};
20062006
_ ->
2007-
Log = case Sac of
2008-
true ->
2009-
undefined;
2010-
false ->
2011-
init_reader(ConnTransport,
2012-
LocalMemberPid,
2013-
QueueResource,
2014-
SubscriptionId,
2015-
Properties,
2016-
OffsetSpec)
2017-
end,
2018-
2019-
ConsumerCounters =
2020-
atomics:new(2, [{signed, false}]),
2021-
2022-
response_ok(Transport,
2023-
Connection,
2024-
subscribe,
2025-
CorrelationId),
2026-
2027-
Active =
2028-
maybe_register_consumer(VirtualHost,
2029-
Stream,
2030-
ConsumerName,
2031-
ConnName,
2032-
SubscriptionId,
2033-
Properties,
2034-
Sac),
2035-
2036-
ConsumerConfiguration =
2037-
#consumer_configuration{member_pid =
2038-
LocalMemberPid,
2039-
subscription_id
2040-
=
2041-
SubscriptionId,
2042-
socket = Socket,
2043-
stream = Stream,
2044-
offset =
2045-
OffsetSpec,
2046-
counters =
2047-
ConsumerCounters,
2048-
properties =
2049-
Properties,
2050-
active =
2051-
Active},
2052-
SendLimit = Credit div 2,
2053-
ConsumerState =
2054-
#consumer{configuration =
2055-
ConsumerConfiguration,
2056-
log = Log,
2057-
send_limit = SendLimit,
2058-
credit = Credit},
2059-
2060-
Connection1 =
2061-
maybe_monitor_stream(LocalMemberPid,
2062-
Stream,
2063-
Connection),
2064-
2065-
State1 =
2066-
maybe_dispatch_on_subscription(Transport,
2067-
State,
2068-
ConsumerState,
2069-
Connection1,
2070-
Consumers,
2071-
Stream,
2072-
SubscriptionId,
2073-
Properties,
2074-
SendFileOct,
2075-
Sac),
2076-
StreamSubscriptions1 =
2077-
case StreamSubscriptions of
2078-
#{Stream := SubscriptionIds} ->
2079-
StreamSubscriptions#{Stream =>
2080-
[SubscriptionId]
2081-
++ SubscriptionIds};
2082-
_ ->
2083-
StreamSubscriptions#{Stream =>
2084-
[SubscriptionId]}
2085-
end,
2086-
{Connection1#stream_connection{stream_subscriptions
2087-
=
2088-
StreamSubscriptions1},
2089-
State1}
2007+
handle_subscription(Transport, Connection,
2008+
State, Request,
2009+
LocalMemberPid)
20902010
end
20912011
end
20922012
end;
@@ -2995,8 +2915,106 @@ maybe_dispatch_on_subscription(_Transport,
29952915
Consumers1 = Consumers#{SubscriptionId => ConsumerState},
29962916
State#stream_connection_state{consumers = Consumers1}.
29972917

2918+
handle_subscription(Transport,#stream_connection{
2919+
name = ConnName,
2920+
socket = Socket,
2921+
stream_subscriptions = StreamSubscriptions,
2922+
virtual_host = VirtualHost,
2923+
send_file_oct = SendFileOct,
2924+
transport = ConnTransport} = Connection,
2925+
#stream_connection_state{consumers = Consumers} = State,
2926+
{request, CorrelationId, {subscribe,
2927+
SubscriptionId,
2928+
Stream,
2929+
OffsetSpec,
2930+
Credit,
2931+
Properties}},
2932+
LocalMemberPid) ->
2933+
Sac = single_active_consumer(Properties),
2934+
ConsumerName = consumer_name(Properties),
2935+
QueueResource = #resource{name = Stream,
2936+
kind = queue,
2937+
virtual_host = VirtualHost},
2938+
case maybe_register_consumer(VirtualHost, Stream, ConsumerName, ConnName,
2939+
SubscriptionId, Properties, Sac) of
2940+
{ok, Active} ->
2941+
Log = case Sac of
2942+
true ->
2943+
undefined;
2944+
false ->
2945+
init_reader(ConnTransport,
2946+
LocalMemberPid,
2947+
QueueResource,
2948+
SubscriptionId,
2949+
Properties,
2950+
OffsetSpec)
2951+
end,
2952+
2953+
ConsumerCounters = atomics:new(2, [{signed, false}]),
2954+
2955+
response_ok(Transport,
2956+
Connection,
2957+
subscribe,
2958+
CorrelationId),
2959+
2960+
ConsumerConfiguration = #consumer_configuration{
2961+
member_pid = LocalMemberPid,
2962+
subscription_id = SubscriptionId,
2963+
socket = Socket,
2964+
stream = Stream,
2965+
offset = OffsetSpec,
2966+
counters = ConsumerCounters,
2967+
properties = Properties,
2968+
active = Active},
2969+
SendLimit = Credit div 2,
2970+
ConsumerState =
2971+
#consumer{configuration = ConsumerConfiguration,
2972+
log = Log,
2973+
send_limit = SendLimit,
2974+
credit = Credit},
2975+
2976+
Connection1 = maybe_monitor_stream(LocalMemberPid,
2977+
Stream,
2978+
Connection),
2979+
2980+
State1 = maybe_dispatch_on_subscription(Transport,
2981+
State,
2982+
ConsumerState,
2983+
Connection1,
2984+
Consumers,
2985+
Stream,
2986+
SubscriptionId,
2987+
Properties,
2988+
SendFileOct,
2989+
Sac),
2990+
StreamSubscriptions1 =
2991+
case StreamSubscriptions of
2992+
#{Stream := SubscriptionIds} ->
2993+
StreamSubscriptions#{Stream =>
2994+
[SubscriptionId]
2995+
++ SubscriptionIds};
2996+
_ ->
2997+
StreamSubscriptions#{Stream =>
2998+
[SubscriptionId]}
2999+
end,
3000+
{Connection1#stream_connection{stream_subscriptions
3001+
=
3002+
StreamSubscriptions1},
3003+
State1};
3004+
{error, Reason} ->
3005+
rabbit_log:warning("Cannot create SAC subcription ~tp: ~tp",
3006+
[SubscriptionId, Reason]),
3007+
response(Transport,
3008+
Connection,
3009+
subscribe,
3010+
CorrelationId,
3011+
?RESPONSE_CODE_PRECONDITION_FAILED),
3012+
increase_protocol_counter(?PRECONDITION_FAILED),
3013+
{Connection, State}
3014+
end.
3015+
29983016
maybe_register_consumer(_, _, _, _, _, _, false = _Sac) ->
2999-
true;
3017+
{ok, true};
30003018
maybe_register_consumer(VirtualHost,
30013019
Stream,
30023020
ConsumerName,
@@ -3005,15 +3023,13 @@ maybe_register_consumer(VirtualHost,
30053023
Properties,
30063024
true) ->
30073025
PartitionIndex = partition_index(VirtualHost, Stream, Properties),
3008-
{ok, Active} =
3009-
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
3010-
Stream,
3011-
PartitionIndex,
3012-
ConsumerName,
3013-
self(),
3014-
ConnectionName,
3015-
SubscriptionId),
3016-
Active.
3026+
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
3027+
Stream,
3028+
PartitionIndex,
3029+
ConsumerName,
3030+
self(),
3031+
ConnectionName,
3032+
SubscriptionId).
30173033

30183034
maybe_send_consumer_update(Transport,
30193035
Connection = #stream_connection{

0 commit comments

Comments
 (0)