@@ -118,37 +118,24 @@ enqueue(QName, Correlation, Msg,
118118 next_enqueue_seq = 1 ,
119119 cfg = # cfg {servers = Servers ,
120120 timeout = Timeout }} = State0 ) ->
121- % % it is the first enqueue, check the version
122- {_ , Node } = pick_server (State0 ),
123- case rpc :call (Node , ra_machine , version , [{machine , rabbit_fifo , #{}}]) of
124- 0 ->
125- % % the leader is running the old version
126- enqueue (QName , Correlation , Msg , State0 # state {queue_status = go });
127- N when is_integer (N ) ->
128- % % were running the new version on the leader do sync initialisation
129- % % of enqueuer session
130- Reg = rabbit_fifo :make_register_enqueuer (self ()),
131- case ra :process_command (Servers , Reg , Timeout ) of
132- {ok , reject_publish , Leader } ->
133- {reject_publish , State0 # state {leader = Leader ,
134- queue_status = reject_publish }};
135- {ok , ok , Leader } ->
136- enqueue (QName , Correlation , Msg , State0 # state {leader = Leader ,
137- queue_status = go });
138- {error , {no_more_servers_to_try , _Errs }} ->
139- % % if we are not able to process the register command
140- % % it is safe to reject the message as we never attempted
141- % % to send it
142- {reject_publish , State0 };
143- % % TODO: not convinced this can ever happen when using
144- % % a list of servers
145- {timeout , _ } ->
146- {reject_publish , State0 };
147- Err ->
148- exit (Err )
149- end ;
150- {badrpc , nodedown } ->
151- {reject_publish , State0 }
121+ % % the first publish, register and enqueuer for this process.
122+ Reg = rabbit_fifo :make_register_enqueuer (self ()),
123+ case ra :process_command (Servers , Reg , Timeout ) of
124+ {ok , reject_publish , Leader } ->
125+ {reject_publish , State0 # state {leader = Leader ,
126+ queue_status = reject_publish }};
127+ {ok , ok , Leader } ->
128+ enqueue (QName , Correlation , Msg , State0 # state {leader = Leader ,
129+ queue_status = go });
130+ {error , {no_more_servers_to_try , _Errs }} ->
131+ % % if we are not able to process the register command
132+ % % it is safe to reject the message as we never attempted
133+ % % to send it
134+ {reject_publish , State0 };
135+ {timeout , _ } ->
136+ {reject_publish , State0 };
137+ Err ->
138+ exit (Err )
152139 end ;
153140enqueue (_QName , _Correlation , _Msg ,
154141 # state {queue_status = reject_publish ,
0 commit comments