@@ -251,36 +251,36 @@ impl<T> PersistChannel<T> {
251251
252252 fn pop ( & self ) -> Result < PersistDataWrapper < ' _ , T > , PersistError > {
253253 let mut state = self . state . lock ( ) ;
254- loop {
254+ let ( permits_to_release , pending_reaps , data ) = loop {
255255 // Shutdown requested. Return error.
256256 if state. shutdown {
257257 return Err ( PersistError :: ChannelDisconnected ) ;
258258 }
259259 // Unblock to persist when permits available <= threshold
260260 if state. permits_available <= state. persist_threshold && state. data . is_some ( ) {
261- return Ok ( PersistDataWrapper {
262- channel : self ,
263- permits_to_release : state
261+ break (
262+ state
264263 . max_permits
265264 . get ( )
266265 . saturating_sub ( state. permits_available ) ,
267- pending_reaps : std:: mem:: take ( & mut state. pending_reaps ) ,
268- data : state. data . take ( ) ,
269- } ) ;
266+ std:: mem:: take ( & mut state. pending_reaps ) ,
267+ state. data . take ( ) ,
268+ ) ;
270269 }
271270 // Unblock even if we haven't met the threshold if there are pending reaps.
272271 // Permits to release is set to 0, and committed revision is not taken.
273272 if !state. pending_reaps . is_empty ( ) {
274- return Ok ( PersistDataWrapper {
275- channel : self ,
276- permits_to_release : 0 ,
277- pending_reaps : std:: mem:: take ( & mut state. pending_reaps ) ,
278- data : None ,
279- } ) ;
273+ break ( 0 , std:: mem:: take ( & mut state. pending_reaps ) , None ) ;
280274 }
281275 // Block until it is woken up by the committer thread.
282276 self . persist_ready . wait ( & mut state) ;
283- }
277+ } ;
278+ Ok ( PersistDataWrapper {
279+ channel : self ,
280+ permits_to_release,
281+ pending_reaps,
282+ data,
283+ } )
284284 }
285285
286286 fn close ( & self ) {
0 commit comments