Skip to content

Commit db6a2c2

Browse files
nikophilfabpot
authored andcommitted
Messenger: validate options for AMQP and Redis Connections
1 parent 6a08062 commit db6a2c2

File tree

4 files changed

+125
-2
lines changed

4 files changed

+125
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ CHANGELOG
55
-----
66

77
* Introduced the AMQP bridge.
8+
* Deprecated use of invalid options

Tests/Transport/AmqpTransportFactoryTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ public function testItCreatesTheTransport()
3636
$factory = new AmqpTransportFactory();
3737
$serializer = $this->createMock(SerializerInterface::class);
3838

39-
$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', ['foo' => 'bar']), $serializer);
39+
$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', ['host' => 'localhost']), $serializer);
4040

41-
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', ['foo' => 'bar'], $serializer));
41+
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', ['host' => 'localhost'], $serializer));
4242
}
4343
}

Tests/Transport/ConnectionTest.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,42 @@ public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn()
102102
);
103103
}
104104

105+
/**
106+
* @group legacy
107+
* @expectedDeprecation Invalid option(s) "foo" passed to the AMQP Messenger transport. Passing invalid options is deprecated since Symfony 5.1.
108+
*/
109+
public function testDeprecationIfInvalidOptionIsPassedWithDsn()
110+
{
111+
Connection::fromDsn('amqp://host?foo=bar');
112+
}
113+
114+
/**
115+
* @group legacy
116+
* @expectedDeprecation Invalid option(s) "foo" passed to the AMQP Messenger transport. Passing invalid options is deprecated since Symfony 5.1.
117+
*/
118+
public function testDeprecationIfInvalidOptionIsPassedAsArgument()
119+
{
120+
Connection::fromDsn('amqp://host', ['foo' => 'bar']);
121+
}
122+
123+
/**
124+
* @group legacy
125+
* @expectedDeprecation Invalid queue option(s) "foo" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated since Symfony 5.1.
126+
*/
127+
public function testDeprecationIfInvalidQueueOptionIsPassed()
128+
{
129+
Connection::fromDsn('amqp://host', ['queues' => ['queueName' => ['foo' => 'bar']]]);
130+
}
131+
132+
/**
133+
* @group legacy
134+
* @expectedDeprecation Invalid exchange option(s) "foo" passed to the AMQP Messenger transport. Passing invalid exchange options is deprecated since Symfony 5.1.
135+
*/
136+
public function testDeprecationIfInvalidExchangeOptionIsPassed()
137+
{
138+
Connection::fromDsn('amqp://host', ['exchange' => ['foo' => 'bar']]);
139+
}
140+
105141
public function testSetsParametersOnTheQueueAndExchange()
106142
{
107143
$factory = new TestAmqpFactory(

Transport/Connection.php

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,47 @@ class Connection
3232
'x-message-ttl',
3333
];
3434

35+
private const AVAILABLE_OPTIONS = [
36+
'host',
37+
'port',
38+
'vhost',
39+
'user',
40+
'password',
41+
'queues',
42+
'exchange',
43+
'delay',
44+
'auto_setup',
45+
'prefetch_count',
46+
'retry',
47+
'persistent',
48+
'frame_max',
49+
'channel_max',
50+
'heartbeat',
51+
'read_timeout',
52+
'write_timeout',
53+
'connect_timeout',
54+
'cacert',
55+
'cert',
56+
'key',
57+
'verify',
58+
'sasl_method',
59+
];
60+
61+
private const AVAILABLE_QUEUE_OPTIONS = [
62+
'binding_keys',
63+
'binding_arguments',
64+
'flags',
65+
'arguments',
66+
];
67+
68+
private const AVAILABLE_EXCHANGE_OPTIONS = [
69+
'name',
70+
'type',
71+
'default_publish_routing_key',
72+
'flags',
73+
'arguments',
74+
];
75+
3576
private $connectionOptions;
3677
private $exchangeOptions;
3778
private $queuesOptions;
@@ -84,6 +125,9 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
84125
* * vhost: Virtual Host to use with the AMQP service
85126
* * user: Username to use to connect the the AMQP service
86127
* * password: Password to use the connect to the AMQP service
128+
* * read_timeout: Timeout in for income activity. Note: 0 or greater seconds. May be fractional.
129+
* * write_timeout: Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.
130+
* * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
87131
* * queues[name]: An array of queues, keyed by the name
88132
* * binding_keys: The binding keys (if any) to bind to this queue
89133
* * binding_arguments: Arguments to be used while binding the queue.
@@ -100,6 +144,22 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
100144
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
101145
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
102146
* * prefetch_count: set channel prefetch count
147+
*
148+
* * Connection tuning options (see http://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.tune for details):
149+
* * channel_max: Specifies highest channel number that the server permits. 0 means standard extension limit
150+
* (see PHP_AMQP_MAX_CHANNELS constant)
151+
* * frame_max: The largest frame size that the server proposes for the connection, including frame header
152+
* and end-byte. 0 means standard extension limit (depends on librabbimq default frame size limit)
153+
* * heartbeat: The delay, in seconds, of the connection heartbeat that the server wants.
154+
* 0 means the server does not want a heartbeat. Note, librabbitmq has limited heartbeat support,
155+
* which means heartbeats checked only during blocking calls.
156+
*
157+
* TLS support (see https://www.rabbitmq.com/ssl.html for details):
158+
* * cacert: Path to the CA cert file in PEM format..
159+
* * cert: Path to the client certificate in PEM foramt.
160+
* * key: Path to the client key in PEM format.
161+
* * verify: Enable or disable peer verification. If peer verification is enabled then the common name in the
162+
* server certificate must match the server name. Peer verification is enabled by default.
103163
*/
104164
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
105165
{
@@ -125,6 +185,8 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
125185
],
126186
], $options, $parsedQuery);
127187

188+
self::validateOptions($amqpOptions);
189+
128190
if (isset($parsedUrl['user'])) {
129191
$amqpOptions['login'] = $parsedUrl['user'];
130192
}
@@ -155,6 +217,30 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
155217
return new self($amqpOptions, $exchangeOptions, $queuesOptions, $amqpFactory);
156218
}
157219

