@@ -137,17 +137,28 @@ queue_declare_method(#'queue.declare'{} = Method, Type, Params) ->
137137 false -> Method # 'queue.declare' {auto_delete = true ,
138138 exclusive = true }
139139 end ,
140+
140141 % % set the rest of queue.declare fields from Params
141142 Method2 = lists :foldl (fun (F , Acc ) -> F (Acc , Params ) end ,
142143 Method1 , [fun update_queue_declare_arguments /2 ,
143144 fun update_queue_declare_exclusive /2 ,
144145 fun update_queue_declare_auto_delete /2 ,
145146 fun update_queue_declare_nowait /2 ]),
147+
148+ Arguments = proplists :get_value (arguments , Params , []),
149+ QueueType = rabbit_amqqueue :get_queue_type (Arguments ),
150+
151+ Method3 = case QueueType of
152+ rabbit_stream_queue -> Method2 # 'queue.declare' {durable = true ,
153+ exclusive = false };
154+ _ -> Method2
155+ end ,
156+
146157 case {Type , proplists :get_value (subscription_queue_name_gen , Params )} of
147158 {topic , SQNG } when is_function (SQNG ) ->
148- Method2 # 'queue.declare' {queue = SQNG ()};
159+ Method3 # 'queue.declare' {queue = SQNG ()};
149160 {exchange , SQNG } when is_function (SQNG ) ->
150- Method2 # 'queue.declare' {queue = SQNG ()};
161+ Method3 # 'queue.declare' {queue = SQNG ()};
151162 _ ->
152- Method2
163+ Method3
153164 end .
0 commit comments