diff --git a/doc/ChangeLog b/doc/ChangeLog index c777225115..226499997e 100644 --- a/doc/ChangeLog +++ b/doc/ChangeLog @@ -1,3 +1,21 @@ +2012-09-18 Evgeniy Tekalin + + * main/Utils/AMQP/AMQP.class.php, + main/Utils/AMQP/AMQPBaseChannel.class.php, + main/Utils/AMQP/AMQPBaseMessage.class.php, + main/Utils/AMQP/AMQPChannelInterface.class.php, + main/Utils/AMQP/AMQPPool.class.php, + main/Utils/AMQP/Pecl/AMQPPecl.class.php, + main/Utils/AMQP/Pecl/AMQPPeclBaseBitmask.class.php, + main/Utils/AMQP/Pecl/AMQPPeclChannel.class.php, + main/Utils/AMQP/AMQPInterface.class.php, + main/Utils/AMQP/Pecl/AMQPPeclIncomingMessageAdapter.class.php, + main/Utils/AMQP/Pecl/AMQPPeclQueueConsumer.class.php, + main/Utils/AMQP/AMQPProxyChannel.class.php, + main/Utils/AMQP/AMQPSelective.class.php, + test/main/Utils/AMQP/AMQPPeclTest.class.php: + added suppot for pecl amqp 1.0.4 + 2012-09-06 Igor V. Gulyaev * main/Base/Hstore.class.php diff --git a/main/Utils/AMQP/AMQP.class.php b/main/Utils/AMQP/AMQP.class.php index d78ca9ef71..643f9a0ea7 100644 --- a/main/Utils/AMQP/AMQP.class.php +++ b/main/Utils/AMQP/AMQP.class.php @@ -13,13 +13,14 @@ * AMQP stands for Advanced Message Queue Protocol, which is * an open standard middleware layer for message routing and queuing. **/ - abstract class AMQP + abstract class AMQP implements AMQPInterface { /** * @var AMQPCredentials **/ protected $credentials = null; protected $link = null; + protected $alive = true; /** * @var array of AMQPChannelInterface instances @@ -49,7 +50,7 @@ abstract public function isConnected(); /** * @return AMQPChannelInterface */ - abstract protected function spawnChannel($id, AMQP $transport); + abstract public function spawnChannel($id, AMQPInterface $transport); public function __construct(AMQPCredentials $credentials) { @@ -143,5 +144,34 @@ public function dropChannel($id) return $this; } + + /** + * @return AMQPCredentials + */ + public function getCredentials() + { + return $this->credentials; + } + + /** + * @return bool + */ + public function isAlive() + { + return $this->alive; + } + + /** + * @param bool $alive + * @return AMQP + */ + public function setAlive($alive) + { + $this->alive = ($alive === true); + + return $this; + } + + } ?> \ No newline at end of file diff --git a/main/Utils/AMQP/AMQPBaseChannel.class.php b/main/Utils/AMQP/AMQPBaseChannel.class.php index b2019bd95d..40a297b117 100644 --- a/main/Utils/AMQP/AMQPBaseChannel.class.php +++ b/main/Utils/AMQP/AMQPBaseChannel.class.php @@ -17,11 +17,11 @@ abstract class AMQPBaseChannel implements AMQPChannelInterface protected $id = null; /** - * @var AMQP + * @var AMQPInterface **/ protected $transport = null; - public function __construct($id, AMQP $transport) + public function __construct($id, AMQPInterface $transport) { $this->id = $id; $this->transport = $transport; diff --git a/main/Utils/AMQP/AMQPBaseMessage.class.php b/main/Utils/AMQP/AMQPBaseMessage.class.php index 0e608f6b70..447953a6a9 100644 --- a/main/Utils/AMQP/AMQPBaseMessage.class.php +++ b/main/Utils/AMQP/AMQPBaseMessage.class.php @@ -14,8 +14,8 @@ **/ abstract class AMQPBaseMessage { - const CONTENT_TYPE = 'Content-type'; - const CONTENT_ENCODING = 'Content-encoding'; + const CONTENT_TYPE = 'content_type'; + const CONTENT_ENCODING = 'content_encoding'; const MESSAGE_ID = 'message_id'; const USER_ID = 'user_id'; const APP_ID = 'app_id'; @@ -158,7 +158,7 @@ public function getAppId() **/ public function setDeliveryMode($int) { - Assert::isInteger($int, __METHOD__); + Assert::isInteger($int, __METHOD__.": requires integer, given {$int}"); Assert::isTrue( in_array( @@ -168,7 +168,7 @@ public function setDeliveryMode($int) self::DELIVERY_MODE_PERISISTENT ) ), - __METHOD__ + __METHOD__.": unknown mode {$int}" ); $this->properties[self::DELIVERY_MODE] = $int; diff --git a/main/Utils/AMQP/AMQPChannelInterface.class.php b/main/Utils/AMQP/AMQPChannelInterface.class.php index d315387960..758a07ce81 100644 --- a/main/Utils/AMQP/AMQPChannelInterface.class.php +++ b/main/Utils/AMQP/AMQPChannelInterface.class.php @@ -37,29 +37,19 @@ public function exchangeDeclare( * @return AMQPChannelInterface **/ public function exchangeDelete( - $name, $ifUnused = false, $ifEmpty = false + $name, $ifUnused = false ); /** + * @see http://www.rabbitmq.com/blog/2010/10/19/exchange-to-exchange-bindings/ * @return AMQPChannelInterface **/ - public function exchangeBind($name, $queue, $routingKey); + public function exchangeBind($destinationName, $sourceName, $routingKey); /** * @return AMQPChannelInterface **/ - public function exchangeUnbind($name, $queue, $routingKey); - - /** - * @see http://www.rabbitmq.com/blog/2010/10/19/exchange-to-exchange-bindings/ - **/ - public function exchangeToExchangeBind( - $destination, $source, $routingKey - ); - - public function exchangeToExchangeUnbind( - $destination, $source, $routingKey - ); + public function exchangeUnbind($destinationName, $sourceName, $routingKey); /** * @return integer - the message count in queue @@ -101,7 +91,7 @@ public function basicQos($prefetchSize, $prefetchCount); /** * @return AMQPIncomingMessage **/ - public function basicGet($queue, $noAck = true); + public function basicGet($queue, $autoAck = true); /** * @return AMQPChannelInterface @@ -117,10 +107,5 @@ public function basicConsume($queue, $autoAck, AMQPConsumer $callback); * @return AMQPChannelInterface **/ public function basicCancel($consumerTag); - - /** - * @return AMQPIncomingMessage - **/ - public function getNextDelivery(); } ?> \ No newline at end of file diff --git a/main/Utils/AMQP/AMQPInterface.class.php b/main/Utils/AMQP/AMQPInterface.class.php new file mode 100644 index 0000000000..d247054e29 --- /dev/null +++ b/main/Utils/AMQP/AMQPInterface.class.php @@ -0,0 +1,89 @@ + \ No newline at end of file diff --git a/main/Utils/AMQP/AMQPPool.class.php b/main/Utils/AMQP/AMQPPool.class.php index 09c60d8973..088c5558b3 100644 --- a/main/Utils/AMQP/AMQPPool.class.php +++ b/main/Utils/AMQP/AMQPPool.class.php @@ -135,5 +135,19 @@ public function disconnect() return $this; } + + /** + * @return array + */ + public function getList() + { + $list = $this->pool; + + try { + $list['default'] = $this->getLink(); + } catch (MissingElementException $e) {/**/} + + return $list; + } } ?> \ No newline at end of file diff --git a/main/Utils/AMQP/AMQPProxyChannel.class.php b/main/Utils/AMQP/AMQPProxyChannel.class.php new file mode 100644 index 0000000000..96ff813281 --- /dev/null +++ b/main/Utils/AMQP/AMQPProxyChannel.class.php @@ -0,0 +1,341 @@ +channel = $channel; + } + + /** + * @return true + **/ + public function isOpen() + { + return $this->channel->isOpen(); + } + + /** + * @return AMQPChannelInterface + **/ + public function open() + { + return $this->channel->open(); + } + + /** + * @return AMQPChannelInterface + **/ + public function close() + { + return $this->channel->close(); + } + + /** + * @return AMQPChannelInterface + **/ + public function exchangeDeclare($name, AMQPExchangeConfig $conf) + { + try { + return $this->channel->exchangeDeclare($name, $conf); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + exchangeDeclare($name, $conf); + } + } + + /** + * @return AMQPChannelInterface + **/ + public function exchangeDelete($name, $ifUnused = false) + { + try { + return $this->channel->exchangeDelete($name, $ifUnused); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + exchangeDelete($name, $ifUnused); + } + } + + public function exchangeBind($destinationName, $sourceName, $routingKey) + { + try { + return $this->channel->exchangeBind( + $destinationName, + $sourceName, + $routingKey + ); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + exchangeBind( + $destinationName, + $sourceName, + $routingKey + ); + } + } + + /** + * @return AMQPChannelInterface + **/ + public function exchangeUnbind($destinationName, $sourceName, $routingKey) + { + try { + return $this->channel->exchangeUnbind( + $destinationName, + $sourceName, + $routingKey + ); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + exchangeUnbind( + $destinationName, + $sourceName, + $routingKey + ); + } + } + + /** + * @return int + **/ + public function queueDeclare($name, AMQPQueueConfig $conf) + { + try { + return $this->channel->queueDeclare($name, $conf); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + queueDeclare($name, $conf); + } + } + + /** + * @return AMQPChannelInterface + **/ + public function queueBind($name, $exchange, $routingKey) + { + try { + return $this->channel->queueBind( + $name, + $exchange, + $routingKey + ); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + queueBind( + $name, + $exchange, + $routingKey + ); + } + } + + /** + * @return AMQPChannelInterface + **/ + public function queueUnbind($name, $exchange, $routingKey) + { + try { + return $this->channel->queueUnbind( + $name, + $exchange, + $routingKey + ); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + queueUnbind( + $name, + $exchange, + $routingKey + ); + } + } + + /** + * @return AMQPChannelInterface + **/ + public function queuePurge($name) + { + try { + return $this->channel->queuePurge($name); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + queuePurge($name); + } + } + + /** + * @return AMQPChannelInterface + **/ + public function queueDelete($name) + { + try { + return $this->channel->queueDelete($name); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + queueDelete($name); + } + } + + /** + * @return AMQPChannelInterface + **/ + public function basicPublish($exchange, $routingKey, AMQPOutgoingMessage $msg) + { + try { + return $this->channel->basicPublish( + $exchange, + $routingKey, + $msg + ); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + basicPublish($exchange, $routingKey, $msg); + } + } + + /** + * @return AMQPChannelInterface + **/ + public function basicQos($prefetchSize, $prefetchCount) + { + try { + return $this->channel->basicQos($prefetchSize, $prefetchCount); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + basicQos($prefetchSize, $prefetchCount); + } + } + + /** + * @return AMQPIncomingMessage + **/ + public function basicGet($queue, $autoAck = true) + { + try { + return $this->channel->basicGet($queue, $autoAck); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + basicGet($queue, $autoAck); + } + } + + + /** + * @return AMQPChannelInterface + **/ + public function basicAck($deliveryTag, $multiple = false) + { + try { + return $this->channel->basicAck($deliveryTag, $multiple); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + basicAck($deliveryTag, $multiple); + } + } + + /** + * @return AMQPChannelInterface + **/ + public function basicConsume($queue, $autoAck, AMQPConsumer $callback) + { + try { + return $this->channel->basicConsume($queue, $autoAck, $callback); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + basicConsume($queue, $autoAck, $callback); + } + } + + /** + * @return AMQPChannelInterface + **/ + public function basicCancel($consumerTag) + { + try { + return $this->channel->basicCancel($consumerTag); + } catch (AMQPServerException $e) { + return $this-> + transportReconnect($e)-> + basicCancel($consumerTag); + } + } + + /** + * @throws AMQPServerException + * @param Exception $e + * @return AMQPProxyChannel + */ + protected function transportReconnect(Exception $e) + { + $this->markAlive(false); + + $this->reconnect($e); + + return $this; + } + + private function markAlive($alive = false) + { + try { + $this->channel->getTransport()-> + setAlive($alive); + } catch (WrongArgumentException $e) {/*no_connection*/} + + return $this; + } + + /** + * @return AMQPProxyChannel + * @throws AMQPServerException + */ + private function reconnect(Exception $amqpException) + { + try { + $this->channel->getTransport()-> + setCurrent( + $this->channel->getTransport()->getAlive() + ); + } catch (WrongArgumentException $e) { + throw new AMQPServerException( + $amqpException->getMessage(), + $amqpException->getCode(), + $amqpException + ); + } + + return $this; + } + } +?> \ No newline at end of file diff --git a/main/Utils/AMQP/AMQPSelective.class.php b/main/Utils/AMQP/AMQPSelective.class.php new file mode 100644 index 0000000000..ee44c1aa58 --- /dev/null +++ b/main/Utils/AMQP/AMQPSelective.class.php @@ -0,0 +1,301 @@ +getList() as $name => $amqp) { + $this->addLink($name, $amqp); + + if ($name == 'default') + $this->setCurrent('default'); + } + + return $this; + } + + /** + * @throws WrongArgumentException + * @return AMQPPool + **/ + public function addLink($name, AMQP $amqp) + { + if (isset($this->pool[$name])) + throw new WrongArgumentException( + "amqp link with name '{$name}' already registered" + ); + + if ($this->pool) + Assert::isInstance($amqp, current($this->pool)); + + $this->pool[$name] = $amqp; + + return $this; + } + + /** + * @throws MissingElementException + * @return AMQPPool + **/ + public function dropLink($name) + { + if (!isset($this->pool[$name])) + throw new MissingElementException( + "amqp link with name '{$name}' not found" + ); + + unset($this->pool[$name]); + + $this->current = null; + + return $this; + } + + /** + * @param integer $id + * @throws WrongArgumentException + * @return AMQPChannelInterface + **/ + public function createChannel($id) + { + Assert::isInteger($id); + + if (isset($this->channels[$id])) + throw new WrongArgumentException( + "AMQP channel with id '{$id}' already registered" + ); + + if (!$this->current) + $this->setCurrent($this->getAlive()); + + if (!$this->isConnected()) + $this->connect(); + + $this->channels[$id] = new self::$proxy( + $this->getCurrentItem()->spawnChannel($id, $this) + ); + + $this->channels[$id]->open(); + + return $this->channels[$id]; + } + + /** + * @throws MissingElementException + * @return AMQPChannelInterface + **/ + public function getChannel($id) + { + if (isset($this->channels[$id])) + return $this->channels[$id]; + + throw new MissingElementException( + "Can't find AMQP channel with id '{$id}'" + ); + } + + /** + * @return array + **/ + public function getChannelList() + { + return $this->channels; + } + + /** + * @param integer $id + * @throws MissingElementException + * @return AMQPChannelInterface + **/ + public function dropChannel($id) + { + if (!isset($this->channels[$id])) + throw new MissingElementException( + "AMQP channel with id '{$id}' not found" + ); + + $this->channels[$id]->close(); + + unset($this->channels[$id]); + + return $this; + } + + /** + * @return AMQPInterface + * @throws AMQPServerConnectionException + */ + public function connect() + { + return $this->processMethod('connect'); + } + + /** + * @return AMQPInterface + **/ + public function disconnect() + { + return $this->processMethod('disconnect'); + } + + /** + * @return AMQPInterface + **/ + public function reconnect() + { + return $this->processMethod('reconnect'); + } + + /** + * @return boolean + **/ + public function isConnected() + { + return $this->processMethod('isConnected'); + } + + + /** + * @return AMQPInterface + **/ + public function getLink() + { + return $this->processMethod('getLink'); + } + + /** + * @return AMQPCredentials + */ + public function getCredentials() + { + return $this->processMethod('getCredentials'); + } + + + /** + * @return bool + */ + public function isAlive() + { + return $this->processMethod('isAlive'); + } + + /** + * @param bool $alive + * @return AMQPInterface + */ + public function setAlive($alive) + { + return $this->processMethod('setAlive', $alive); + } + + + /** + * @throws WrongArgumentException + * @param string $method + * @return mixed + */ + protected function processMethod($method/*, $args*/) + { + $args = func_get_args(); + array_shift($args); + + for ($i = 0; $i < count($this->pool); $i++) { + try { + $this->getCurrentItem()->connect(); + + return call_user_func_array( + array($this->getCurrentItem(), $method), + $args + ); + } catch (AMQPServerConnectionException $e) { + $this->setCurrent($this->getAlive()); + } + } + } + + /** + * @throws WrongArgumentException + * @return string + */ + public function getAlive() + { + foreach ($this->pool as $name => $item) { + if ($item->isAlive()) + return $name; + } + + Assert::isUnreachable("no alive connection"); + } + + /** + * @param string $name + * @return AMQPSelective + */ + public function setCurrent($name) + { + Assert::isIndexExists($this->pool, $name); + + $this->current = $name; + + return $this; + } + + /** + * @thows WrongArgumentException + * @return AMQPInterface + */ + protected function getCurrentItem() + { + if ($this->current && $this->pool[$this->current]->isAlive()) + return $this->pool[$this->current]; + + Assert::isUnreachable("no current connection"); + } + } +?> \ No newline at end of file diff --git a/main/Utils/AMQP/Pecl/AMQPPecl.class.php b/main/Utils/AMQP/Pecl/AMQPPecl.class.php index 911e1cde13..3ba4e64d2f 100644 --- a/main/Utils/AMQP/Pecl/AMQPPecl.class.php +++ b/main/Utils/AMQP/Pecl/AMQPPecl.class.php @@ -46,6 +46,8 @@ public function connect() $this->link->connect(); } catch (AMQPConnectionException $e) { + $this->alive = false; + throw new AMQPServerConnectionException( $e->getMessage(), $e->getCode(), @@ -66,6 +68,8 @@ public function reconnect() $this->link->reconnect(); return $this; } catch (AMQPConnectionException $e) { + $this->alive = false; + throw new AMQPServerConnectionException( $e->getMessage(), $e->getCode(), @@ -86,6 +90,8 @@ public function disconnect() return $this; } } catch (AMQPConnectionException $e) { + $this->alive = false; + throw new AMQPServerConnectionException( $e->getMessage(), $e->getCode(), @@ -96,10 +102,10 @@ public function disconnect() /** * @param mixed $id - * @param AMQP $transport + * @param AMQPInterface $transport * @return AMQPPeclChannel **/ - protected function spawnChannel($id, AMQP $transport) + public function spawnChannel($id, AMQPInterface $transport) { return new AMQPPeclChannel($id, $transport); } diff --git a/main/Utils/AMQP/Pecl/AMQPPeclBaseBitmask.class.php b/main/Utils/AMQP/Pecl/AMQPPeclBaseBitmask.class.php index 952d4362b8..2a0a44b499 100644 --- a/main/Utils/AMQP/Pecl/AMQPPeclBaseBitmask.class.php +++ b/main/Utils/AMQP/Pecl/AMQPPeclBaseBitmask.class.php @@ -27,8 +27,8 @@ public function getBitmask($config) if ($config->getNowait()) throw new UnimplementedFeatureException(); - if ($config->getArguments()) - throw new UnimplementedFeatureException(); +// if ($config->getArguments()) +// throw new UnimplementedFeatureException(); return $bitmask; } diff --git a/main/Utils/AMQP/Pecl/AMQPPeclChannel.class.php b/main/Utils/AMQP/Pecl/AMQPPeclChannel.class.php index 2dce1ceea8..b99cc5e188 100644 --- a/main/Utils/AMQP/Pecl/AMQPPeclChannel.class.php +++ b/main/Utils/AMQP/Pecl/AMQPPeclChannel.class.php @@ -12,17 +12,28 @@ final class AMQPPeclChannel extends AMQPBaseChannel { const NIL = 'nil'; - const AMQP_NONE = 0; + const AMQP_NONE = AMQP_NOPARAM; protected $exchangeList = array(); protected $queueList = array(); protected $opened = false; + + /** + * @var AMQPChannel + */ + protected $link = null; + /** * @var AMQPConsumer **/ protected $consumer = null; + public function __construct($id, AMQPInterface $transport) + { + parent::__construct($id, $transport); + } + public function isOpen() { return $this->opened === true; @@ -50,8 +61,10 @@ public function close() /** * @throws AMQPServerException|AMQPServerConnectionException - * @return AMQPChannelInterface - **/ + * @param sting $deliveryTag + * @param bool $multiple + * @return AMQPPeclChannel + */ public function basicAck($deliveryTag, $multiple = false) { try { @@ -62,7 +75,9 @@ public function basicAck($deliveryTag, $multiple = false) ? AMQP_MULTIPLE : self::AMQP_NONE ); - } catch (AMQPQueueException $e) { + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -79,19 +94,24 @@ public function basicAck($deliveryTag, $multiple = false) } /** + * can't get $consumerTag * @throws AMQPServerQueueException|AMQPServerConnectionException|WrongStateException - * @return AMQPChannelInterface - **/ + * @param string $consumerTag + * @return AMQPPeclChannel + */ public function basicCancel($consumerTag) { if (!$this->consumer instanceof AMQPConsumer) throw new WrongStateException(); try { - $obj = $this->lookupQueue($this->consumer->getQueueName()); + $obj = $this->lookupQueue($consumerTag); + $result = $obj->cancel($consumerTag); - $this->consumer->handleCancelOk($consumerTag); - } catch (AMQPQueueException $e) { + + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -108,17 +128,42 @@ public function basicCancel($consumerTag) } /** - * PECL AMQP does not implement basicConsume logic, we'll emulate it. - * * @return AMQPChannelInterface **/ public function basicConsume($queue, $autoAck, AMQPConsumer $callback) { - $this->consumer = - $callback-> + Assert::isInstance($callback, 'AMQPPeclQueueConsumer'); + + try { + $this->consumer = $callback-> setQueueName($queue)-> setAutoAcknowledge($autoAck === true); + $obj = $this->lookupQueue($queue); + + $this->consumer->handleConsumeOk( + $this->consumer->getConsumerTag() + ); + + /** + * blocking function + */ + $obj->consume( + array($callback, 'handlePeclDelivery'), + $autoAck + ? AMQP_AUTOACK + : self::AMQP_NONE + ); + } catch (Exception $e) { + $this->clearConnection(); + + throw new AMQPServerException( + $e->getMessage(), + $e->getCode(), + $e + ); + } + return $this; } @@ -126,16 +171,18 @@ public function basicConsume($queue, $autoAck, AMQPConsumer $callback) * @throws AMQPServerException|AMQPServerConnectionException|ObjectNotFoundException * @return AMQPIncomingMessage **/ - public function basicGet($queue, $noAck = true) + public function basicGet($queue, $autoAck = true) { try { $obj = $this->lookupQueue($queue); $message = $obj->get( - ($noAck === true) - ? AMQP_NOACK + ($autoAck === true) + ? AMQP_AUTOACK : self::AMQP_NONE ); - } catch (AMQPQueueException $e) { + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -143,38 +190,36 @@ public function basicGet($queue, $noAck = true) ); } - $this->checkCommandResult( - is_array($message), - "Could not get from queue" - ); - - if ( - isset($message[AMQPIncomingMessage::COUNT]) - && $message[AMQPIncomingMessage::COUNT] == -1 - ) + if (!$message) throw new ObjectNotFoundException( "AMQP queue with name '{$queue}' is empty" ); - return AMQPIncomingMessage::spawn($message); + return AMQPPeclIncomingMessageAdapter::convert($message); } /** * @throws AMQPServerExchangeException|AMQPServerConnectionException - * @return AMQPChannelInterface - **/ + * @param string $exchange + * @param string $routingKey + * @param AMQPOutgoingMessage $msg + * @return AMQPPeclChannel + */ public function basicPublish( $exchange, $routingKey, AMQPOutgoingMessage $msg ) { try { $obj = $this->lookupExchange($exchange); + $result = $obj->publish( $msg->getBody(), $routingKey, $msg->getBitmask(new AMQPPeclOutgoingMessageBitmask()), $msg->getProperties() ); - } catch (AMQPExchangeException $e) { + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -190,39 +235,55 @@ public function basicPublish( return $this; } + /** + * @param int $prefetchSize + * @param int $prefetchCount + * @return AMQPPeclChannel + */ public function basicQos($prefetchSize, $prefetchCount) { - throw new UnimplementedFeatureException(); - } + try { + $result = $this->getChannelLink()->qos( + $prefetchSize, + $prefetchCount + ); + } catch (Exception $e) { + $this->clearConnection(); - public function exchangeToExchangeBind( - $destination, $source, $routingKey - ) - { - throw new UnimplementedFeatureException( - 'Exchange to exchange bindings is not yet implemented' - ); - } + throw new AMQPServerException( + $e->getMessage(), + $e->getCode(), + $e + ); + } - public function exchangeToExchangeUnbind( - $destination, $source, $routingKey - ) - { - throw new UnimplementedFeatureException( - 'Exchange to exchange unbindings is not yet implemented' + $this->checkCommandResult( + $result, + "Could not publish to exchange" ); + + return $this; } /** * @throws AMQPServerException|AMQPServerConnectionException - * @return AMQPChannelInterface - **/ - public function exchangeBind($name, $queue, $routingKey) + * @param string $destinationName + * @param string $sourceName + * @param string $routingKey + * @return AMQPPeclChannel + */ + public function exchangeBind($destinationName, $sourceName, $routingKey) { try { - $obj = $this->lookupExchange($name); - $result = $obj->bind($queue, $routingKey); - } catch (AMQPExchangeException $e) { + $obj = $this->lookupExchange($destinationName); + + $result = $obj->bind( + $sourceName, + $routingKey + ); + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -238,15 +299,17 @@ public function exchangeBind($name, $queue, $routingKey) return $this; } - public function exchangeUnbind($name, $queue, $routingKey) + public function exchangeUnbind($destinationName, $sourceName, $routingKey) { throw new UnimplementedFeatureException(); } /** * @throws AMQPServerException|AMQPServerConnectionException - * @return AMQPChannelInterface - **/ + * @param string $name + * @param AMQPExchangeConfig $conf + * @return AMQPPeclChannel + */ public function exchangeDeclare($name, AMQPExchangeConfig $conf) { $this->checkConnection(); @@ -258,16 +321,21 @@ public function exchangeDeclare($name, AMQPExchangeConfig $conf) try { $this->exchangeList[$name] = - new AMQPExchange($this->transport->getLink()); + new AMQPExchange($this->getChannelLink()); $obj = $this->exchangeList[$name]; - $result = $obj->declare( - $name, - $conf->getType()->getName(), + $obj->setName($name); + $obj->setType($conf->getType()->getName()); + $obj->setFlags( $conf->getBitmask(new AMQPPeclExchangeBitmask()) ); - } catch (AMQPExchangeException $e) { + $obj->setArguments($conf->getArguments()); + + $result = $obj->declare(); + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -288,20 +356,19 @@ public function exchangeDeclare($name, AMQPExchangeConfig $conf) * @return AMQPChannelInterface **/ public function exchangeDelete( - $name, $ifUnused = false, $ifEmpty = false + $name, $ifUnused = false ) { $bitmask = self::AMQP_NONE; if ($ifUnused) $bitmask = $bitmask | AMQP_IFUNUSED; - if ($ifEmpty) - $bitmask = $bitmask | AMQP_IFEMPTY; - try { $obj = $this->lookupExchange($name); $result = $obj->delete($name, $bitmask); - } catch (AMQPExchangeException $e) { + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -328,7 +395,9 @@ public function queueBind($name, $exchange, $routingKey) try { $obj = $this->lookupQueue($name); $result = $obj->bind($exchange, $routingKey); - } catch (AMQPQueueException $e) { + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -353,16 +422,23 @@ public function queueDeclare($name, AMQPQueueConfig $conf) $this->checkConnection(); try { + if (isset($this->queueList[$name])) + unset($this->queueList[$name]); + $this->queueList[$name] = - new AMQPQueue($this->transport->getLink()); + new AMQPQueue($this->getChannelLink()); $obj = $this->queueList[$name]; - - $result = $obj->declare( - $name, + $obj->setName($name); + $obj->setFlags( $conf->getBitmask(new AMQPPeclQueueBitmask()) ); - } catch (AMQPQueueException $e) { + $obj->setArguments($conf->getArguments()); + + $result = $obj->declare(); + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -386,8 +462,10 @@ public function queueDelete($name) { try { $obj = $this->lookupQueue($name); - $result = $obj->delete($name); - } catch (AMQPQueueException $e) { + $result = $obj->delete(); + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -413,8 +491,10 @@ public function queuePurge($name) { try { $obj = $this->lookupQueue($name); - $result = $obj->purge($name); - } catch (AMQPQueueException $e) { + $result = $obj->purge(); + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -439,7 +519,9 @@ public function queueUnbind($name, $exchange, $routingKey) try { $obj = $this->lookupQueue($name); $result = $obj->unbind($exchange, $routingKey); - } catch (AMQPQueueException $e) { + } catch (Exception $e) { + $this->clearConnection(); + throw new AMQPServerException( $e->getMessage(), $e->getCode(), @@ -455,62 +537,6 @@ public function queueUnbind($name, $exchange, $routingKey) return $this; } - /** - * @throws AMQPServerException|WrongStateException - * @return AMQPIncomingMessage - **/ - public function getNextDelivery() - { - if (!$this->consumer instanceof AMQPConsumer) - throw new WrongStateException(); - - try { - $obj = $this->lookupQueue( - $this->consumer->getQueueName() - ); - - $messages = $obj->consume( - array( - 'min' => 1, - 'max' => 1, - 'ack' => (bool) $this->consumer->isAutoAcknowledge(), - ) - ); - } catch (AMQPQueueException $e) { - throw new AMQPServerException( - $e->getMessage(), - $e->getCode(), - $e - ); - } - - $this->checkCommandResult( - is_array($messages) && !empty($messages), - "Could not consume from queue" - ); - - $message = array_shift($messages); - - $incoming = AMQPIncomingMessage::spawn($message); - - if ($this->consumer->getConsumerTag() === null) { - $this->consumer->setConsumerTag($incoming->getConsumerTag()); - $this->consumer->handleConsumeOk($incoming->getConsumerTag()); - } else if ( - $this->consumer->getConsumerTag() - != $incoming->getConsumerTag() - ) { - $this->consumer->handleChangeConsumerTag( - $this->consumer->getConsumerTag(), - $incoming->getConsumerTag() - ); - } - - $this->consumer->handleDelivery($incoming); - - return $incoming; - } - /** * @throws AMQPServerConnectionException * @return AMQPExchange @@ -521,7 +547,8 @@ protected function lookupExchange($name) if (!isset($this->exchangeList[$name])) { $this->exchangeList[$name] = - new AMQPExchange($this->transport->getLink(), $name); + new AMQPExchange($this->getChannelLink()); + $this->exchangeList[$name]->setName($name); } return $this->exchangeList[$name]; @@ -547,10 +574,9 @@ protected function lookupQueue($name) $this->checkConnection(); if (!isset($this->queueList[$name])) { - $this->queueList[$name] = - ($name == self::NIL) - ? new AMQPQueue($this->transport->getLink()) - : new AMQPQueue($this->transport->getLink(), $name); + $this->queueList[$name] = new AMQPQueue($this->getChannelLink()); + if ($name != self::NIL) + $this->queueList[$name]->setName($name); } return $this->queueList[$name]; @@ -581,5 +607,36 @@ protected function checkCommandResult($boolean, $message) return $this; } + + protected function clearConnection() + { + unset($this->link); + $this->link = null; + + $this->exchangeList = array(); + $this->queueList = array(); + + return $this; + } + + protected function getChannelLink() + { + if (!$this->link) { + $this->link = new AMQPChannel( + $this->getTransport()->getLink() + ); + } + + return $this->link; + } + + /** + * we dont know if connection is boken until request is made + * @return AMQPPeclChannel + */ + protected function checkConnection() + { + return $this; + } } ?> \ No newline at end of file diff --git a/main/Utils/AMQP/Pecl/AMQPPeclIncomingMessageAdapter.class.php b/main/Utils/AMQP/Pecl/AMQPPeclIncomingMessageAdapter.class.php new file mode 100644 index 0000000000..7ad1991941 --- /dev/null +++ b/main/Utils/AMQP/Pecl/AMQPPeclIncomingMessageAdapter.class.php @@ -0,0 +1,49 @@ + $incoming->getAppId(), + AMQPIncomingMessage::BODY => $incoming->getBody(), + AMQPIncomingMessage::CONTENT_ENCODING => $incoming->getContentEncoding(), + AMQPIncomingMessage::CONTENT_TYPE => $incoming->getContentType(), + AMQPIncomingMessage::CORRELATION_ID => $incoming->getCorrelationId(), + //AMQPIncomingMessage::COUNT => $incoming->getCount(), + //AMQPIncomingMessage::CONSUME_BODY => $incoming->getConsumeBody(), + //AMQPIncomingMessage::CONSUMER_TAG => $incoming->getConsumeTagName(), + AMQPIncomingMessage::DELIVERY_TAG => $incoming->getDeliveryTag(), + AMQPIncomingMessage::DELIVERY_MODE => $incoming->getDeliveryMode(), + AMQPIncomingMessage::EXCHANGE => $incoming->getExchangeName(), + AMQPIncomingMessage::EXPIRATION => $incoming->getExpiration(), + AMQPIncomingMessage::MESSAGE_ID => $incoming->getMessageId(), + AMQPIncomingMessage::PRIORITY => $incoming->getPriority(), + AMQPIncomingMessage::REPLY_TO => $incoming->getReplyTo(), + AMQPIncomingMessage::REDELIVERED => $incoming->isRedelivery(), + AMQPIncomingMessage::PRIORITY => $incoming->getPriority(), + AMQPIncomingMessage::ROUTING_KEY => $incoming->getRoutingKey(), + AMQPIncomingMessage::TIMESTAMP => $incoming->getTimeStamp(), + AMQPIncomingMessage::TYPE => $incoming->getType(), + AMQPIncomingMessage::USER_ID => $incoming->getUserId() + ); + + return AMQPIncomingMessage::spawn($data); + } + +} + +?> \ No newline at end of file diff --git a/main/Utils/AMQP/Pecl/AMQPPeclQueueConsumer.class.php b/main/Utils/AMQP/Pecl/AMQPPeclQueueConsumer.class.php new file mode 100644 index 0000000000..ab03fc3ef0 --- /dev/null +++ b/main/Utils/AMQP/Pecl/AMQPPeclQueueConsumer.class.php @@ -0,0 +1,66 @@ +cancel = ($cancel === true); + return $this; + } + + /** + * @param int $limit + * @return AMQPPeclQueueConsumer + */ + public function setLimit($limit) + { + $this->limit = $limit; + return $this; + } + + /** + * @return int + */ + public function getCount() + { + return $this->count; + } + + public function handlePeclDelivery(AMQPEnvelope $delivery, AMQPQueue $queue = null) + { + $this->count++; + + if ($this->limit && $this->count >= $this->limit) + $this->setCancel(true); + + return $this->handleDelivery( + AMQPPeclIncomingMessageAdapter::convert($delivery) + ); + } + + public function handleDelivery(AMQPIncomingMessage $delivery) + { + if ($this->cancel) { + $this->handleCancelOk(''); + return false; + } + } + } +?> \ No newline at end of file diff --git a/test/main/Utils/AMQP/AMQPPeclTest.class.php b/test/main/Utils/AMQP/AMQPPeclTest.class.php old mode 100644 new mode 100755 index 2992b2fc7c..49662d0405 --- a/test/main/Utils/AMQP/AMQPPeclTest.class.php +++ b/test/main/Utils/AMQP/AMQPPeclTest.class.php @@ -9,7 +9,7 @@ * * ***************************************************************************/ - class AMQPTestCaseQueueConsumer extends AMQPQueueConsumer + class AMQPTestCaseNoAckQueueConsumer extends AMQPPeclQueueConsumer { protected $checkString = ''; @@ -21,11 +21,55 @@ public function handleCancelOk($consumerTag) public function handleConsumeOk($consumerTag) { $this->checkString .= 'A'; + + AMQPPeclTest::checkMessageCount($this->getChannel()); } public function handleDelivery(AMQPIncomingMessage $delivery) { - $this->checkString .= 'B'; + AMQPPeclTest::messageTest($delivery, $this->count); + + //send acknowledge to RabbitMQ + $this->getChannel()->basicAck( + $delivery->getDeliveryTag(), + true + ); + + return parent::handleDelivery($delivery); + } + + public function getCheckString() + { + return $this->checkString; + } + + public function handleChangeConsumerTag($fromTag, $toTag) + { + return; + } + } + + class AMQPTestCaseAutoAckQueueConsumer extends AMQPPeclQueueConsumer + { + protected $checkString = ''; + + public function handleCancelOk($consumerTag) + { + $this->checkString .= 'C'; + } + + public function handleConsumeOk($consumerTag) + { + $this->checkString .= 'A'; + + AMQPPeclTest::checkMessageCount($this->getChannel()); + } + + public function handleDelivery(AMQPIncomingMessage $delivery) + { + AMQPPeclTest::messageTest($delivery, $this->count); + + return parent::handleDelivery($delivery); } public function getCheckString() @@ -41,10 +85,40 @@ public function handleChangeConsumerTag($fromTag, $toTag) class AMQPPeclTest extends TestCase { - const EXCHANGE_NAME = 'AMQPPeclTestExchange'; - const QUEUE_NAME = 'AMQPPeclTestQueue'; const COUNT_OF_PUBLISH = 5; - const ROUTING_KEY = 'routing.key'; + const MESSAGE_COUNT_WAIT = 200000; + + /** + * cluster master-slave of 2 nodes on single machine + */ + const PORT_MIRRORED = 5673; // port of slave node + + protected static $queueList = array( + // basic queue + 'basic' => array( + 'exchange' => 'AMQPPeclTestExchange', + 'exchangeType' => AMQPExchangeType::DIRECT, + 'name' => 'AMQPPeclTestQueue', + 'key' => 'routing.key', + 'args' => array() + ), + // exchange2exchange binding + 'exchangeBinded' => array( + 'exchange' => 'AMQPPeclTestExchangeBinded', + 'exchangeType' => AMQPExchangeType::FANOUT, + 'name' => 'AMQPPeclTestQueueBinded', + 'key' => 'routing.key.binded', + 'args' => array() + ), + // Highly Available Queues + 'mirrored' => array( + 'exchange' => 'AMQPPeclTestExchange', + 'exchangeType' => AMQPExchangeType::DIRECT, + 'name' => 'AMQPPeclTestQueueMirrored', + 'key' => 'routing.key.mirrored', + 'args' => array('x-ha-policy' => 'all') + ) + ); protected function setUp() { @@ -55,6 +129,36 @@ protected function setUp() } } + public static function messageTest(AMQPIncomingMessage $mess, $i) + { + self::messageAssertion($mess, $i); + } + + + /** + * @param AMQPPeclChannel $channel + * @param string $label + * @param int $value + */ + public static function checkMessageCount(AMQPChannelInterface $channel, + $label = 'basic', $value = self::COUNT_OF_PUBLISH + ) { + usleep(self::MESSAGE_COUNT_WAIT); + + self::assertTrue(isset(self::$queueList[$label])); + + $count = $channel->queueDeclare( + self::$queueList[$label]['name'], + AMQPQueueConfig::create()-> + setDurable(true)-> + setArguments( + self::$queueList[$label]['args'] + ) + ); + + self::assertEquals($value, $count); + } + public function testDefaulConnection() { try { @@ -134,17 +238,7 @@ public function testDeclareExchange() $channel = $c->createChannel(1); try { - $channel = $channel->exchangeDeclare( - self::EXCHANGE_NAME, - AMQPExchangeConfig::create()-> - setType( - new AMQPExchangeType(AMQPExchangeType::DIRECT) - )-> - setDurable(true) - ); - - $this->assertInstanceOf('AMQPChannelInterface', $channel); - + $this->exchangeDeclare($channel, 'basic'); } catch (Exception $e) { $this->fail($e->getMessage()); } @@ -159,11 +253,7 @@ public function testDeclareQueue() $channel = $c->createChannel(1); try { - $int = $channel->queueDeclare( - self::QUEUE_NAME, - AMQPQueueConfig::create()-> - setDurable(true) - ); + $int = $this->queueDeclare($channel, 'basic'); $this->assertSame($int, 0); @@ -180,51 +270,44 @@ public function testProducerLogic() $channel = $c->createChannel(1); - $channel->exchangeDeclare( - self::EXCHANGE_NAME, - AMQPExchangeConfig::create()-> - setType( - new AMQPExchangeType(AMQPExchangeType::DIRECT) - )-> - setDurable(true) - ); + $this->exchangeDeclare($channel, 'basic'); + $this->queueDeclare($channel, 'basic'); + $this->queueBind($channel, 'basic'); + $this->queuePurge($channel, 'basic'); + $this->publishMessages($channel); - $channel->queueDeclare( - self::QUEUE_NAME, - AMQPQueueConfig::create()-> - setDurable(true) - ); + } - $channelInterface = $channel->queueBind( - self::QUEUE_NAME, self::EXCHANGE_NAME, self::ROUTING_KEY + /** + * @depends testProducerLogic + **/ + public function testNoAckConsumerLogic() + { + $c = new AMQPPecl( + AMQPCredentials::createDefault() ); - $this->assertInstanceOf('AMQPChannelInterface', $channelInterface); - //cleanup queue - $channelInterface = $channel->queuePurge(self::QUEUE_NAME); - $this->assertInstanceOf('AMQPChannelInterface', $channelInterface); + $channel = $c->createChannel(1); - for($i = 1; $i <= self::COUNT_OF_PUBLISH; $i++) { - $channelInterface = $channel->basicPublish( - self::EXCHANGE_NAME, - self::ROUTING_KEY, - AMQPOutgoingMessage::create()-> - setBody("message {$i}")-> - setTimestamp(Timestamp::makeNow())-> - setAppId(__CLASS__)-> - setMessageId($i)-> - setContentEncoding('utf-8') - ); + $this->checkMessageCount($channel); - $this->assertInstanceOf( - 'AMQPChannelInterface', - $channelInterface + for ($j = 1; $j <= self::COUNT_OF_PUBLISH; $j++) { + $mess = $channel->basicGet( + self::$queueList['basic']['name'], + false ); + self::messageTest($mess, $j); } + + $this->assertSame(self::COUNT_OF_PUBLISH, $j - 1); + + $this->checkMessageCount($channel, 'basic', 0); + + $c->disconnect(); } /** - * @depends testProducerLogic + * @depends testNoAckConsumerLogic **/ public function testConsumerLogic() { @@ -234,65 +317,140 @@ public function testConsumerLogic() $channel = $c->createChannel(1); - //only required to verify the number of messages in the queue - $int = $channel->queueDeclare( - self::QUEUE_NAME, - AMQPQueueConfig::create()-> - setDurable(true) - ); - - $this->assertSame(self::COUNT_OF_PUBLISH, $int); + $this->checkMessageCount($channel); $i = 0; try { - while($mess = $channel->basicGet(self::QUEUE_NAME)) { - $i++; + while($mess = $channel->basicGet(self::$queueList['basic']['name'])) + self::messageTest($mess, ++$i); + } catch (ObjectNotFoundException $e) { + //it's ok, because queue is empty + $this->assertSame(self::COUNT_OF_PUBLISH, $i); + } + } - $this->assertInstanceOf('AMQPIncomingMessage', $mess); - $this->assertInstanceOf('Timestamp', $mess->getTimestamp()); + /** + * test connection on drop node + */ + public function testDeclareQueueCluster() + { + $c = AMQPSelective::me()-> + addLink( + 'slave', + new AMQPPecl( + AMQPCredentials::createDefault()-> + setPort(self::PORT_MIRRORED) + ) + )-> + addLink( + 'master', + new AMQPPecl( + AMQPCredentials::createDefault() + ) + )-> + setCurrent('slave'); + + $c->dropLink('slave'); - $properties = $mess->getProperties(); + $channel = $c->createChannel(1); - $this->assertEquals(__CLASS__, $mess->getAppId()); - $this->assertTrue( - isset($properties[AMQPIncomingMessage::APP_ID]) - && $properties[AMQPIncomingMessage::APP_ID] == - $mess->getAppId() - ); + AMQPPeclTest::assertEquals( + AMQPCredentials::DEFAULT_PORT, + $c->getCredentials()->getPort() + ); - $this->assertEquals($i, $mess->getMessageId()); - $this->assertTrue( - isset($properties[AMQPIncomingMessage::MESSAGE_ID]) - && $properties[AMQPIncomingMessage::MESSAGE_ID] == - $mess->getMessageId() - ); + try { + $int = $this->queueDeclare($channel, 'mirrored'); + $this->assertSame($int, 0); + } catch (Exception $e) { + $this->fail($e->getMessage()); + } + } + public function testProducerLogicMirrored() + { + $c = AMQPSelective::me()-> + addLink( + 'slave', + new AMQPPecl( + AMQPCredentials::createDefault()-> + setPort(self::PORT_MIRRORED) + ) + )-> + addLink( + 'master', + new AMQPPecl( + AMQPCredentials::createDefault() + ) + )-> + setCurrent('slave'); - $this->assertEquals('text/plain', $mess->getContentType()); - $this->assertTrue( - isset($properties[AMQPIncomingMessage::CONTENT_TYPE]) - && $properties[AMQPIncomingMessage::CONTENT_TYPE] == - $mess->getContentType() - ); + $channel = $c->createChannel(1); + + $this->exchangeDeclare($channel, 'mirrored'); + $this->queueDeclare($channel, 'mirrored'); + $this->queueBind($channel, 'mirrored'); + $this->queuePurge($channel, 'mirrored'); + + AMQPPeclTest::assertEquals( + self::PORT_MIRRORED, + $c->getCredentials()->getPort() + ); - $this->assertEquals('utf-8', $mess->getContentEncoding()); - $this->assertTrue( - isset($properties[AMQPIncomingMessage::CONTENT_ENCODING]) - && $properties[AMQPIncomingMessage::CONTENT_ENCODING] == - $mess->getContentEncoding() - ); + $c->dropLink('slave'); + + $this->publishMessages($channel, false, 'mirrored'); + + AMQPPeclTest::assertEquals( + AMQPCredentials::DEFAULT_PORT, + $c->getCredentials()->getPort() + ); + + $this->checkMessageCount($channel, 'mirrored'); - $this->assertEquals("message {$i}", $mess->getBody()); - } - } catch (ObjectNotFoundException $e) { - //it's ok, because queue is empty - $this->assertSame(self::COUNT_OF_PUBLISH, $i); - } } /** - * @depends testProducerLogic + * @depends testProducerLogicMirrored **/ + public function testConsumerLogicMirrored() + { + $c = AMQPSelective::me()-> + addLink( + 'slave', + new AMQPPecl( + AMQPCredentials::createDefault()-> + setPort(self::PORT_MIRRORED) + ) + )-> + addLink( + 'master', + new AMQPPecl( + AMQPCredentials::createDefault() + ) + )-> + setCurrent('slave'); + + $c->dropLink('slave'); + + $channel = $c->createChannel(1); + + $this->checkMessageCount($channel, 'mirrored'); + + $i = 0; + try { + while($mess = $channel->basicGet(self::$queueList['mirrored']['name'])) + self::messageTest($mess, ++$i); + } catch (ObjectNotFoundException $e) {/**/} + $this->assertSame(self::COUNT_OF_PUBLISH, $i); + + AMQPPeclTest::assertEquals( + AMQPCredentials::DEFAULT_PORT, + $c->getCredentials()->getPort() + ); + + } + public function testCleanup() { $c = new AMQPPecl( @@ -301,66 +459,135 @@ public function testCleanup() $channel = $c->createChannel(1); - $channelInterface = $channel->queueUnbind( - self::QUEUE_NAME, - self::EXCHANGE_NAME, - self::ROUTING_KEY - ); + foreach (array('basic', 'mirrored') as $label) { + $this->exchangeDeclare($channel, $label); + $this->queueDeclare($channel, $label); + $this->queueBind($channel, $label); + $this->queueUnbind($channel, $label); + $this->queueDelete($channel, $label); + $this->exchangeDelete($channel, $label); + } + } - $this->assertInstanceOf( - 'AMQPChannelInterface', - $channelInterface - ); - $channelInterface = $channel->queueDelete(self::QUEUE_NAME); - $this->assertInstanceOf( - 'AMQPChannelInterface', - $channelInterface + public function testQueueConsumerNoAck() + { + $c = new AMQPPecl( + AMQPCredentials::createDefault() ); - $channelInterface = $channel->exchangeDelete(self::EXCHANGE_NAME); - $this->assertInstanceOf( - 'AMQPChannelInterface', - $channelInterface + $channel = $c->createChannel(1); + + $this->exchangeDeclare($channel, 'basic'); + $this->queueDeclare($channel, 'basic'); + $this->queueBind($channel, 'basic'); + $this->queuePurge($channel, 'basic'); + + $this->publishMessages($channel, false); + + $consumer = new AMQPTestCaseNoAckQueueConsumer($channel); + $consumer->setLimit(AMQPPeclTest::COUNT_OF_PUBLISH); + + $channel->basicConsume( + self::$queueList['basic']['name'], + false, + $consumer ); + $this->assertSame('AC', $consumer->getCheckString()); + $this->assertEquals(self::COUNT_OF_PUBLISH, $consumer->getCount()); + + /** + * check queue is empty + */ + //drop channels and close connection + $c->disconnect(); + + $c = new AMQPPecl(AMQPCredentials::createDefault()); + $channel = $c->createChannel(1); + + $this->exchangeDeclare($channel, 'basic'); + $inQueueCount = $this->queueDeclare($channel, 'basic'); + $this->assertSame(0, $inQueueCount); } - public function testQueueConsumer() + public function testQueueConsumerAutoAck() { $c = new AMQPPecl( AMQPCredentials::createDefault() ); $channel = $c->createChannel(1); + $this->exchangeDeclare($channel, 'basic'); + $inQueueCount = $this->queueDeclare($channel, 'basic'); + $this->assertSame(0, $inQueueCount); + $this->queueBind($channel, 'basic'); + $this->queuePurge($channel, 'basic'); - $channel->exchangeDeclare( - self::EXCHANGE_NAME, - AMQPExchangeConfig::create()-> - setType( - new AMQPExchangeType(AMQPExchangeType::DIRECT) - )-> - setDurable(true) - ); + $this->publishMessages($channel, false); - $inQueueCount = $channel->queueDeclare( - self::QUEUE_NAME, - AMQPQueueConfig::create()-> - setDurable(true) + $consumer = new AMQPTestCaseAutoAckQueueConsumer($channel); + $consumer->setLimit(AMQPPeclTest::COUNT_OF_PUBLISH); + + $channel->basicConsume( + self::$queueList['basic']['name'], + true, + $consumer ); + //observer logic test + $this->assertSame('AC', $consumer->getCheckString()); + $this->assertEquals(self::COUNT_OF_PUBLISH, $consumer->getCount()); + + //drop channels and close connection + $c->disconnect(); + + $c = new AMQPPecl(AMQPCredentials::createDefault()); + $channel = $c->createChannel(1); + + $this->exchangeDeclare($channel, 'basic'); + $inQueueCount = $this->queueDeclare($channel, 'basic'); $this->assertSame(0, $inQueueCount); + } - $channel->queueBind( - self::QUEUE_NAME, self::EXCHANGE_NAME, self::ROUTING_KEY + + public function testExchangeToExchangeProducerLogic() + { + $this->exchangeToExchangeCleanup(); + + $c = new AMQPPecl( + AMQPCredentials::createDefault() ); - //cleanup - $channel->queuePurge(self::QUEUE_NAME); - + $channel = $c->createChannel(1); + + $this->exchangeDeclare($channel, 'basic'); + $this->queueDeclare($channel, 'basic'); + $this->exchangeDeclare($channel, 'exchangeBinded'); + $this->queueDeclare($channel, 'exchangeBinded'); + $this->queuePurge($channel, 'basic'); + $this->queuePurge($channel, 'exchangeBinded'); + $this->queueBind($channel, 'basic'); + $this->queueBind($channel, 'exchangeBinded'); + + /** + * binding exchangeBinded-exchange to basic-exchange + * by basic-routing key + */ + $channelInterface = $channel->exchangeBind( + self::$queueList['basic']['exchange'], + self::$queueList['exchangeBinded']['exchange'], + self::$queueList['basic']['key'] + ); + $this->assertInstanceOf('AMQPChannelInterface', $channelInterface); + + /** + * publish messages to 2 queues throw exchangeBinded-exchange + * with basic-key + */ for($i = 1; $i <= self::COUNT_OF_PUBLISH; $i++) { - $channel->basicPublish( - self::EXCHANGE_NAME, - self::ROUTING_KEY, + $channelInterface = $channel->basicPublish( + self::$queueList['exchangeBinded']['exchange'], + self::$queueList['basic']['key'], AMQPOutgoingMessage::create()-> setBody("message {$i}")-> setTimestamp(Timestamp::makeNow())-> @@ -368,84 +595,317 @@ public function testQueueConsumer() setMessageId($i)-> setContentEncoding('utf-8') ); + + $this->assertInstanceOf( + 'AMQPChannelInterface', + $channelInterface + ); } - $consumer = new AMQPTestCaseQueueConsumer($channel); + // message count in basic-queue + $this->checkMessageCount($channel); - $channel->basicConsume( - self::QUEUE_NAME, - false, - $consumer - ); + // message count in exchangeBinded-queue + $this->checkMessageCount($channel, 'exchangeBinded'); + } - $i = 0; + /** + * @depends testExchangeToExchangeProducerLogic + **/ + public function testExchangeToExchangeConsumerLogic() + { + $c = new AMQPPecl( + AMQPCredentials::createDefault() + ); - while (true) { - $message = $consumer->getNextDelivery(); + $channel = $c->createChannel(1); - //send acknowledge to RabbitMQ - $channel->basicAck( - $message->getDeliveryTag(), - false + $names = array( + self::$queueList['basic']['name'], + self::$queueList['exchangeBinded']['name'] + ); + foreach ($names as $name) { + $i = 0; + try { + while($mess = $channel->basicGet($name)) + self::messageTest($mess, ++$i); + } catch (ObjectNotFoundException $e) {/**/} + + $this->assertSame( + self::COUNT_OF_PUBLISH, + $i, + "message count={$i} in {$name}-queue, must be equal to " + .self::COUNT_OF_PUBLISH ); + } + } - $i++; + public function testExchangeToExchangeCleanup() + { + $this->exchangeToExchangeCleanup(); + } - $this->assertSame( - $consumer->getConsumerTag(), - $message->getConsumerTag() - ); + protected function exchangeToExchangeCleanup() + { + $c = new AMQPPecl( + AMQPCredentials::createDefault() + ); - $this->assertEquals( - $i, - $message->getMessageId() - ); + $channel = $c->createChannel(1); - $this->assertEquals( - __CLASS__, - $message->getAppId() - ); + $this->exchangeDeclare($channel, 'basic'); + $this->exchangeDeclare($channel, 'exchangeBinded'); + $this->queueDeclare($channel, 'exchangeBinded'); + $this->queueDeclare($channel, 'basic'); + $this->queueBind($channel, 'basic'); + $this->queueBind($channel, 'exchangeBinded'); + + $channel->queueBind( + self::$queueList['basic']['name'], + self::$queueList['exchangeBinded']['exchange'], + self::$queueList['basic']['key'] + ); + + $this->queueUnbind($channel, 'basic'); + $this->queueUnbind($channel, 'exchangeBinded'); - $this->assertEquals( - "message {$i}", - $message->getBody() + $channelInterface = $channel->queueUnbind( + self::$queueList['basic']['name'], + self::$queueList['exchangeBinded']['exchange'], + self::$queueList['basic']['key'] + ); + $this->assertInstanceOf( + 'AMQPChannelInterface', + $channelInterface + ); + $this->queueDelete($channel, 'exchangeBinded'); + $this->queueDelete($channel, 'basic'); + + $this->exchangeDelete($channel, 'basic'); + $this->exchangeDelete($channel, 'exchangeBinded'); + } + + /** + * @param AMQPChannelInterface $channel + * @param bool $check + * @param string $key + * @param string $queueName + */ + protected function publishMessages(AMQPChannelInterface $channel, $check = true, + $label = 'basic' + ) { + for($i = 1; $i <= self::COUNT_OF_PUBLISH; $i++) { + $channelInterface = $channel->basicPublish( + self::$queueList[$label]['exchange'], + self::$queueList[$label]['key'], + AMQPOutgoingMessage::create()-> + setBody("message {$i}")-> + setTimestamp(Timestamp::makeNow())-> + setAppId(__CLASS__)-> + setMessageId($i)-> + setContentEncoding('utf-8') ); - if ($i == self::COUNT_OF_PUBLISH) - break; + if ($check) + $this->assertInstanceOf( + 'AMQPChannelInterface', + $channelInterface + ); } - $channel->basicCancel($consumer->getConsumerTag()); + if ($check) + $this->checkMessageCount($channel, $label); + } - //observer logic test - $this->assertSame('ABBBBBC', $consumer->getCheckString()); + protected static function messageAssertion(AMQPIncomingMessage $mess, $i) + { + self::assertInstanceOf('AMQPIncomingMessage', $mess); + self::assertInstanceOf('Timestamp', $mess->getTimestamp()); + self::assertTrue(strlen(trim($mess->getDeliveryTag())) > 0); - //drop channels and close connection - $c->disconnect(); + $properties = $mess->getProperties(); - $c = new AMQPPecl(AMQPCredentials::createDefault()); - $channel = $c->createChannel(1); + //self::assertEquals('guest', $mess->getUserId()); + self::assertTrue( + isset($properties[AMQPIncomingMessage::USER_ID]) + && $properties[AMQPIncomingMessage::USER_ID] == + $mess->getUserId() + ); + + self::assertEquals(__CLASS__, $mess->getAppId()); + self::assertTrue( + isset($properties[AMQPIncomingMessage::APP_ID]) + && $properties[AMQPIncomingMessage::APP_ID] == + $mess->getAppId() + ); + + self::assertEquals($i, $mess->getMessageId()); + self::assertTrue( + isset($properties[AMQPIncomingMessage::MESSAGE_ID]) + && $properties[AMQPIncomingMessage::MESSAGE_ID] == + $mess->getMessageId() + ); + + + self::assertEquals('text/plain', $mess->getContentType()); + self::assertTrue( + isset($properties[AMQPIncomingMessage::CONTENT_TYPE]) + && $properties[AMQPIncomingMessage::CONTENT_TYPE] == + $mess->getContentType() + ); - $channel->exchangeDeclare( - self::EXCHANGE_NAME, + self::assertEquals('utf-8', $mess->getContentEncoding()); + self::assertTrue( + isset($properties[AMQPIncomingMessage::CONTENT_ENCODING]) + && $properties[AMQPIncomingMessage::CONTENT_ENCODING] == + $mess->getContentEncoding() + ); + + self::assertEquals("message {$i}", $mess->getBody()); + } + + /** + * @param AMQPPeclChannel $channel + * @param AMQPPeclChannel $label + * @return AMQPPeclChannel + */ + protected function exchangeDeclare(AMQPChannelInterface $channel, $label) + { + $this->assertTrue(isset(self::$queueList[$label])); + + $interface = $channel->exchangeDeclare( + self::$queueList[$label]['exchange'], AMQPExchangeConfig::create()-> setType( - new AMQPExchangeType(AMQPExchangeType::DIRECT) + new AMQPExchangeType(self::$queueList[$label]['exchangeType']) )-> setDurable(true) ); - //queue must be empty, because we sent acknowledge - $inQueueCount = $channel->queueDeclare( - self::QUEUE_NAME, + $this->assertInstanceOf('AMQPChannelInterface', $interface); + + return $interface; + } + + /** + * @param AMQPChannelInterface $channel + * @param string $label + * @return AMQPChannelInterface + */ + protected function exchangeDelete(AMQPChannelInterface $channel, $label) + { + $this->assertTrue(isset(self::$queueList[$label])); + + $channelInterface = $channel->exchangeDelete( + self::$queueList[$label]['exchange'] + ); + + $this->assertInstanceOf( + 'AMQPChannelInterface', + $channelInterface + ); + + return $channelInterface; + } + + /** + * @param AMQPChannelInterface $channel + * @param string $label + * @return int + */ + protected function queueDeclare(AMQPChannelInterface $channel, $label) + { + $this->assertTrue(isset(self::$queueList[$label])); + + return $channel->queueDeclare( + self::$queueList[$label]['name'], AMQPQueueConfig::create()-> - setDurable(true) + setDurable(true)-> + setArguments( + self::$queueList[$label]['args'] + ) ); + } - $this->assertSame(0, $inQueueCount); + /** + * @param AMQPChannelInterface $channel + * @param string $label + * @return AMQPChannelInterface + */ + protected function queueBind(AMQPChannelInterface $channel, $label) + { + $this->assertTrue(isset(self::$queueList[$label])); + + $channelInterface = $channel->queueBind( + self::$queueList[$label]['name'], + self::$queueList[$label]['exchange'], + self::$queueList[$label]['key'] + ); + + $this->assertInstanceOf('AMQPChannelInterface', $channelInterface); + + return $channelInterface; + } - //cleanup - $channel->queuePurge(self::QUEUE_NAME); + /** + * @param AMQPChannelInterface $channel + * @param string $label + * @return AMQPChannelInterface + */ + protected function queuePurge(AMQPChannelInterface $channel, $label) + { + $this->assertTrue(isset(self::$queueList[$label])); + + $channelInterface = $channel->queuePurge(self::$queueList[$label]['name']); + + $this->assertInstanceOf('AMQPChannelInterface', $channelInterface); + + return $channelInterface; } + + /** + * @param AMQPChannelInterface $channel + * @param AMQPPeclChannel $label + * @return AMQPChannelInterface + */ + protected function queueUnbind(AMQPChannelInterface $channel, $label) + { + $this->assertTrue(isset(self::$queueList[$label])); + + $channelInterface = $channel->queueUnbind( + self::$queueList[$label]['name'], + self::$queueList[$label]['exchange'], + self::$queueList[$label]['key'] + ); + + $this->assertInstanceOf( + 'AMQPChannelInterface', + $channelInterface + ); + + return $channelInterface; + } + + /** + * @param AMQPChannelInterface $channel + * @param string $label + * @return AMQPChannelInterface + */ + protected function queueDelete(AMQPChannelInterface $channel, $label) + { + $this->assertTrue(isset(self::$queueList[$label])); + + $channelInterface = $channel->queueDelete( + self::$queueList[$label]['name'] + ); + + $this->assertInstanceOf( + 'AMQPChannelInterface', + $channelInterface + ); + + return $channelInterface; + } + } ?> \ No newline at end of file