Skip to content

Commit 884cdce

Browse files
author
Valentin
committed
[Amqp] Add amqps support
1 parent dc5e275 commit 884cdce

File tree

4 files changed

+93
-3
lines changed

4 files changed

+93
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
-----
66

77
* Add option to confirm message delivery
8+
* DSN now support AMQPS out-of-the-box.
89

910
5.1.0
1011
-----

Tests/Transport/ConnectionTest.php

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,24 @@ public function testItCanBeConstructedWithDefaults()
5353
);
5454
}
5555

56+
public function testItCanBeConstructedWithAnAmqpsDsn()
57+
{
58+
$this->assertEquals(
59+
new Connection([
60+
'host' => 'localhost',
61+
'port' => 5671,
62+
'vhost' => '/',
63+
'cacert' => '/etc/ssl/certs',
64+
], [
65+
'name' => self::DEFAULT_EXCHANGE_NAME,
66+
], [
67+
self::DEFAULT_EXCHANGE_NAME => [],
68+
]),
69+
Connection::fromDsn('amqps://localhost?'.
70+
'cacert=/etc/ssl/certs')
71+
);
72+
}
73+
5674
public function testItGetsParametersFromTheDsn()
5775
{
5876
$this->assertEquals(
@@ -314,6 +332,45 @@ public function testItSetupsTheConnection()
314332
$connection->publish('body');
315333
}
316334

335+
public function testItSetupsTheTTLConnection()
336+
{
337+
$amqpConnection = $this->createMock(\AMQPConnection::class);
338+
$amqpChannel = $this->createMock(\AMQPChannel::class);
339+
$amqpExchange = $this->createMock(\AMQPExchange::class);
340+
$amqpQueue0 = $this->createMock(\AMQPQueue::class);
341+
$amqpQueue1 = $this->createMock(\AMQPQueue::class);
342+
343+
$factory = $this->createMock(AmqpFactory::class);
344+
$factory->method('createConnection')->willReturn($amqpConnection);
345+
$factory->method('createChannel')->willReturn($amqpChannel);
346+
$factory->method('createExchange')->willReturn($amqpExchange);
347+
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
348+
349+
$amqpExchange->expects($this->once())->method('declareExchange');
350+
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
351+
$amqpQueue0->expects($this->once())->method('declareQueue');
352+
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
353+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
354+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key1']
355+
);
356+
$amqpQueue1->expects($this->once())->method('declareQueue');
357+
$amqpQueue1->expects($this->exactly(2))->method('bind')->withConsecutive(
358+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key2'],
359+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key3']
360+
);
361+
362+
$dsn = 'amqps://localhost?'.
363+
'cacert=/etc/ssl/certs&'.
364+
'exchange[default_publish_routing_key]=routing_key&'.
365+
'queues[queue0][binding_keys][0]=binding_key0&'.
366+
'queues[queue0][binding_keys][1]=binding_key1&'.
367+
'queues[queue1][binding_keys][0]=binding_key2&'.
368+
'queues[queue1][binding_keys][1]=binding_key3';
369+
370+
$connection = Connection::fromDsn($dsn, [], $factory);
371+
$connection->publish('body');
372+
}
373+
317374
public function testBindingArguments()
318375
{
319376
$amqpConnection = $this->createMock(\AMQPConnection::class);
@@ -506,6 +563,27 @@ public function testObfuscatePasswordInDsn()
506563
$connection->channel();
507564
}
508565

566+
public function testNoCaCertOnSslConnectionFromDsn()
567+
{
568+
$this->expectException(InvalidArgumentException::class);
569+
$this->expectExceptionMessage('No CA certificate has been provided. Set "amqp.cacert" in your php.ini or pass the "cacert" parameter in the DSN to use SSL. Alternatively, you can use amqp:// to use without SSL.');
570+
571+
$factory = new TestAmqpFactory(
572+
$amqpConnection = $this->createMock(\AMQPConnection::class),
573+
$amqpChannel = $this->createMock(\AMQPChannel::class),
574+
$amqpQueue = $this->createMock(\AMQPQueue::class),
575+
$amqpExchange = $this->createMock(\AMQPExchange::class)
576+
);
577+
578+
$oldCaCertValue = ini_set('amqp.cacert', '');
579+
580+
try {
581+
Connection::fromDsn('amqps://', [], $factory);
582+
} finally {
583+
ini_set('amqp.cacert', $oldCaCertValue);
584+
}
585+
}
586+
509587
public function testAmqpStampHeadersAreUsed()
510588
{
511589
$factory = new TestAmqpFactory(

Transport/AmqpTransportFactory.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public function createTransport(string $dsn, array $options, SerializerInterface
2929

3030
public function supports(string $dsn, array $options): bool
3131
{
32-
return 0 === strpos($dsn, 'amqp://');
32+
return 0 === strpos($dsn, 'amqp://') || 0 === strpos($dsn, 'amqps://');
3333
}
3434
}
3535
class_alias(AmqpTransportFactory::class, \Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory::class);

Transport/Connection.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,20 +167,22 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
167167
{
168168
if (false === $parsedUrl = parse_url($dsn)) {
169169
// this is a valid URI that parse_url cannot handle when you want to pass all parameters as options
170-
if ('amqp://' !== $dsn) {
170+
if (!\in_array($dsn, ['amqp://', 'amqps://'])) {
171171
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
172172
}
173173

174174
$parsedUrl = [];
175175
}
176176

177+
$useAmqps = 0 === strpos($dsn, 'amqps://');
177178
$pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : [];
178179
$exchangeName = $pathParts[1] ?? 'messages';
179180
parse_str($parsedUrl['query'] ?? '', $parsedQuery);
181+
$port = $useAmqps ? 5671 : 5672;
180182

181183
$amqpOptions = array_replace_recursive([
182184
'host' => $parsedUrl['host'] ?? 'localhost',
183-
'port' => $parsedUrl['port'] ?? 5672,
185+
'port' => $parsedUrl['port'] ?? $port,
184186
'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/',
185187
'exchange' => [
186188
'name' => $exchangeName,
@@ -216,6 +218,10 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
216218
return $queueOptions;
217219
}, $queuesOptions);
218220

221+
if ($useAmqps && !self::hasCaCertConfigured($amqpOptions)) {
222+
throw new InvalidArgumentException('No CA certificate has been provided. Set "amqp.cacert" in your php.ini or pass the "cacert" parameter in the DSN to use SSL. Alternatively, you can use amqp:// to use without SSL.');
223+
}
224+
219225
return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory);
220226
}
221227

@@ -260,6 +266,11 @@ private static function normalizeQueueArguments(array $arguments): array
260266
return $arguments;
261267
}
262268

269+
private static function hasCaCertConfigured(array $amqpOptions): bool
270+
{
271+
return (isset($amqpOptions['cacert']) && '' !== $amqpOptions['cacert']) || '' !== ini_get('amqp.cacert');
272+
}
273+
263274
/**
264275
* @throws \AMQPException
265276
*/

0 commit comments

Comments
 (0)