11<?php
22namespace Packaged \Queue \Provider \Amqp ;
33
4+ use Exception ;
45use Packaged \Queue \IBatchQueueProvider ;
56use Packaged \Queue \Provider \AbstractQueueProvider ;
7+ use Packaged \Queue \Provider \QueueConnectionException ;
8+ use Packaged \Queue \QueueException ;
69use PhpAmqpLib \Channel \AMQPChannel ;
710use PhpAmqpLib \Connection \AbstractConnection ;
811use PhpAmqpLib \Connection \AMQPStreamConnection ;
@@ -91,6 +94,13 @@ public function setSlowPushThreshold($threshold)
9194 return $ this ;
9295 }
9396
97+ /**
98+ * @param array $batch
99+ * @param null $persistent
100+ *
101+ * @return $this
102+ * @throws QueueConnectionException
103+ */
94104 public function pushBatch (array $ batch , $ persistent = null )
95105 {
96106 $ mandatory = $ this ->_getMandatoryFlag ();
@@ -106,27 +116,17 @@ public function pushBatch(array $batch, $persistent = null)
106116 $ returnCallback = null ;
107117 if ($ mandatory )
108118 {
109- $ returnCallback = function (
110- $ replyCode ,
111- $ replyText ,
112- $ exchange ,
113- $ routingKey
114- ) use (
115- &$ needRetry , &$ needDeclare , &$ autoDeclare ,
116- $ declareAttempts , $ declareRetryLimit
117- )
119+ $ returnCallback = function ($ replyCode , $ replyText , $ exchange , $ routingKey ) use
120+ (&$ needRetry , &$ needDeclare , &$ autoDeclare , $ declareAttempts , $ declareRetryLimit )
118121 {
119- if ($ autoDeclare
120- && ($ declareAttempts < $ declareRetryLimit )
121- && ($ replyCode == 312 )
122- )
122+ if ($ autoDeclare && ($ declareAttempts < $ declareRetryLimit ) && ($ replyCode == 312 ))
123123 {
124124 $ needDeclare = true ;
125125 $ needRetry = true ;
126126 }
127127 else
128128 {
129- throw new \ Exception (
129+ throw new QueueConnectionException (
130130 'Error pushing message to exchange ' . $ exchange
131131 . ' with routing key ' . $ routingKey
132132 . ' : ( ' . $ replyCode . ') ' . $ replyText ,
@@ -178,7 +178,7 @@ public function pushBatch(array $batch, $persistent = null)
178178 {
179179 $ ch ->wait_for_pending_acks_returns ($ this ->_getPushTimeout ());
180180 }
181- catch (\ Exception $ e )
181+ catch (Exception $ e )
182182 {
183183 $ this ->disconnect (self ::CONN_PUSH );
184184 if ($ autoDeclare
@@ -469,6 +469,10 @@ protected function _refreshConnection($connectionMode)
469469 }
470470 }
471471
472+ /**
473+ * @return array
474+ * @throws QueueConnectionException
475+ */
472476 protected function _getHosts ()
473477 {
474478 if (!$ this ->_hosts )
@@ -487,7 +491,7 @@ protected function _getHosts()
487491 }
488492 else
489493 {
490- throw new \ Exception (
494+ throw new QueueConnectionException (
491495 'All hosts failed to connect ' . $ this ->_hostsRetriesMax .
492496 ' times within ' . $ this ->_hostsResetTimeMax . ' seconds '
493497 );
@@ -501,6 +505,7 @@ protected function _getHosts()
501505 * @param $connectionMode
502506 *
503507 * @return AMQPStreamConnection
508+ * @throws QueueConnectionException
504509 */
505510 protected function _getConnection ($ connectionMode )
506511 {
@@ -518,7 +523,7 @@ protected function _getConnection($connectionMode)
518523 $ config ->getItem ('password ' , 'guest ' )
519524 );
520525 }
521- catch (\ Exception $ e )
526+ catch (Exception $ e )
522527 {
523528 $ this ->_log ('AMQP host failed to connect ( ' . $ host . ') ' );
524529 array_shift ($ this ->_hosts );
@@ -536,7 +541,8 @@ protected function _getConnection($connectionMode)
536541 * @param $connectionMode
537542 *
538543 * @return AMQPChannel
539- * @throws \Exception
544+ * @throws QueueConnectionException
545+ * @throws Exception
540546 */
541547 protected function _getChannel ($ connectionMode )
542548 {
@@ -565,7 +571,7 @@ protected function _getChannel($connectionMode)
565571 break ;
566572 }
567573 }
568- catch (\ Exception $ e )
574+ catch (Exception $ e )
569575 {
570576 $ this ->_log (
571577 'Error getting AMQP channel ( ' . $ retries . ' retries remaining) '
@@ -620,7 +626,7 @@ private function _disconnect($connectionMode)
620626 $ this ->_channels [$ connectionMode ]->close ();
621627 }
622628 }
623- catch (\ Exception $ e )
629+ catch (Exception $ e )
624630 {
625631 }
626632 $ this ->_channels [$ connectionMode ] = null ;
@@ -633,17 +639,24 @@ private function _disconnect($connectionMode)
633639 $ this ->_connections [$ connectionMode ]->close ();
634640 }
635641 }
636- catch (\ Exception $ e )
642+ catch (Exception $ e )
637643 {
638644 }
639645 $ this ->_connections [$ connectionMode ] = null ;
640646 }
641647
648+ /**
649+ * @param callable $callback
650+ * @param $batchSize
651+ *
652+ * @return bool
653+ * @throws QueueException
654+ */
642655 public function batchConsume (callable $ callback , $ batchSize )
643656 {
644657 if ($ this ->_qosCount && $ batchSize > $ this ->_qosCount )
645658 {
646- throw new \ Exception ('Cannot consume batches greater than QoS ' );
659+ throw new QueueException ('Cannot consume batches greater than QoS ' );
647660 }
648661 return parent ::batchConsume ($ callback , $ batchSize );
649662 }
0 commit comments