Skip to content

Commit b6d8d28

Browse files
committed
* handle the change of СonsumerTag implemented
1 parent fb1c091 commit b6d8d28

File tree

5 files changed

+34
-6
lines changed

5 files changed

+34
-6
lines changed

doc/ChangeLog

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
2011-11-18 Sergey S. Sergeev
2+
3+
* main/Utils/AMQP/AMQPConsumer.class.php,
4+
main/Utils/AMQP/AMQPDefaultConsumer.class.php,
5+
main/Utils/AMQP/Pecl/AMQPPeclChannel.class.php:
6+
handle change СonsumerTag implemented.
7+
18
2011-11-18 Evgeny V. Kokovikhin
29

310
* main/Messages/TextMessage.class.php, main/Messages/TextFileReceiver.class.php:

main/Utils/AMQP/AMQPConsumer.class.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ public function handleConsumeOk($consumerTag);
4141
**/
4242
public function handleCancelOk($consumerTag);
4343

44+
/**
45+
* Called when the consumer is changed tag
46+
*
47+
* @param string $fromTag
48+
* @param string $toTag
49+
* @return void
50+
**/
51+
public function handleChangeConsumerTag($fromTag, $toTag);
52+
4453
/**
4554
* @return AMQPConsumer
4655
**/

main/Utils/AMQP/AMQPDefaultConsumer.class.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ public function handleDelivery(AMQPIncomingMessage $delivery)
7272
// no work to do
7373
}
7474

75+
/**
76+
* @return void
77+
**/
78+
public function handleChangeConsumerTag($fromTag, $toTag)
79+
{
80+
// no work to do
81+
}
82+
7583
/**
7684
* @return AMQPDefaultConsumer
7785
**/

main/Utils/AMQP/Pecl/AMQPPeclChannel.class.php

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -485,11 +485,12 @@ public function getNextDelivery()
485485
}
486486

487487
$this->checkCommandResult(
488-
is_array($messages),
488+
is_array($messages) && !empty($messages),
489489
"Could not consume from queue"
490490
);
491491

492492
$message = array_shift($messages);
493+
493494
$incoming = AMQPIncomingMessage::spawn($message);
494495

495496
if ($this->consumer->getConsumerTag() === null) {
@@ -499,11 +500,9 @@ public function getNextDelivery()
499500
$this->consumer->getConsumerTag()
500501
!= $incoming->getConsumerTag()
501502
) {
502-
throw new WrongStateException(
503-
"Consumer change tag consumerTag="
504-
."{$this->consumer->getConsumerTag()}, "
505-
."message.consumerTag={$incoming->getConsumerTag()}, "
506-
."message.body={$incoming->getBody()}"
503+
$this->consumer->handleChangeConsumerTag(
504+
$this->consumer->getConsumerTag(),
505+
$incoming->getConsumerTag()
507506
);
508507
}
509508

test/main/Utils/AMQP/AMQPPeclTest.class.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ public function getCheckString()
3232
{
3333
return $this->checkString;
3434
}
35+
36+
public function handleChangeConsumerTag($fromTag, $toTag)
37+
{
38+
return;
39+
}
3540
}
3641

3742
class AMQPPeclTest extends TestCase

0 commit comments

Comments
 (0)