220+
private static function validateOptions(array $options): void
221+
{
222+
if (0 < \count($invalidOptions = array_diff(array_keys($options), self::AVAILABLE_OPTIONS))) {
223+
@trigger_error(sprintf('Invalid option(s) "%s" passed to the AMQP Messenger transport. Passing invalid options is deprecated since Symfony 5.1.', implode('", "', $invalidOptions)), E_USER_DEPRECATED);
224+
}
225+
226+
if (\is_array($options['queues'] ?? false)) {
227+
foreach ($options['queues'] as $queue) {
228+
if (!\is_array($queue)) {
229+
continue;
230+
}
231+
232+
if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) {
233+
@trigger_error(sprintf('Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated since Symfony 5.1.', implode('", "', $invalidQueueOptions)), E_USER_DEPRECATED);
234+
}
235+
}
236+
}
237+
238+
if (\is_array($options['exchange'] ?? false)
239+
&& 0 < \count($invalidExchangeOptions = array_diff(array_keys($options['exchange']), self::AVAILABLE_EXCHANGE_OPTIONS))) {
240+
@trigger_error(sprintf('Invalid exchange option(s) "%s" passed to the AMQP Messenger transport. Passing invalid exchange options is deprecated since Symfony 5.1.', implode('", "', $invalidExchangeOptions)), E_USER_DEPRECATED);
241+
}
242+
}
243+
158244
private static function normalizeQueueArguments(array $arguments): array
159245
{
160246
foreach (self::ARGUMENTS_AS_INTEGER as $key) {

0 commit comments

Comments
 (0)