Skip to content

Commit 0c7d2ef

Browse files
committed
[Messenger] Add AMQP adapter
1 parent 0b8321a commit 0c7d2ef

23 files changed

+1175
-86
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Adapter\AmqpExt;
13+
14+
use Symfony\Component\Messenger\Adapter\Factory\AdapterFactoryInterface;
15+
use Symfony\Component\Messenger\Transport\ReceiverInterface;
16+
use Symfony\Component\Messenger\Transport\SenderInterface;
17+
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
18+
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
19+
20+
/**
21+
* @author Samuel Roze <[email protected]>
22+
*/
23+
class AmqpAdapterFactory implements AdapterFactoryInterface
24+
{
25+
private $encoder;
26+
private $decoder;
27+
private $debug;
28+
29+
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug)
30+
{
31+
$this->encoder = $encoder;
32+
$this->decoder = $decoder;
33+
$this->debug = $debug;
34+
}
35+
36+
public function createReceiver(string $dsn, array $options): ReceiverInterface
37+
{
38+
return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
39+
}
40+
41+
public function createSender(string $dsn, array $options): SenderInterface
42+
{
43+
return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug));
44+
}
45+
46+
public function supports(string $dsn, array $options): bool
47+
{
48+
return 0 === strpos($dsn, 'amqp://');
49+
}
50+
}

Adapter/AmqpExt/AmqpFactory.php

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Adapter\AmqpExt;
13+
14+
class AmqpFactory
15+
{
16+
public function createConnection(array $credentials): \AMQPConnection
17+
{
18+
return new \AMQPConnection($credentials);
19+
}
20+
21+
public function createChannel(\AMQPConnection $connection): \AMQPChannel
22+
{
23+
return new \AMQPChannel($connection);
24+
}
25+
26+
public function createQueue(\AMQPChannel $channel): \AMQPQueue
27+
{
28+
return new \AMQPQueue($channel);
29+
}
30+
31+
public function createExchange(\AMQPChannel $channel): \AMQPExchange
32+
{
33+
return new \AMQPExchange($channel);
34+
}
35+
}

Adapter/AmqpExt/AmqpReceiver.php

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Adapter\AmqpExt;
13+
14+
use Symfony\Component\Messenger\Adapter\AmqpExt\Exception\RejectMessageExceptionInterface;
15+
use Symfony\Component\Messenger\Transport\ReceiverInterface;
16+
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
17+
18+
/**
19+
* Symfony Messenger receiver to get messages from AMQP brokers using PHP's AMQP extension.
20+
*
21+
* @author Samuel Roze <[email protected]>
22+
*/
23+
class AmqpReceiver implements ReceiverInterface
24+
{
25+
private $messageDecoder;
26+
private $connection;
27+
private $shouldStop;
28+
29+
public function __construct(DecoderInterface $messageDecoder, Connection $connection)
30+
{
31+
$this->messageDecoder = $messageDecoder;
32+
$this->connection = $connection;
33+
}
34+
35+
/**
36+
* {@inheritdoc}
37+
*/
38+
public function receive(callable $handler): void
39+
{
40+
while (!$this->shouldStop) {
41+
$message = $this->connection->get();
42+
if (null === $message) {
43+
$handler(null);
44+
45+
usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
46+
if (function_exists('pcntl_signal_dispatch')) {
47+
pcntl_signal_dispatch();
48+
}
49+
50+
continue;
51+
}
52+
53+
try {
54+
$handler($this->messageDecoder->decode(array(
55+
'body' => $message->getBody(),
56+
'headers' => $message->getHeaders(),
57+
)));
58+
59+
$this->connection->ack($message);
60+
} catch (RejectMessageExceptionInterface $e) {
61+
$this->connection->reject($message);
62+
63+
throw $e;
64+
} catch (\Throwable $e) {
65+
$this->connection->nack($message, AMQP_REQUEUE);
66+
67+
throw $e;
68+
} finally {
69+
if (function_exists('pcntl_signal_dispatch')) {
70+
pcntl_signal_dispatch();
71+
}
72+
}
73+
}
74+
}
75+
76+
public function stop(): void
77+
{
78+
$this->shouldStop = true;
79+
}
80+
}

Adapter/AmqpExt/AmqpSender.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Adapter\AmqpExt;
13+
14+
use Symfony\Component\Messenger\Transport\SenderInterface;
15+
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
16+
17+
/**
18+
* Symfony Messenger sender to send messages to AMQP brokers using PHP's AMQP extension.
19+
*
20+
* @author Samuel Roze <[email protected]>
21+
*/
22+
class AmqpSender implements SenderInterface
23+
{
24+
private $messageEncoder;
25+
private $connection;
26+
27+
public function __construct(EncoderInterface $messageEncoder, Connection $connection)
28+
{
29+
$this->messageEncoder = $messageEncoder;
30+
$this->connection = $connection;
31+
}
32+
33+
/**
34+
* {@inheritdoc}
35+
*/
36+
public function send($message)
37+
{
38+
$encodedMessage = $this->messageEncoder->encode($message);
39+
40+
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
41+
}
42+
}

0 commit comments

Comments
 (0)