Skip to content

Commit 89d5e7f

Browse files
committed
retry-after option related to #21
increment required php version to real value
1 parent 63b959b commit 89d5e7f

File tree

5 files changed

+70
-3
lines changed

5 files changed

+70
-3
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ In your ```config/queue.php``` file you have to provide the following:
4141
'exchange_name' => null,
4242
'exchange_type' => null,
4343
'exchange_flags' => null,
44+
'keepalive' > false,
45+
'heartbeat' => 0,
46+
'retry_after' => 0,
4447
),
4548
),
4649
```

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"minimum-stability": "stable",
55
"license": "GPL-2.0",
66
"require": {
7-
"php": ">=5.4.0",
7+
"php": ">=7",
88
"php-amqplib/php-amqplib": "2.6.*"
99
},
1010
"require-dev": {

src/Connectors/AmqpConnector.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public function connect(array $config)
5050
return new AMQPQueue(
5151
$connection, $config['queue'], $config['queue_flags'], isset($config['declare_queues']) ? $config['declare_queues'] : true,
5252
$config['message_properties'], $config['channel_id'],
53-
$config['exchange_name'], $config['exchange_type'], $config['exchange_flags']
53+
$config['exchange_name'], $config['exchange_type'], $config['exchange_flags'], (isset($config['retry_after']) ? $config['retry_after'] : 0)
5454
);
5555
}
5656
}

src/Jobs/AMQPJob.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ public function release($delay = 0)
7575
$job = $body['job'];
7676
$data = $body['data'];
7777

78+
//if retry_after option is set use it on failure instead of traditional delay
79+
if(isset($body['data']['retryAfter']) && $body['data']['retryAfter'] > 0)
80+
$delay = $body['data']['retryAfter'];
81+
7882
/** @var QueueContract $queue */
7983
$queue = $this->container['queue']->connection();
8084
if ($delay > 0) {
@@ -127,4 +131,5 @@ public function getJobId()
127131
{
128132
return $this->amqpMessage->get('message_id');
129133
}
134+
130135
}

src/Queue/AMQPQueue.php

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Forumhouse\LaravelAmqp\Jobs\AMQPJob;
77
use Forumhouse\LaravelAmqp\Utility\ArrayUtil;
88
use Illuminate\Contracts\Queue\Queue as QueueContract;
9+
use Illuminate\Queue\InvalidPayloadException;
910
use Illuminate\Queue\Queue;
1011
use PhpAmqpLib\Channel\AMQPChannel;
1112
use PhpAmqpLib\Connection\AMQPConnection;
@@ -68,6 +69,11 @@ class AMQPQueue extends Queue implements QueueContract
6869
*/
6970
private $declareQueues;
7071

72+
/**
73+
* @var int
74+
*/
75+
private $retryAfter;
76+
7177
/**
7278
* @param AMQPStreamConnection $connection
7379
* @param string $defaultQueueName Default queue name
@@ -82,6 +88,7 @@ class AMQPQueue extends Queue implements QueueContract
8288
* @param string $exchangeName Exchange name
8389
* @param mixed $exchangeType Exchange type
8490
* @param mixed $exchangeFlags Exchange flags
91+
* @param mixed $retryAfter Optional timeout for failed jobs
8592
*/
8693
public function __construct(
8794
AMQPStreamConnection $connection,
@@ -92,7 +99,8 @@ public function __construct(
9299
$defaultChannelId = null,
93100
$exchangeName = '',
94101
$exchangeType = null,
95-
$exchangeFlags = []
102+
$exchangeFlags = [],
103+
$retryAfter = 0
96104
) {
97105
$this->connection = $connection;
98106
$this->defaultQueueName = $defaultQueueName ?: 'default';
@@ -102,6 +110,7 @@ public function __construct(
102110
$this->defaultChannelId = $defaultChannelId;
103111
$this->exchangeName = $exchangeName;
104112
$this->channel = $connection->channel($this->defaultChannelId);
113+
$this->retryAfter = $retryAfter;
105114

106115
if ($exchangeName !== null) {
107116
$this->declareExchange($exchangeName, $exchangeType, $exchangeFlags);
@@ -134,6 +143,13 @@ protected function declareExchange($exchangeName, $exchangeType, array $exchange
134143
call_user_func_array([$this->channel, 'exchange_declare'], $flags);
135144
}
136145

146+
/**
147+
* @return array
148+
*/
149+
public function getCustomMessageOptions(){
150+
return ['retryAfter' => $this->retryAfter];
151+
}
152+
137153
/**
138154
* Push a new job onto the queue.
139155
*
@@ -410,4 +426,47 @@ private function prepareQueue($queue)
410426

411427
return $queue;
412428
}
429+
430+
/**
431+
* Create a payload string from the given job and data.
432+
*
433+
* @param string $job
434+
* @param mixed $data
435+
* @return string
436+
*
437+
* @throws \Illuminate\Queue\InvalidPayloadException
438+
*/
439+
protected function createPayload($job, $data = '')
440+
{
441+
$data = is_array($data) ? array_merge($data, $this->getCustomMessageOptions()) : $this->getCustomMessageOptions();
442+
$payload = json_encode($this->createPayloadArray($job, $data));
443+
if (JSON_ERROR_NONE !== json_last_error()) {
444+
throw new InvalidPayloadException(
445+
'Unable to JSON encode payload. Error code: '.json_last_error()
446+
);
447+
}
448+
449+
return $payload;
450+
}
451+
452+
/**
453+
* Create a payload for an object-based queue handler.
454+
*
455+
* @param mixed $job
456+
* @return array
457+
*/
458+
protected function createObjectPayload($job)
459+
{
460+
return [
461+
'displayName' => $this->getDisplayName($job),
462+
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
463+
'maxTries' => $job->tries ?? null,
464+
'timeout' => $job->timeout ?? null,
465+
'timeoutAt' => $this->getJobExpiration($job),
466+
'data' => array_merge([
467+
'commandName' => get_class($job),
468+
'command' => serialize(clone $job),
469+
],$this->getCustomMessageOptions()),
470+
];
471+
}
413472
}

0 commit comments

Comments
 (0)