@@ -141,7 +141,6 @@ public function consume(callable $callback)
141141 }
142142 catch (AMQPTimeoutException $ e )
143143 {
144- $ this ->_log ('No message received in ' . $ this ->_getWaitTime () . 's ' );
145144 return false ;
146145 }
147146 return true ;
@@ -324,50 +323,61 @@ protected function _getHosts()
324323 */
325324 protected function _getConnection ()
326325 {
327- if ($ this ->_connection === null )
326+ while ($ this ->_connection === null )
328327 {
329- while (!$ this ->_connection )
328+ $ this ->_getHosts ();
329+ $ host = reset ($ this ->_hosts );
330+ $ config = $ this ->config ();
331+ try
330332 {
331- $ this ->_getHosts ();
332- $ host = reset ($ this ->_hosts );
333- $ config = $ this ->config ();
334- try
335- {
336- $ this ->_connection = new AMQPStreamConnection (
337- $ host ,
338- $ config ->getItem ('port ' , 5672 ),
339- $ config ->getItem ('username ' , 'guest ' ),
340- $ config ->getItem ('password ' , 'guest ' )
341- );
342- }
343- catch (\Exception $ e )
344- {
345- $ this ->_log ('AMQP host failed to connect ( ' . $ host . ') ' );
346- array_shift ($ this ->_hosts );
347- }
348- $ this ->_persistentDefault = (bool )$ config ->getItem (
349- 'persistent ' ,
350- false
333+ $ this ->_connection = new AMQPStreamConnection (
334+ $ host ,
335+ $ config ->getItem ('port ' , 5672 ),
336+ $ config ->getItem ('username ' , 'guest ' ),
337+ $ config ->getItem ('password ' , 'guest ' )
351338 );
352- $ this ->_lastConnectTime = time ();
353339 }
340+ catch (\Exception $ e )
341+ {
342+ $ this ->_log ('AMQP host failed to connect ( ' . $ host . ') ' );
343+ array_shift ($ this ->_hosts );
344+ }
345+ $ this ->_persistentDefault = (bool )$ config ->getItem (
346+ 'persistent ' ,
347+ false
348+ );
349+ $ this ->_lastConnectTime = time ();
354350 }
355351 return $ this ->_connection ;
356352 }
357353
358354 /**
359355 * @return AMQPChannel
356+ * @throws \Exception
360357 */
361358 protected function _getChannel ()
362359 {
363- if ($ this ->_channel === null )
360+ $ retries = 2 ;
361+ while ($ this ->_channel === null )
364362 {
365- $ this ->_channel = $ this ->_getConnection ()->channel ();
366- $ config = $ this ->config ();
363+ $ connection = $ this ->_getConnection ();
364+ try
365+ {
366+ $ this ->_channel = $ connection ->channel ();
367+ $ config = $ this ->config ();
367368
368- $ qosSize = $ this ->_qosSize ?: $ config ->getItem ('qos_size ' , 0 );
369- $ qosCount = $ this ->_qosCount ?: $ config ->getItem ('qos_count ' , 0 );
370- $ this ->setPrefetch ($ qosCount , $ qosSize );
369+ $ qosSize = $ this ->_qosSize ?: $ config ->getItem ('qos_size ' , 0 );
370+ $ qosCount = $ this ->_qosCount ?: $ config ->getItem ('qos_count ' , 0 );
371+ $ this ->setPrefetch ($ qosCount , $ qosSize );
372+ }
373+ catch (\Exception $ e )
374+ {
375+ $ this ->disconnect ();
376+ if (!($ retries --))
377+ {
378+ throw $ e ;
379+ }
380+ }
371381 }
372382 return $ this ->_channel ;
373383 }
0 commit comments