Skip to content

Commit da17b7f

Browse files
committed
- properly encoded delayed option arguments (avoid AMQPProtocolChannelException: PRECONDITION_FAILED - invalid arg 'x-message-ttl' with delay more than 2147483 seconds or about 24 days)
1 parent 6f887a5 commit da17b7f

File tree

2 files changed

+116
-115
lines changed

2 files changed

+116
-115
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"license": "GPL-2.0",
66
"require": {
77
"php": ">=5.4.0",
8-
"videlalvaro/php-amqplib": "2.4.*"
8+
"php-amqplib/php-amqplib": "2.6.*"
99
},
1010
"require-dev": {
1111
"laravel/framework": ">=4.0.0",

src/Queue/AMQPQueue.php

Lines changed: 115 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
use Forumhouse\LaravelAmqp\Exception\AMQPException;
66
use Forumhouse\LaravelAmqp\Jobs\AMQPJob;
77
use Forumhouse\LaravelAmqp\Utility\ArrayUtil;
8+
use Illuminate\Contracts\Queue\Queue as QueueContract;
89
use Illuminate\Queue\Queue;
910
use PhpAmqpLib\Channel\AMQPChannel;
1011
use PhpAmqpLib\Connection\AMQPConnection;
1112
use PhpAmqpLib\Message\AMQPMessage;
12-
use Illuminate\Contracts\Queue\Queue as QueueContract;
13+
use PhpAmqpLib\Wire\AMQPTable;
1314

1415
/**
1516
* Class representing AMQP Queue
@@ -95,6 +96,32 @@ public function __construct(
9596
}
9697
}
9798

99+
/**
100+
* @param string $exchangeName The name of the exchange. For example, 'logs'
101+
* @param string $exchangeType The type of the exchange. See EXCHANGE_TYPE_* constants for details
102+
* @param array $exchangeFlags The flags of the exchange. See \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
103+
* (from third parameter onwards). Must be an assoc array. Default flags can be omitted
104+
*
105+
* @see \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
106+
* @return void
107+
*/
108+
protected function declareExchange($exchangeName, $exchangeType, array $exchangeFlags = [])
109+
{
110+
$flags = array_replace([
111+
'exchange' => $exchangeName,
112+
'type' => $exchangeType,
113+
'passive' => false,
114+
'durable' => false,
115+
'auto_delete' => true,
116+
'internal' => false,
117+
'nowait' => false,
118+
'arguments' => null,
119+
'ticket' => null,
120+
], $exchangeFlags);
121+
122+
call_user_func_array([$this->channel, 'exchange_declare'], $flags);
123+
}
124+
98125
/**
99126
* Push a new job onto the queue.
100127
*
@@ -114,6 +141,77 @@ public function push($job, $data = '', $queue = null)
114141
return true;
115142
}
116143

144+
/**
145+
* Helper to return a default queue name in case passed param is empty
146+
*
147+
* @param string|null $name Queue name. If null, default will be used
148+
*
149+
* @throws AMQPException
150+
* @return string Queue name to be used in AMQP calls
151+
*/
152+
protected function getQueueName($name)
153+
{
154+
$name = $name ?: $this->defaultQueueName;
155+
if ($name === null) {
156+
throw new AMQPException('Default nor specific queue names given');
157+
}
158+
return $name;
159+
}
160+
161+
/**
162+
* Declares a queue to the AMQP library
163+
*
164+
* @param string $name The name of the queue to declare
165+
*
166+
* @return void
167+
*/
168+
public function declareQueue($name)
169+
{
170+
$queue = $this->getQueueName($name);
171+
$flags = array_replace_recursive([
172+
'queue' => $queue,
173+
'passive' => false,
174+
'durable' => false,
175+
'exclusive' => false,
176+
'auto_delete' => true,
177+
'nowait' => false,
178+
'arguments' => null,
179+
'ticket' => null,
180+
], $this->getQueueFlags($name));
181+
182+
call_user_func_array([$this->channel, 'queue_declare'], $flags);
183+
}
184+
185+
/**
186+
* @param string $queueName
187+
* @param null|string $deferredQueueName
188+
* @param null|int $deferredQueueDelay
189+
*
190+
* @return array
191+
*/
192+
protected function getQueueFlags($queueName, $deferredQueueName = null, $deferredQueueDelay = null)
193+
{
194+
$args = func_get_args();
195+
$result = ArrayUtil::arrayMapRecursive(function ($value) use ($args) {
196+
return is_callable($value) ? call_user_func_array($value, $args) : $value;
197+
}, $this->queueFlags);
198+
199+
$result = ArrayUtil::removeNullsRecursive($result);
200+
201+
return $result;
202+
}
203+
204+
/**
205+
* Get routing key from config or use default one (queue name)
206+
*
207+
* @param $queue string
208+
* @return string Routing key name
209+
*/
210+
protected function getRoutingKey($queue)
211+
{
212+
return empty($this->queueFlags['routing_key']) ? $queue : $this->queueFlags['routing_key'];
213+
}
214+
117215
/**
118216
* Push a raw payload onto the queue.
119217
*
@@ -157,76 +255,6 @@ public function later($delay, $job, $data = '', $queue = null)
157255
return true;
158256
}
159257

160-
/**
161-
* Pop the next job off of the queue.
162-
*
163-
* @param string|null $queue Queue name if different from the default one
164-
*
165-
* @return \Illuminate\Queue\Jobs\Job|null Job instance or null if no unhandled jobs available
166-
*/
167-
public function pop($queue = null)
168-
{
169-
$queue = $this->getQueueName($queue);
170-
$this->declareQueue($queue);
171-
$envelope = $this->channel->basic_get($queue);
172-
173-
if ($envelope instanceof AMQPMessage) {
174-
return new AMQPJob($this->container, $queue, $this->channel, $envelope);
175-
}
176-
177-
return null;
178-
}
179-
180-
/**
181-
* @param string $exchangeName The name of the exchange. For example, 'logs'
182-
* @param string $exchangeType The type of the exchange. See EXCHANGE_TYPE_* constants for details
183-
* @param array $exchangeFlags The flags of the exchange. See \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
184-
* (from third parameter onwards). Must be an assoc array. Default flags can be omitted
185-
*
186-
* @see \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
187-
* @return void
188-
*/
189-
protected function declareExchange($exchangeName, $exchangeType, array $exchangeFlags = [])
190-
{
191-
$flags = array_replace([
192-
'exchange' => $exchangeName,
193-
'type' => $exchangeType,
194-
'passive' => false,
195-
'durable' => false,
196-
'auto_delete' => true,
197-
'internal' => false,
198-
'nowait' => false,
199-
'arguments' => null,
200-
'ticket' => null,
201-
], $exchangeFlags);
202-
203-
call_user_func_array([$this->channel, 'exchange_declare'], $flags);
204-
}
205-
206-
/**
207-
* Declares a queue to the AMQP library
208-
*
209-
* @param string $name The name of the queue to declare
210-
*
211-
* @return void
212-
*/
213-
public function declareQueue($name)
214-
{
215-
$queue = $this->getQueueName($name);
216-
$flags = array_replace_recursive([
217-
'queue' => $queue,
218-
'passive' => false,
219-
'durable' => false,
220-
'exclusive' => false,
221-
'auto_delete' => true,
222-
'nowait' => false,
223-
'arguments' => null,
224-
'ticket' => null,
225-
], $this->getQueueFlags($name));
226-
227-
call_user_func_array([$this->channel, 'queue_declare'], $flags);
228-
}
229-
230258
/**
231259
* Declares delayed queue to the AMQP library
232260
*
@@ -252,61 +280,34 @@ public function declareDelayedQueue($destinationQueueName, $delay)
252280
], $this->getQueueFlags($destinationQueueName, $deferredQueueName, $delay), [
253281
'queue' => $deferredQueueName,
254282
'durable' => true,
255-
'arguments' => [
256-
'x-dead-letter-exchange' => ['S', ''],
257-
'x-dead-letter-routing-key' => ['S', $destinationQueueName],
258-
'x-message-ttl' => ['I', $delay * 1000],
259-
],
283+
'arguments' => new AMQPTable([
284+
'x-dead-letter-exchange' => '',
285+
'x-dead-letter-routing-key' => $destinationQueueName,
286+
'x-message-ttl' => $delay * 1000,
287+
]),
260288
]);
261289

262290
call_user_func_array([$this->channel, 'queue_declare'], $flags);
263291
return $deferredQueueName;
264292
}
265293

266294
/**
267-
* Helper to return a default queue name in case passed param is empty
268-
*
269-
* @param string|null $name Queue name. If null, default will be used
270-
*
271-
* @throws AMQPException
272-
* @return string Queue name to be used in AMQP calls
273-
*/
274-
protected function getQueueName($name)
275-
{
276-
$name = $name ?: $this->defaultQueueName;
277-
if ($name === null) {
278-
throw new AMQPException('Default nor specific queue names given');
279-
}
280-
return $name;
281-
}
282-
283-
/**
284-
* Get routing key from config or use default one (queue name)
295+
* Pop the next job off of the queue.
285296
*
286-
* @param $queue string
287-
* @return string Routing key name
288-
*/
289-
protected function getRoutingKey($queue)
290-
{
291-
return empty($this->queueFlags['routing_key']) ? $queue : $this->queueFlags['routing_key'];
292-
}
293-
294-
/**
295-
* @param string $queueName
296-
* @param null|string $deferredQueueName
297-
* @param null|int $deferredQueueDelay
297+
* @param string|null $queue Queue name if different from the default one
298298
*
299-
* @return array
299+
* @return \Illuminate\Queue\Jobs\Job|null Job instance or null if no unhandled jobs available
300300
*/
301-
protected function getQueueFlags($queueName, $deferredQueueName = null, $deferredQueueDelay = null)
301+
public function pop($queue = null)
302302
{
303-
$args = func_get_args();
304-
$result = ArrayUtil::arrayMapRecursive(function ($value) use($args) {
305-
return is_callable($value) ? call_user_func_array($value, $args) : $value;
306-
}, $this->queueFlags);
303+
$queue = $this->getQueueName($queue);
304+
$this->declareQueue($queue);
305+
$envelope = $this->channel->basic_get($queue);
307306

308-
$result = ArrayUtil::removeNullsRecursive($result);
307+
if ($envelope instanceof AMQPMessage) {
308+
return new AMQPJob($this->container, $queue, $this->channel, $envelope);
309+
}
309310

310-
return $result;
311+
return null;
311312
}
312313
}

0 commit comments

Comments
 (0)