@@ -236,7 +236,17 @@ delete_replica(StreamId, Node) ->
236236
237237policy_changed (Q ) when ? is_amqqueue (Q ) ->
238238 StreamId = maps :get (name , amqqueue :get_type_state (Q )),
239- process_command ({policy_changed , StreamId , #{queue => Q }}).
239+ case rabbit_feature_flags :is_enabled (stream_update_config_command ) of
240+ true ->
241+ % % there are the only two configuration keys that are safe to
242+ % % update
243+ Conf = maps :with ([filter_size ,
244+ retention ],
245+ rabbit_stream_queue :update_stream_conf (Q , #{})),
246+ process_command ({update_config , StreamId , Conf });
247+ false ->
248+ process_command ({policy_changed , StreamId , #{queue => Q }})
249+ end .
240250
241251sac_state (#? MODULE {single_active_consumer = SacState }) ->
242252 SacState .
@@ -1601,12 +1611,13 @@ update_stream0(#{system_time := _Ts} = _Meta,
16011611 M
16021612 end , Members0 ),
16031613 Stream0 # stream {members = Members };
1604- update_stream0 (#{system_time := _Ts },
1605- {policy_changed , _StreamId , #{queue := Q }},
1606- # stream {conf = Conf0 ,
1607- members = _Members0 } = Stream0 ) ->
1614+ update_stream0 (_Meta , {policy_changed , _StreamId , #{queue := Q }},
1615+ # stream {conf = Conf0 } = Stream0 ) ->
16081616 Conf = rabbit_stream_queue :update_stream_conf (Q , Conf0 ),
16091617 Stream0 # stream {conf = Conf };
1618+ update_stream0 (_Meta , {update_config , _StreamId , Conf },
1619+ # stream {conf = Conf0 } = Stream0 ) ->
1620+ Stream0 # stream {conf = maps :merge (Conf0 , Conf )};
16101621update_stream0 (_Meta , _Cmd , undefined ) ->
16111622 undefined .
16121623
0 commit comments