@@ -196,12 +196,12 @@ create_stream(Q0) ->
196196 replica_nodes => Followers }),
197197 Q1 = amqqueue :set_type_state (Q0 , Conf ),
198198 case rabbit_amqqueue :internal_declare (Q1 , false ) of
199- {created , Q } ->
200- case rabbit_stream_coordinator :new_stream (Q , Leader ) of
199+ {created , Q2 } ->
200+ case rabbit_stream_coordinator :new_stream (Q2 , Leader ) of
201201 {ok , {ok , LeaderPid }, _ } ->
202202 % % update record with leader pid
203- case set_leader_pid (LeaderPid , amqqueue : get_name ( Q ) ) of
204- ok ->
203+ case set_leader_pid (LeaderPid , QName ) of
204+ { ok , Q } ->
205205 rabbit_event :notify (queue_created ,
206206 [{name , QName },
207207 {durable , true },
@@ -218,7 +218,7 @@ create_stream(Q0) ->
218218 [rabbit_misc :rs (QName ), node ()]}
219219 end ;
220220 Error ->
221- _ = rabbit_amqqueue :internal_delete (Q , ActingUser ),
221+ _ = rabbit_amqqueue :internal_delete (Q2 , ActingUser ),
222222 {protocol_error , internal_error , " Cannot declare ~ts on node '~ts ': ~255p " ,
223223 [rabbit_misc :rs (QName ), node (), Error ]}
224224 end ;
@@ -1396,11 +1396,6 @@ resend_all(#stream_client{leader = LeaderPid,
13961396 end || {Seq , {_Corr , Msg }} <- Msgs ],
13971397 State .
13981398
1399- - spec set_leader_pid (Pid , QName ) -> Ret when
1400- Pid :: pid (),
1401- QName :: rabbit_amqqueue :name (),
1402- Ret :: ok | {error , timeout }.
1403-
14041399set_leader_pid (Pid , QName ) ->
14051400 % % TODO this should probably be a single khepri transaction for better performance.
14061401 Fun = fun (Q ) ->
@@ -1409,10 +1404,12 @@ set_leader_pid(Pid, QName) ->
14091404 case rabbit_amqqueue :update (QName , Fun ) of
14101405 not_found ->
14111406 % % This can happen during recovery
1412- {ok , Q } = rabbit_amqqueue :lookup_durable_queue (QName ),
1413- rabbit_amqqueue :ensure_rabbit_queue_record_is_initialized (Fun (Q ));
1414- _ ->
1415- ok
1407+ {ok , Q1 } = rabbit_amqqueue :lookup_durable_queue (QName ),
1408+ Q = Fun (Q1 ),
1409+ rabbit_amqqueue :ensure_rabbit_queue_record_is_initialized (Q ),
1410+ {ok , Q };
1411+ Q ->
1412+ {ok , Q }
14161413 end .
14171414
14181415close_log (undefined ) -> ok ;
0 commit comments