Skip to content

Commit 0cecf28

Browse files
authored
Merge pull request #26 from fhteam/maksimru-fix/laravel-5.7
Maksimru fix/laravel 5.7
2 parents 0ea949e + ccc620d commit 0cecf28

File tree

7 files changed

+159
-85
lines changed

7 files changed

+159
-85
lines changed

README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,54 @@ In your ```config/queue.php``` file you have to provide the following:
4141
'exchange_name' => null,
4242
'exchange_type' => null,
4343
'exchange_flags' => null,
44+
'keepalive' > false,
45+
'heartbeat' => 0,
46+
'retry_after' => 0,
4447
),
4548
),
4649
```
4750

4851
In your ```config/app.php``` add ```'Forumhouse\LaravelAmqp\ServiceProvider\LaravelAmqpServiceProvider'``` to the list of service
4952
providers registered.
5053

54+
Improved worker stability (PHP 7.1+ is required)
55+
------------
56+
57+
For better stability please add following code in app/Exceptions/Handler.php:
58+
59+
```php
60+
class Handler extends ExceptionHandler
61+
{
62+
```
63+
64+
to
65+
66+
```php
67+
class Handler extends ExceptionHandler
68+
{
69+
use AMQPFailureDetector;
70+
```
71+
72+
And
73+
74+
```php
75+
public function report(Exception $exception)
76+
{
77+
parent::report($exception);
78+
}
79+
```
80+
81+
to
82+
83+
```php
84+
public function report(Exception $exception)
85+
{
86+
$this->catchAMQPConnectionFailure($exception);
87+
parent::report($exception);
88+
}
89+
```
90+
91+
5192
Usage
5293
------------
5394

composer.json

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,22 @@
44
"minimum-stability": "stable",
55
"license": "GPL-2.0",
66
"require": {
7-
"php": ">=5.4.0",
8-
"php-amqplib/php-amqplib": "2.6.*"
7+
"php": ">=7.1",
8+
"php-amqplib/php-amqplib": "2.6.*",
9+
"ext-json": "*"
910
},
1011
"require-dev": {
11-
"laravel/framework": ">=5.3.0",
12+
"laravel/framework": ">=5.7.0",
1213
"squizlabs/php_codesniffer": "1.*",
13-
"orchestra/testbench": "~3.4",
14-
"phpunit/phpunit": "~6.1"
14+
"orchestra/testbench": "3.8.*"
1515
},
1616
"autoload": {
1717
"psr-4": {
1818
"Forumhouse\\LaravelAmqp\\": "src/",
1919
"Forumhouse\\LaravelAmqp\\Tests\\": "tests/"
2020
}
21+
},
22+
"suggest": {
23+
"ext-posix": "Restart workers if connection is lost to avoid unlimited loop"
2124
}
2225
}

src/Connectors/AmqpConnector.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ public function connect(array $config)
4848
}
4949

5050
return new AMQPQueue(
51-
$connection, $config['queue'], $config['queue_flags'], $config['declare_queues'],
51+
$connection, $config['queue'], $config['queue_flags'], isset($config['declare_queues']) ? $config['declare_queues'] : true,
5252
$config['message_properties'], $config['channel_id'],
53-
$config['exchange_name'], $config['exchange_type'], $config['exchange_flags']
53+
$config['exchange_name'], $config['exchange_type'], $config['exchange_flags'], (isset($config['retry_after']) ? $config['retry_after'] : 0)
5454
);
5555
}
5656
}

src/Jobs/AMQPJob.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ public function release($delay = 0)
7070
$body['attempts'] = $this->attempts() + 1;
7171
$job = $body['job'];
7272

73+
//if retry_after option is set use it on failure instead of traditional delay
74+
if(isset($body['data']['retryAfter']) && $body['data']['retryAfter'] > 0)
75+
$delay = $body['data']['retryAfter'];
76+
7377
/** @var QueueContract $queue */
7478
$queue = $this->container['queue']->connection();
7579
if ($delay > 0) {
@@ -119,10 +123,14 @@ public function getQueue()
119123
* Get the job identifier.
120124
*
121125
* @return string
122-
* @throws \OutOfBoundsException
123126
*/
124127
public function getJobId()
125128
{
126-
return $this->amqpMessage->get('message_id');
129+
try {
130+
return $this->amqpMessage->get('message_id');
131+
} catch (\OutOfBoundsException $exception){
132+
return null;
133+
}
127134
}
135+
128136
}

src/Queue/AMQPQueue.php

Lines changed: 60 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
use Illuminate\Queue\InvalidPayloadException;
1010
use Illuminate\Queue\Queue;
1111
use PhpAmqpLib\Channel\AMQPChannel;
12-
use PhpAmqpLib\Connection\AMQPConnection;
1312
use PhpAmqpLib\Connection\AMQPStreamConnection;
1413
use PhpAmqpLib\Message\AMQPMessage;
1514
use PhpAmqpLib\Wire\AMQPTable;
@@ -30,7 +29,7 @@ class AMQPQueue extends Queue implements QueueContract
3029
const EXCHANGE_TYPE_FANOUT = 'fanout';
3130

3231
/**
33-
* @var AMQPConnection Connection to amqp compatible server
32+
* @var AMQPStreamConnection Connection to amqp compatible server
3433
*/
3534
protected $connection;
3635

@@ -69,6 +68,11 @@ class AMQPQueue extends Queue implements QueueContract
6968
*/
7069
private $declareQueues;
7170

71+
/**
72+
* @var int
73+
*/
74+
private $retryAfter;
75+
7276
/**
7377
* @param AMQPStreamConnection $connection
7478
* @param string $defaultQueueName Default queue name
@@ -83,6 +87,7 @@ class AMQPQueue extends Queue implements QueueContract
8387
* @param string $exchangeName Exchange name
8488
* @param mixed $exchangeType Exchange type
8589
* @param mixed $exchangeFlags Exchange flags
90+
* @param mixed $retryAfter Optional timeout for failed jobs
8691
*/
8792
public function __construct(
8893
AMQPStreamConnection $connection,
@@ -93,7 +98,8 @@ public function __construct(
9398
$defaultChannelId = null,
9499
$exchangeName = '',
95100
$exchangeType = null,
96-
$exchangeFlags = []
101+
$exchangeFlags = [],
102+
$retryAfter = 0
97103
) {
98104
$this->connection = $connection;
99105
$this->defaultQueueName = $defaultQueueName ?: 'default';
@@ -103,6 +109,7 @@ public function __construct(
103109
$this->defaultChannelId = $defaultChannelId;
104110
$this->exchangeName = $exchangeName;
105111
$this->channel = $connection->channel($this->defaultChannelId);
112+
$this->retryAfter = $retryAfter;
106113

107114
if ($exchangeName !== null) {
108115
$this->declareExchange($exchangeName, $exchangeType, $exchangeFlags);
@@ -135,22 +142,29 @@ protected function declareExchange($exchangeName, $exchangeType, array $exchange
135142
call_user_func_array([$this->channel, 'exchange_declare'], $flags);
136143
}
137144

145+
/**
146+
* @return array
147+
*/
148+
public function getCustomMessageOptions(){
149+
return ['retryAfter' => $this->retryAfter];
150+
}
151+
138152
/**
139153
* Push a new job onto the queue.
140154
*
141155
* @param string $job Job implementation class name
142156
* @param mixed $data Job custom data. Usually array
143157
* @param string $queue Queue name, if different from the default one
144158
*
145-
* @throws InvalidPayloadException
159+
* @throws \Illuminate\Queue\InvalidPayloadException
146160
* @throws AMQPException
147161
* @return bool Always true
148162
*/
149163
public function push($job, $data = '', $queue = null)
150164
{
151165
$queue = $this->prepareQueue($queue);
152-
$amqpMessage = $this->prepareMessage($job, $data);
153-
$this->channel->basic_publish($amqpMessage, $this->exchangeName, $this->getRoutingKey($queue));
166+
$payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties);
167+
$this->channel->basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));
154168

155169
return true;
156170
}
@@ -163,14 +177,14 @@ public function push($job, $data = '', $queue = null)
163177
* @param string $queue Queue name, if different from the default one
164178
*
165179
* @return bool
166-
* @throws InvalidPayloadException
180+
* @throws \Illuminate\Queue\InvalidPayloadException
167181
* @throws AMQPException
168182
*/
169183
public function addMessageToBatch($job, $data = '', $queue = null)
170184
{
171185
$queue = $this->prepareQueue($queue);
172-
$amqpMessage = $this->prepareMessage($job, $data);
173-
$this->channel->batch_basic_publish($amqpMessage, $this->exchangeName, $this->getRoutingKey($queue));
186+
$payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties);
187+
$this->channel->batch_basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));
174188

175189
return true;
176190
}
@@ -270,9 +284,6 @@ protected function getRoutingKey($queue)
270284
*/
271285
public function pushRaw($payload, $queue = null, array $options = [])
272286
{
273-
// NB: DYNAMIC PRIORITY IS NOT IMPLEMENTED FOR RAW MESSAGES,
274-
// NB: NEED TO SET THE $this->messageProperties['priority'] FIELD
275-
276287
$queue = $this->prepareQueue($queue);
277288
$amqpPayload = new AMQPMessage($payload, $this->messageProperties);
278289
$this->channel->basic_publish($amqpPayload, $this->exchangeName, $queue);
@@ -288,7 +299,7 @@ public function pushRaw($payload, $queue = null, array $options = [])
288299
* @param string $queue Queue name, if different from the default one
289300
*
290301
* @return bool Always true
291-
* @throws InvalidPayloadException
302+
* @throws \Illuminate\Queue\InvalidPayloadException
292303
* @throws AMQPException
293304
*/
294305
public function later($delay, $job, $data = '', $queue = null)
@@ -300,8 +311,8 @@ public function later($delay, $job, $data = '', $queue = null)
300311
$queue = $this->prepareQueue($queue);
301312
$delayedQueueName = $this->declareDelayedQueue($queue, $delay);
302313

303-
$amqpMessage = $this->prepareMessage($job, $data);
304-
$this->channel->basic_publish($amqpMessage, $this->exchangeName, $delayedQueueName);
314+
$payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties);
315+
$this->channel->basic_publish($payload, $this->exchangeName, $delayedQueueName);
305316
return true;
306317
}
307318

@@ -334,7 +345,7 @@ public function declareDelayedQueue($destinationQueueName, $delay)
334345
'arguments' => new AMQPTable([
335346
'x-dead-letter-exchange' => '',
336347
'x-dead-letter-routing-key' => $destinationQueueName,
337-
'x-message-ttl' => $delay * 1000,
348+
'x-message-ttl' => intval($delay * 1000),
338349
]),
339350
]);
340351

@@ -414,73 +425,55 @@ private function prepareQueue($queue)
414425
return $queue;
415426
}
416427

417-
418-
419-
// FNX - ADD DYNAMIC PRIORITY TO MESSAGE
420-
421428
/**
422-
* @param string $job
423-
* @param mixed $data
429+
* Create a payload string from the given job and data.
430+
*
431+
* @param string $job
432+
* @param string $queue
433+
* @param mixed $data
434+
* @return string
424435
*
425-
* @return AMQPMessage
426-
* @throws InvalidPayloadException
436+
* @throws \Illuminate\Queue\InvalidPayloadException
427437
*/
428-
protected function prepareMessage($job, $data)
438+
protected function createPayload($job, $queue, $data = '')
429439
{
430-
$payloadJson = $this->createPayload($job, $data);
431-
$arrPayload = json_decode($payloadJson, true);
432-
433-
// OVERRIDE PRIORITY
434-
// NB: IMPLEMENT QUEUE DEFAULT PRIORITY USING THE FIELD `message_properties['priority']` IN THE CONFIG
435-
$props = $this->messageProperties;
436-
if (!empty($arrPayload['priority']) && $arrPayload['priority'] > 0)
437-
$props['priority'] = $arrPayload['priority'];
438-
439-
return new AMQPMessage($payloadJson, $props);
440+
$data = is_array($data) ? array_merge($data, $this->getCustomMessageOptions()) : $this->getCustomMessageOptions();
441+
$payload = json_encode($this->createPayloadArray($job, $queue, $data));
442+
if (JSON_ERROR_NONE !== json_last_error()) {
443+
throw new InvalidPayloadException(
444+
'Unable to JSON encode payload. Error code: '.json_last_error()
445+
);
446+
}
440447

448+
return $payload;
441449
}
442450

443-
// FNX - OVERRIDE PAYLOAD GENERATION
444451
/**
445452
* Create a payload for an object-based queue handler.
446453
*
447454
* @param mixed $job
455+
* @param string $queue
448456
* @return array
449457
*/
450-
protected function createObjectPayload($job)
458+
protected function createObjectPayload($job, $queue)
451459
{
452-
return [
453-
'data' => [
454-
'command' => serialize(clone $job),
455-
'commandName' => get_class($job),
456-
],
460+
$payload = $this->withCreatePayloadHooks($queue, [
457461
'displayName' => $this->getDisplayName($job),
458462
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
459-
'maxTries' => empty($job->tries) ? null : $job->tries,
460-
'timeout' => empty($job->timeout) ? null : $job->timeout,
463+
'maxTries' => $job->tries ?? null,
464+
'timeout' => $job->timeout ?? null,
461465
'timeoutAt' => $this->getJobExpiration($job),
462-
'priority' => empty($job->priority) ? null : $job->priority,
463-
];
464-
}
465-
/**
466-
* Create a typical, string based queue payload array.
467-
*
468-
* @param string $job
469-
* @param mixed $payload
470-
*
471-
* @return array
472-
*/
473-
protected function createStringPayload($job, $payload)
474-
{
475-
return [
476-
'displayName' => is_string($job) ? explode('@', $job)[0] : null,
477-
'job' => $job,
478-
'maxTries' => $payload['maxTries'],
479-
'timeout' => $payload['timeout'],
480-
'data' => $payload['data'],
481-
// BUG-FIX FOR ATTEMPTS HERE (OTHERWISE THIS FUNCTION IS THE SAME AS THE BASE IMPLEMENTATION):
482-
'attempts' => $payload['attempts'],
483-
'priority' => empty($payload['priority']) ? null : $payload['priority'],
484-
];
466+
'data' => [
467+
'commandName' => $job,
468+
'command' => $job,
469+
],
470+
]);
471+
472+
return array_merge($payload, [
473+
'data' => array_merge([
474+
'commandName' => get_class($job),
475+
'command' => serialize(clone $job),
476+
],$this->getCustomMessageOptions()),
477+
]);
485478
}
486479
}

0 commit comments

Comments
 (0)