Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/Queues/DTO/BackoffStrategyDTO.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

namespace Qless\Queues\DTO;

class BackoffStrategyDTO
{
/**
* @var int
*/
private $initialDelay;

/**
* @var int
*/
private $factor;

public function __construct(int $initialDelay, int $factor)
{
$this->initialDelay = $initialDelay;
$this->factor = $factor;
}

/**
* @return int
*/
public function getFactor(): int
{
return $this->factor;
}

/**
* @return int
*/
public function getInitialDelay(): int
{
return $this->initialDelay;
}

public function toArray(): array
{
return [
'factor' => $this->getFactor(),
'initial_delay' => $this->getInitialDelay(),
];
}
}
8 changes: 6 additions & 2 deletions src/Queues/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Qless\Exceptions\UnknownPropertyException;
use Qless\Jobs\BaseJob;
use Qless\Jobs\JobData;
use Qless\Queues\DTO\BackoffStrategyDTO;
use Qless\Support\PropertyAccessor;
use Ramsey\Uuid\Uuid;

Expand Down Expand Up @@ -85,7 +86,8 @@ public function put(
?int $retries = null,
?int $priority = null,
?array $tags = null,
?array $depends = null
?array $depends = null,
?BackoffStrategyDTO $backoffStrategyDTO = null
): string {
try {
$jid = $jid ?: str_replace('-', '', Uuid::uuid4()->toString());
Expand Down Expand Up @@ -120,7 +122,9 @@ public function put(
'retries',
is_null($retries) ? 5 : $retries,
'depends',
json_encode($depends ?: [], JSON_UNESCAPED_SLASHES)
json_encode($depends ?: [], JSON_UNESCAPED_SLASHES),
'backoff',
json_encode($backoffStrategyDTO ? $backoffStrategyDTO->toArray() : [], JSON_UNESCAPED_SLASHES)
);

$this->getEventsManager()->fire(new QueueEvent\AfterEnqueue($this, $jid, $data->toArray(), $className));
Expand Down
65 changes: 50 additions & 15 deletions src/qless-core/qless.lua
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,10 @@ Qless.config.defaults = {
['histogram-history'] = 7,
['jobs-history-count'] = 50000,
['jobs-history'] = 604800,
['jobs-failed-history'] = 604800
['jobs-failed-history'] = 604800,
-- retries logic
['backoff-initial-delay'] = 0, -- Default delay in seconds. 0 means disabled.
['backoff-factor'] = 3 -- Exponential factor.
}

Qless.config.get = function(key, default)
Expand Down Expand Up @@ -1584,19 +1587,25 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...)
redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. self.name, 'failed' , -1)
end

redis.call('hmset', QlessJob.ns .. jid,
'jid' , jid,
'klass' , klass,
'data' , raw_data,
'priority' , priority,
'tags' , cjson.encode(tags),
'state' , ((delay > 0) and 'scheduled') or 'waiting',
'worker' , '',
'expires' , 0,
'queue' , self.name,
'retries' , retries,
'remaining', retries,
'time' , string.format("%.20f", now))
local job_fields = {
'jid' , jid,
'klass' , klass,
'data' , raw_data,
'priority' , priority,
'tags' , cjson.encode(tags),
'state' , ((delay > 0) and 'scheduled') or 'waiting',
'worker' , '',
'expires' , 0,
'queue' , self.name,
'retries' , retries,
'remaining', retries,
'time' , string.format("%.20f", now)
}
if options['backoff'] then
table.insert(job_fields, 'backoff')
table.insert(job_fields, options['backoff'])
end
redis.call('hmset', QlessJob.ns .. jid, unpack(job_fields))

for i, j in ipairs(depends) do
local state = redis.call('hget', QlessJob.ns .. j, 'state')
Expand Down Expand Up @@ -1954,7 +1963,33 @@ function QlessQueue:invalidate_locks(now, count)
redis.call('zadd', 'ql:failed-jobs-list', now, jid)
clearOldFailedJobs(now)
else
table.insert(jids, jid)
local backoff_json = redis.call('hget', QlessJob.ns .. jid, 'backoff')
local backoff_config = {}
if backoff_json then
backoff_config = cjson.decode(backoff_json)
end

local initial_delay = tonumber(backoff_config['initial_delay'])
local backoff_factor = tonumber(backoff_config['factor'])
if initial_delay == nil then
initial_delay = tonumber(Qless.config.get('backoff-initial-delay', 0))
end
if backoff_factor == nil then
backoff_factor = tonumber(Qless.config.get('backoff-factor', 3))
end
if initial_delay == 0 then
table.insert(jids, jid)
else
local total_retries = tonumber(redis.call('hget', QlessJob.ns .. jid, 'retries') or 5)
local retries_left = tonumber(redis.call('hget', QlessJob.ns .. jid, 'remaining') or total_retries)

local attempt_index = total_retries - retries_left
local delay = initial_delay * (backoff_factor ^ attempt_index)

self.locks.remove(jid)
self.scheduled.add(now + delay, jid)
redis.call('hset', QlessJob.ns .. jid, 'state', 'scheduled')
end
end
end
end
Expand Down