@@ -1927,21 +1927,17 @@ handle_frame_post_auth(Transport, {ok, #stream_connection{user = User} = C}, Sta
19271927 {C , State };
19281928handle_frame_post_auth (Transport ,
19291929 {ok , # stream_connection {
1930- name = ConnName ,
1931- socket = Socket ,
19321930 stream_subscriptions = StreamSubscriptions ,
19331931 virtual_host = VirtualHost ,
1934- user = User ,
1935- send_file_oct = SendFileOct ,
1936- transport = ConnTransport } = Connection },
1937- # stream_connection_state {consumers = Consumers } = State ,
1932+ user = User } = Connection },
1933+ State ,
19381934 {request , CorrelationId ,
19391935 {subscribe ,
19401936 SubscriptionId ,
19411937 Stream ,
19421938 OffsetSpec ,
1943- Credit ,
1944- Properties }}) ->
1939+ _Credit ,
1940+ Properties }} = Request ) ->
19451941 QueueResource =
19461942 # resource {name = Stream ,
19471943 kind = queue ,
@@ -2004,89 +2000,9 @@ handle_frame_post_auth(Transport,
20042000 increase_protocol_counter (? PRECONDITION_FAILED ),
20052001 {Connection , State };
20062002 _ ->
2007- Log = case Sac of
2008- true ->
2009- undefined ;
2010- false ->
2011- init_reader (ConnTransport ,
2012- LocalMemberPid ,
2013- QueueResource ,
2014- SubscriptionId ,
2015- Properties ,
2016- OffsetSpec )
2017- end ,
2018-
2019- ConsumerCounters =
2020- atomics :new (2 , [{signed , false }]),
2021-
2022- response_ok (Transport ,
2023- Connection ,
2024- subscribe ,
2025- CorrelationId ),
2026-
2027- Active =
2028- maybe_register_consumer (VirtualHost ,
2029- Stream ,
2030- ConsumerName ,
2031- ConnName ,
2032- SubscriptionId ,
2033- Properties ,
2034- Sac ),
2035-
2036- ConsumerConfiguration =
2037- # consumer_configuration {member_pid =
2038- LocalMemberPid ,
2039- subscription_id
2040- =
2041- SubscriptionId ,
2042- socket = Socket ,
2043- stream = Stream ,
2044- offset =
2045- OffsetSpec ,
2046- counters =
2047- ConsumerCounters ,
2048- properties =
2049- Properties ,
2050- active =
2051- Active },
2052- SendLimit = Credit div 2 ,
2053- ConsumerState =
2054- # consumer {configuration =
2055- ConsumerConfiguration ,
2056- log = Log ,
2057- send_limit = SendLimit ,
2058- credit = Credit },
2059-
2060- Connection1 =
2061- maybe_monitor_stream (LocalMemberPid ,
2062- Stream ,
2063- Connection ),
2064-
2065- State1 =
2066- maybe_dispatch_on_subscription (Transport ,
2067- State ,
2068- ConsumerState ,
2069- Connection1 ,
2070- Consumers ,
2071- Stream ,
2072- SubscriptionId ,
2073- Properties ,
2074- SendFileOct ,
2075- Sac ),
2076- StreamSubscriptions1 =
2077- case StreamSubscriptions of
2078- #{Stream := SubscriptionIds } ->
2079- StreamSubscriptions #{Stream =>
2080- [SubscriptionId ]
2081- ++ SubscriptionIds };
2082- _ ->
2083- StreamSubscriptions #{Stream =>
2084- [SubscriptionId ]}
2085- end ,
2086- {Connection1 # stream_connection {stream_subscriptions
2087- =
2088- StreamSubscriptions1 },
2089- State1 }
2003+ handle_subscription (Transport , Connection ,
2004+ State , Request ,
2005+ LocalMemberPid )
20902006 end
20912007 end
20922008 end ;
@@ -2995,8 +2911,106 @@ maybe_dispatch_on_subscription(_Transport,
29952911 Consumers1 = Consumers #{SubscriptionId => ConsumerState },
29962912 State # stream_connection_state {consumers = Consumers1 }.
29972913
2914+ handle_subscription (Transport ,# stream_connection {
2915+ name = ConnName ,
2916+ socket = Socket ,
2917+ stream_subscriptions = StreamSubscriptions ,
2918+ virtual_host = VirtualHost ,
2919+ send_file_oct = SendFileOct ,
2920+ transport = ConnTransport } = Connection ,
2921+ # stream_connection_state {consumers = Consumers } = State ,
2922+ {request , CorrelationId , {subscribe ,
2923+ SubscriptionId ,
2924+ Stream ,
2925+ OffsetSpec ,
2926+ Credit ,
2927+ Properties }},
2928+ LocalMemberPid ) ->
2929+ Sac = single_active_consumer (Properties ),
2930+ ConsumerName = consumer_name (Properties ),
2931+ QueueResource = # resource {name = Stream ,
2932+ kind = queue ,
2933+ virtual_host = VirtualHost },
2934+ case maybe_register_consumer (VirtualHost , Stream , ConsumerName , ConnName ,
2935+ SubscriptionId , Properties , Sac ) of
2936+ {ok , Active } ->
2937+ Log = case Sac of
2938+ true ->
2939+ undefined ;
2940+ false ->
2941+ init_reader (ConnTransport ,
2942+ LocalMemberPid ,
2943+ QueueResource ,
2944+ SubscriptionId ,
2945+ Properties ,
2946+ OffsetSpec )
2947+ end ,
2948+
2949+ ConsumerCounters = atomics :new (2 , [{signed , false }]),
2950+
2951+ response_ok (Transport ,
2952+ Connection ,
2953+ subscribe ,
2954+ CorrelationId ),
2955+
2956+ ConsumerConfiguration = # consumer_configuration {
2957+ member_pid = LocalMemberPid ,
2958+ subscription_id = SubscriptionId ,
2959+ socket = Socket ,
2960+ stream = Stream ,
2961+ offset = OffsetSpec ,
2962+ counters = ConsumerCounters ,
2963+ properties = Properties ,
2964+ active = Active },
2965+ SendLimit = Credit div 2 ,
2966+ ConsumerState =
2967+ # consumer {configuration = ConsumerConfiguration ,
2968+ log = Log ,
2969+ send_limit = SendLimit ,
2970+ credit = Credit },
2971+
2972+ Connection1 = maybe_monitor_stream (LocalMemberPid ,
2973+ Stream ,
2974+ Connection ),
2975+
2976+ State1 = maybe_dispatch_on_subscription (Transport ,
2977+ State ,
2978+ ConsumerState ,
2979+ Connection1 ,
2980+ Consumers ,
2981+ Stream ,
2982+ SubscriptionId ,
2983+ Properties ,
2984+ SendFileOct ,
2985+ Sac ),
2986+ StreamSubscriptions1 =
2987+ case StreamSubscriptions of
2988+ #{Stream := SubscriptionIds } ->
2989+ StreamSubscriptions #{Stream =>
2990+ [SubscriptionId ]
2991+ ++ SubscriptionIds };
2992+ _ ->
2993+ StreamSubscriptions #{Stream =>
2994+ [SubscriptionId ]}
2995+ end ,
2996+ {Connection1 # stream_connection {stream_subscriptions
2997+ =
2998+ StreamSubscriptions1 },
2999+ State1 };
3000+ {error , Reason } ->
3001+ rabbit_log :warning (" Cannot create SAC subcription ~tp : ~tp " ,
3002+ [SubscriptionId , Reason ]),
3003+ response (Transport ,
3004+ Connection ,
3005+ subscribe ,
3006+ CorrelationId ,
3007+ ? RESPONSE_CODE_PRECONDITION_FAILED ),
3008+ increase_protocol_counter (? PRECONDITION_FAILED ),
3009+ {Connection , State }
3010+ end .
3011+
29983012maybe_register_consumer (_ , _ , _ , _ , _ , _ , false = _Sac ) ->
2999- true ;
3013+ { ok , true } ;
30003014maybe_register_consumer (VirtualHost ,
30013015 Stream ,
30023016 ConsumerName ,
@@ -3005,15 +3019,13 @@ maybe_register_consumer(VirtualHost,
30053019 Properties ,
30063020 true ) ->
30073021 PartitionIndex = partition_index (VirtualHost , Stream , Properties ),
3008- {ok , Active } =
3009- rabbit_stream_sac_coordinator :register_consumer (VirtualHost ,
3010- Stream ,
3011- PartitionIndex ,
3012- ConsumerName ,
3013- self (),
3014- ConnectionName ,
3015- SubscriptionId ),
3016- Active .
3022+ rabbit_stream_sac_coordinator :register_consumer (VirtualHost ,
3023+ Stream ,
3024+ PartitionIndex ,
3025+ ConsumerName ,
3026+ self (),
3027+ ConnectionName ,
3028+ SubscriptionId ).
30173029
30183030maybe_send_consumer_update (Transport ,
30193031 Connection = # stream_connection {
0 commit comments