Skip to content

Commit 3435039

Browse files
Add events for before and after producer publishes a message
1 parent 8c18f6d commit 3435039

File tree

4 files changed

+123
-1
lines changed

4 files changed

+123
-1
lines changed

Event/AMQPEvent.php

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
namespace OldSound\RabbitMqBundle\Event;
44

55
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
6+
use OldSound\RabbitMqBundle\RabbitMq\Producer;
67
use PhpAmqpLib\Message\AMQPMessage;
7-
use Symfony\Component\EventDispatcher\Event;
88

99
/**
1010
* Class AMQPEvent
@@ -18,6 +18,8 @@ class AMQPEvent extends AbstractAMQPEvent
1818
public const ON_IDLE = 'on_idle';
1919
public const BEFORE_PROCESSING_MESSAGE = 'before_processing';
2020
public const AFTER_PROCESSING_MESSAGE = 'after_processing';
21+
public const BEFORE_PUBLISH_MESSAGE = 'before_publishing';
22+
public const AFTER_PUBLISH_MESSAGE = 'after_publishing';
2123

2224
/**
2325
* @var AMQPMessage
@@ -29,6 +31,11 @@ class AMQPEvent extends AbstractAMQPEvent
2931
*/
3032
protected $consumer;
3133

34+
/**
35+
* @var Producer
36+
*/
37+
protected $producer;
38+
3239
/**
3340
* @return AMQPMessage
3441
*/
@@ -68,4 +75,24 @@ public function setConsumer(Consumer $consumer)
6875

6976
return $this;
7077
}
78+
79+
/**
80+
* @return Producer
81+
*/
82+
public function getProducer()
83+
{
84+
return $this->producer;
85+
}
86+
87+
/**
88+
* @param Producer $producer
89+
*
90+
* @return AMQPEvent
91+
*/
92+
public function setProducer(Producer $producer)
93+
{
94+
$this->producer = $producer;
95+
96+
return $this;
97+
}
7198
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\Producer;
6+
use PhpAmqpLib\Message\AMQPMessage;
7+
8+
/**
9+
* Class AfterProducerPublishMessageEvent
10+
*
11+
* @package OldSound\RabbitMqBundle\Command
12+
*/
13+
class AfterProducerPublishMessageEvent extends AMQPEvent
14+
{
15+
public const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;
16+
17+
/**
18+
* @var string
19+
*/
20+
protected $routingKey;
21+
22+
/**
23+
* AfterProducerPublishMessageEvent constructor.
24+
*
25+
* @param AMQPMessage $AMQPMessage
26+
*/
27+
public function __construct(Producer $producer, AMQPMessage $AMQPMessage, string $routingKey)
28+
{
29+
$this->setProducer($producer);
30+
$this->setAMQPMessage($AMQPMessage);
31+
$this->routingKey = $routingKey;
32+
}
33+
34+
/**
35+
* @return AMQPMessage
36+
*/
37+
public function getRoutingKey()
38+
{
39+
return $this->routingKey;
40+
}
41+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\Producer;
6+
use PhpAmqpLib\Message\AMQPMessage;
7+
8+
/**
9+
* Class BeforeProducerPublishMessageEvent
10+
*
11+
* @package OldSound\RabbitMqBundle\Command
12+
*/
13+
class BeforeProducerPublishMessageEvent extends AMQPEvent
14+
{
15+
public const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;
16+
17+
/**
18+
* @var string
19+
*/
20+
protected $routingKey;
21+
22+
/**
23+
* BeforeProducerPublishMessageEvent constructor.
24+
*
25+
* @param AMQPMessage $AMQPMessage
26+
*/
27+
public function __construct(Producer $producer, AMQPMessage $AMQPMessage, string $routingKey)
28+
{
29+
$this->setProducer($producer);
30+
$this->setAMQPMessage($AMQPMessage);
31+
$this->routingKey = $routingKey;
32+
}
33+
34+
/**
35+
* @return AMQPMessage
36+
*/
37+
public function getRoutingKey()
38+
{
39+
return $this->routingKey;
40+
}
41+
}

RabbitMq/Producer.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
namespace OldSound\RabbitMqBundle\RabbitMq;
44

5+
use OldSound\RabbitMqBundle\Event\AfterProducerPublishMessageEvent;
6+
use OldSound\RabbitMqBundle\Event\BeforeProducerPublishMessageEvent;
57
use PhpAmqpLib\Message\AMQPMessage;
68
use PhpAmqpLib\Wire\AMQPTable;
79

@@ -63,6 +65,12 @@ public function publish($msgBody, $routingKey = null, $additionalProperties = []
6365
}
6466

6567
$real_routingKey = $routingKey !== null ? $routingKey : $this->defaultRoutingKey;
68+
69+
$this->dispatchEvent(
70+
BeforeProducerPublishMessageEvent::NAME,
71+
new BeforeProducerPublishMessageEvent($this, $msg, $real_routingKey)
72+
);
73+
6674
$this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string)$real_routingKey);
6775
$this->logger->debug('AMQP message published', [
6876
'amqp' => [
@@ -72,5 +80,10 @@ public function publish($msgBody, $routingKey = null, $additionalProperties = []
7280
'headers' => $headers,
7381
],
7482
]);
83+
84+
$this->dispatchEvent(
85+
AfterProducerPublishMessageEvent::NAME,
86+
new AfterProducerPublishMessageEvent($this, $msg, $real_routingKey)
87+
);
7588
}
7689
}

0 commit comments

Comments
 (0)