@@ -174,12 +174,12 @@ create_stream(Q0) ->
174174 replica_nodes => Followers }),
175175 Q1 = amqqueue :set_type_state (Q0 , Conf ),
176176 case rabbit_amqqueue :internal_declare (Q1 , false ) of
177- {created , Q } ->
178- case rabbit_stream_coordinator :new_stream (Q , Leader ) of
177+ {created , Q2 } ->
178+ case rabbit_stream_coordinator :new_stream (Q2 , Leader ) of
179179 {ok , {ok , LeaderPid }, _ } ->
180180 % % update record with leader pid
181- case set_leader_pid (LeaderPid , amqqueue : get_name ( Q ) ) of
182- ok ->
181+ case set_leader_pid (LeaderPid , QName ) of
182+ { ok , Q } ->
183183 rabbit_event :notify (queue_created ,
184184 [{name , QName },
185185 {durable , true },
@@ -196,7 +196,7 @@ create_stream(Q0) ->
196196 [rabbit_misc :rs (QName ), node ()]}
197197 end ;
198198 Error ->
199- _ = rabbit_amqqueue :internal_delete (Q , ActingUser ),
199+ _ = rabbit_amqqueue :internal_delete (Q2 , ActingUser ),
200200 {protocol_error , internal_error , " Cannot declare ~ts on node '~ts ': ~255p " ,
201201 [rabbit_misc :rs (QName ), node (), Error ]}
202202 end ;
@@ -1346,11 +1346,6 @@ resend_all(#stream_client{leader = LeaderPid,
13461346 end || {Seq , {_Corr , Msg }} <- Msgs ],
13471347 State .
13481348
1349- -spec set_leader_pid (Pid , QName ) -> Ret when
1350- Pid :: pid (),
1351- QName :: rabbit_amqqueue :name (),
1352- Ret :: ok | {error , timeout }.
1353-
13541349set_leader_pid (Pid , QName ) ->
13551350 % % TODO this should probably be a single khepri transaction for better performance.
13561351 Fun = fun (Q ) ->
@@ -1359,10 +1354,16 @@ set_leader_pid(Pid, QName) ->
13591354 case rabbit_amqqueue :update (QName , Fun ) of
13601355 not_found ->
13611356 % % This can happen during recovery
1362- {ok , Q } = rabbit_amqqueue :lookup_durable_queue (QName ),
1363- rabbit_amqqueue :ensure_rabbit_queue_record_is_initialized (Fun (Q ));
1364- _ ->
1365- ok
1357+ {ok , Q1 } = rabbit_amqqueue :lookup_durable_queue (QName ),
1358+ Q = Fun (Q1 ),
1359+ case rabbit_amqqueue :ensure_rabbit_queue_record_is_initialized (Q ) of
1360+ ok ->
1361+ {ok , Q };
1362+ Err ->
1363+ Err
1364+ end ;
1365+ Q ->
1366+ {ok , Q }
13661367 end .
13671368
13681369close_log (undefined ) -> ok ;
0 commit comments