Skip to content

Commit 6a08062

Browse files
Nyholmfabpot
authored andcommitted
[Messenger] Move Transports to separate packages
0 parents  commit 6a08062

25 files changed

+2447
-0
lines changed

.gitattributes

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
/Tests export-ignore
2+
/phpunit.xml.dist export-ignore
3+
/.gitignore export-ignore

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
vendor/
2+
composer.lock
3+
phpunit.xml

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CHANGELOG
2+
=========
3+
4+
5.1.0
5+
-----
6+
7+
* Introduced the AMQP bridge.

LICENSE

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Copyright (c) 2018-2020 Fabien Potencier
2+
3+
Permission is hereby granted, free of charge, to any person obtaining a copy
4+
of this software and associated documentation files (the "Software"), to deal
5+
in the Software without restriction, including without limitation the rights
6+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
copies of the Software, and to permit persons to whom the Software is furnished
8+
to do so, subject to the following conditions:
9+
10+
The above copyright notice and this permission notice shall be included in all
11+
copies or substantial portions of the Software.
12+
13+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
THE SOFTWARE.

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
AMQP Messenger
2+
==============
3+
4+
Provides AMQP integration for Symfony Messenger.
5+
6+
Resources
7+
---------
8+
9+
* [Contributing](https://symfony.com/doc/current/contributing/index.html)
10+
* [Report issues](https://github.com/symfony/symfony/issues) and
11+
[send Pull Requests](https://github.com/symfony/symfony/pulls)
12+
in the [main Symfony repository](https://github.com/symfony/symfony)

Tests/Fixtures/DummyMessage.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures;
4+
5+
class DummyMessage
6+
{
7+
private $message;
8+
9+
public function __construct(string $message)
10+
{
11+
$this->message = $message;
12+
}
13+
14+
public function getMessage(): string
15+
{
16+
return $this->message;
17+
}
18+
}

Tests/Fixtures/long_receiver.php

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
$componentRoot = $_SERVER['COMPONENT_ROOT'];
4+
5+
if (!is_file($autoload = $componentRoot.'/vendor/autoload.php')) {
6+
$autoload = $componentRoot.'/../../../../../../vendor/autoload.php';
7+
}
8+
9+
if (!file_exists($autoload)) {
10+
exit('You should run "composer install --dev" in the component before running this script.');
11+
}
12+
13+
require_once $autoload;
14+
15+
use Symfony\Component\EventDispatcher\EventDispatcher;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
18+
use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener;
19+
use Symfony\Component\Messenger\MessageBusInterface;
20+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver;
21+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
22+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
23+
use Symfony\Component\Messenger\Worker;
24+
use Symfony\Component\Serializer as SerializerComponent;
25+
use Symfony\Component\Serializer\Encoder\JsonEncoder;
26+
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
27+
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
28+
29+
$serializer = new Serializer(
30+
new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
31+
);
32+
33+
$connection = Connection::fromDsn(getenv('DSN'));
34+
$receiver = new AmqpReceiver($connection, $serializer);
35+
$eventDispatcher = new EventDispatcher();
36+
$eventDispatcher->addSubscriber(new StopWorkerOnSigtermSignalListener());
37+
$eventDispatcher->addSubscriber(new DispatchPcntlSignalListener());
38+
39+
$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
40+
public function dispatch($envelope, array $stamps = []): Envelope
41+
{
42+
echo 'Get envelope with message: '.get_class($envelope->getMessage())."\n";
43+
echo sprintf("with stamps: %s\n", json_encode(array_keys($envelope->all()), JSON_PRETTY_PRINT));
44+
45+
sleep(30);
46+
echo "Done.\n";
47+
48+
return $envelope;
49+
}
50+
}, $eventDispatcher);
51+
52+
echo "Receiving messages...\n";
53+
$worker->run();
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Amqp\Tests\Transport;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp;
17+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver;
18+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender;
19+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
20+
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
21+
use Symfony\Component\Messenger\Envelope;
22+
use Symfony\Component\Messenger\Stamp\DelayStamp;
23+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
24+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
25+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
26+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
27+
use Symfony\Component\Process\PhpProcess;
28+
use Symfony\Component\Process\Process;
29+
use Symfony\Component\Serializer as SerializerComponent;
30+
use Symfony\Component\Serializer\Encoder\JsonEncoder;
31+
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
32+
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
33+
34+
/**
35+
* @requires extension amqp
36+
*/
37+
class AmqpExtIntegrationTest extends TestCase
38+
{
39+
protected function setUp(): void
40+
{
41+
parent::setUp();
42+
43+
if (!getenv('MESSENGER_AMQP_DSN')) {
44+
$this->markTestSkipped('The "MESSENGER_AMQP_DSN" environment variable is required.');
45+
}
46+
}
47+
48+
public function testItSendsAndReceivesMessages()
49+
{
50+
$serializer = $this->createSerializer();
51+
52+
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
53+
$connection->setup();
54+
$connection->purgeQueues();
55+
56+
$sender = new AmqpSender($connection, $serializer);
57+
$receiver = new AmqpReceiver($connection, $serializer);
58+
59+
$sender->send($first = new Envelope(new DummyMessage('First')));
60+
$sender->send($second = new Envelope(new DummyMessage('Second')));
61+
62+
$envelopes = iterator_to_array($receiver->get());
63+
$this->assertCount(1, $envelopes);
64+
/** @var Envelope $envelope */
65+
$envelope = $envelopes[0];
66+
$this->assertEquals($first->getMessage(), $envelope->getMessage());
67+
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
68+
69+
$envelopes = iterator_to_array($receiver->get());
70+
$this->assertCount(1, $envelopes);
71+
/** @var Envelope $envelope */
72+
$envelope = $envelopes[0];
73+
$this->assertEquals($second->getMessage(), $envelope->getMessage());
74+
75+
$this->assertEmpty(iterator_to_array($receiver->get()));
76+
}
77+
78+
public function testRetryAndDelay()
79+
{
80+
$serializer = $this->createSerializer();
81+
82+
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
83+
$connection->setup();
84+
$connection->purgeQueues();
85+
86+
$sender = new AmqpSender($connection, $serializer);
87+
$receiver = new AmqpReceiver($connection, $serializer);
88+
89+
// send a first message
90+
$sender->send($first = new Envelope(new DummyMessage('First')));
91+
92+
// receive it immediately and imitate a redeliver with 2 second delay
93+
$envelopes = iterator_to_array($receiver->get());
94+
/** @var Envelope $envelope */
95+
$envelope = $envelopes[0];
96+
$newEnvelope = $envelope
97+
->with(new DelayStamp(2000))
98+
->with(new RedeliveryStamp(1));
99+
$sender->send($newEnvelope);
100+
$receiver->ack($envelope);
101+
102+
// send a 2nd message with a shorter delay and custom routing key
103+
$customRoutingKeyMessage = new DummyMessage('custom routing key');
104+
$envelopeCustomRoutingKey = new Envelope($customRoutingKeyMessage, [
105+
new DelayStamp(1000),
106+
new AmqpStamp('my_custom_routing_key'),
107+
]);
108+
$sender->send($envelopeCustomRoutingKey);
109+
110+
// wait for next message (but max at 3 seconds)
111+
$startTime = microtime(true);
112+
$envelopes = $this->receiveEnvelopes($receiver, 3);
113+
114+
// duration should be about 1 second
115+
$this->assertApproximateDuration($startTime, 1);
116+
117+
// this should be the custom routing key message first
118+
$this->assertCount(1, $envelopes);
119+
/* @var Envelope $envelope */
120+
$receiver->ack($envelopes[0]);
121+
$this->assertEquals($customRoutingKeyMessage, $envelopes[0]->getMessage());
122+
123+
// wait for final message (but max at 3 seconds)
124+
$envelopes = $this->receiveEnvelopes($receiver, 3);
125+
// duration should be about 2 seconds
126+
$this->assertApproximateDuration($startTime, 2);
127+
128+
/* @var RedeliveryStamp|null $retryStamp */
129+
// verify the stamp still exists from the last send
130+
$this->assertCount(1, $envelopes);
131+
$retryStamp = $envelopes[0]->last(RedeliveryStamp::class);
132+
$this->assertNotNull($retryStamp);
133+
$this->assertSame(1, $retryStamp->getRetryCount());
134+
135+
$receiver->ack($envelope);
136+
}
137+
138+
public function testItReceivesSignals()
139+
{
140+
$serializer = $this->createSerializer();
141+
142+
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
143+
$connection->setup();
144+
$connection->purgeQueues();
145+
146+
$sender = new AmqpSender($connection, $serializer);
147+
$sender->send(new Envelope(new DummyMessage('Hello')));
148+
149+
$amqpReadTimeout = 30;
150+
$dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout;
151+
$process = new PhpProcess(file_get_contents(__DIR__.'/../Fixtures/long_receiver.php'), null, [
152+
'COMPONENT_ROOT' => __DIR__.'/../../',
153+
'DSN' => $dsn,
154+
]);
155+
156+
$process->start();
157+
158+
$this->waitForOutput($process, $expectedOutput = "Receiving messages...\n");
159+
160+
$signalTime = microtime(true);
161+
$timedOutTime = time() + 10;
162+
163+
// immediately after the process has started "booted", kill it
164+
$process->signal(15);
165+
166+
while ($process->isRunning() && time() < $timedOutTime) {
167+
usleep(100 * 1000); // 100ms
168+
}
169+
170+
// make sure the process exited, after consuming only the 1 message
171+
$this->assertFalse($process->isRunning());
172+
$this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime);
173+
$this->assertSame($expectedOutput.<<<'TXT'
174+
Get envelope with message: Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage
175+
with stamps: [
176+
"Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpReceivedStamp",
177+
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
178+
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
179+
]
180+
Done.
181+
182+
TXT
183+
, $process->getOutput());
184+
}
185+
186+
public function testItCountsMessagesInQueue()
187+
{
188+
$serializer = $this->createSerializer();
189+
190+
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'));
191+
$connection->setup();
192+
$connection->purgeQueues();
193+
194+
$sender = new AmqpSender($connection, $serializer);
195+
196+
$sender->send(new Envelope(new DummyMessage('First')));
197+
$sender->send(new Envelope(new DummyMessage('Second')));
198+
$sender->send(new Envelope(new DummyMessage('Third')));
199+
200+
sleep(1); // give amqp a moment to have the messages ready
201+
$this->assertSame(3, $connection->countMessagesInQueues());
202+
}
203+
204+
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
205+
{
206+
$timedOutTime = time() + $timeoutInSeconds;
207+
208+
while (time() < $timedOutTime) {
209+
if (0 === strpos($process->getOutput(), $output)) {
210+
return;
211+
}
212+
213+
usleep(100 * 1000); // 100ms
214+
}
215+
216+
throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.');
217+
}
218+
219+
private function createSerializer(): SerializerInterface
220+
{
221+
return new Serializer(
222+
new SerializerComponent\Serializer([new ObjectNormalizer(), new ArrayDenormalizer()], ['json' => new JsonEncoder()])
223+
);
224+
}
225+
226+
private function assertApproximateDuration($startTime, int $expectedDuration)
227+
{
228+
$actualDuration = microtime(true) - $startTime;
229+
230+
if (method_exists($this, 'assertEqualsWithDelta')) {
231+
$this->assertEqualsWithDelta($expectedDuration, $actualDuration, .5, 'Duration was not within expected range');
232+
} else {
233+
$this->assertEquals($expectedDuration, $actualDuration, 'Duration was not within expected range', .5);
234+
}
235+
}
236+
237+
/**
238+
* @return Envelope[]
239+
*/
240+
private function receiveEnvelopes(ReceiverInterface $receiver, int $timeout): array
241+
{
242+
$envelopes = [];
243+
$startTime = microtime(true);
244+
while (0 === \count($envelopes) && $startTime + $timeout > time()) {
245+
$envelopes = iterator_to_array($receiver->get());
246+
}
247+
248+
return $envelopes;
249+
}
250+
}

0 commit comments

Comments
 (0)