diff --git a/src/Queues/DTO/BackoffStrategyDTO.php b/src/Queues/DTO/BackoffStrategyDTO.php new file mode 100644 index 0000000..71eec8a --- /dev/null +++ b/src/Queues/DTO/BackoffStrategyDTO.php @@ -0,0 +1,46 @@ +initialDelay = $initialDelay; + $this->factor = $factor; + } + + /** + * @return int + */ + public function getFactor(): int + { + return $this->factor; + } + + /** + * @return int + */ + public function getInitialDelay(): int + { + return $this->initialDelay; + } + + public function toArray(): array + { + return [ + 'factor' => $this->getFactor(), + 'initial_delay' => $this->getInitialDelay(), + ]; + } +} diff --git a/src/Queues/Queue.php b/src/Queues/Queue.php index e13b55d..b828184 100644 --- a/src/Queues/Queue.php +++ b/src/Queues/Queue.php @@ -12,6 +12,7 @@ use Qless\Exceptions\UnknownPropertyException; use Qless\Jobs\BaseJob; use Qless\Jobs\JobData; +use Qless\Queues\DTO\BackoffStrategyDTO; use Qless\Support\PropertyAccessor; use Ramsey\Uuid\Uuid; @@ -85,7 +86,8 @@ public function put( ?int $retries = null, ?int $priority = null, ?array $tags = null, - ?array $depends = null + ?array $depends = null, + ?BackoffStrategyDTO $backoffStrategyDTO = null ): string { try { $jid = $jid ?: str_replace('-', '', Uuid::uuid4()->toString()); @@ -120,7 +122,9 @@ public function put( 'retries', is_null($retries) ? 5 : $retries, 'depends', - json_encode($depends ?: [], JSON_UNESCAPED_SLASHES) + json_encode($depends ?: [], JSON_UNESCAPED_SLASHES), + 'backoff', + json_encode($backoffStrategyDTO ? $backoffStrategyDTO->toArray() : [], JSON_UNESCAPED_SLASHES) ); $this->getEventsManager()->fire(new QueueEvent\AfterEnqueue($this, $jid, $data->toArray(), $className)); diff --git a/src/qless-core/qless.lua b/src/qless-core/qless.lua index 6feec74..15b026e 100644 --- a/src/qless-core/qless.lua +++ b/src/qless-core/qless.lua @@ -385,7 +385,10 @@ Qless.config.defaults = { ['histogram-history'] = 7, ['jobs-history-count'] = 50000, ['jobs-history'] = 604800, - ['jobs-failed-history'] = 604800 + ['jobs-failed-history'] = 604800, + -- retries logic + ['backoff-initial-delay'] = 0, -- Default delay in seconds. 0 means disabled. + ['backoff-factor'] = 3 -- Exponential factor. } Qless.config.get = function(key, default) @@ -1584,19 +1587,25 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. self.name, 'failed' , -1) end - redis.call('hmset', QlessJob.ns .. jid, - 'jid' , jid, - 'klass' , klass, - 'data' , raw_data, - 'priority' , priority, - 'tags' , cjson.encode(tags), - 'state' , ((delay > 0) and 'scheduled') or 'waiting', - 'worker' , '', - 'expires' , 0, - 'queue' , self.name, - 'retries' , retries, - 'remaining', retries, - 'time' , string.format("%.20f", now)) + local job_fields = { + 'jid' , jid, + 'klass' , klass, + 'data' , raw_data, + 'priority' , priority, + 'tags' , cjson.encode(tags), + 'state' , ((delay > 0) and 'scheduled') or 'waiting', + 'worker' , '', + 'expires' , 0, + 'queue' , self.name, + 'retries' , retries, + 'remaining', retries, + 'time' , string.format("%.20f", now) + } + if options['backoff'] then + table.insert(job_fields, 'backoff') + table.insert(job_fields, cjson.encode(options['backoff'])) + end + redis.call('hmset', QlessJob.ns .. jid, job_fields) for i, j in ipairs(depends) do local state = redis.call('hget', QlessJob.ns .. j, 'state') @@ -1954,7 +1963,34 @@ function QlessQueue:invalidate_locks(now, count) redis.call('zadd', 'ql:failed-jobs-list', now, jid) clearOldFailedJobs(now) else - table.insert(jids, jid) + local backoff_json = redis.call('hget', Qless.ns .. jid, 'backoff') + local backoff_config = {} + if backoff_json then + backoff_config = cjson.decode(backoff_json) + end + + local initial_delay = tonumber(backoff_config['initial_delay']) + local backoff_factor = tonumber(backoff_config['factor']) + if initial_delay == nil then + initial_delay = tonumber(Qless.config.get('backoff-initial-delay', 0)) + end + if backoff_factor == nil then + backoff_factor = tonumber(Qless.config.get('backoff-factor', 3)) + end + if initial_delay == 0 then + table.insert(jids, jid) + else + local job = Qless.job(jid) + local job_history = job:history() + local retry_count = #job_history - 1 + if retry_count < 0 then retry_count = 0 end + + local delay = initial_delay * (backoff_factor ^ retry_count) + + self.locks.remove(jid) + self.scheduled.add(now + delay, jid) + redis.call('hset', QlessJob.ns .. jid, 'state', 'scheduled') + end end end end