-
Notifications
You must be signed in to change notification settings - Fork 5
Add reties delay logic #161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| <?php | ||
|
|
||
| namespace Qless\Queues\DTO; | ||
|
|
||
| class BackoffStrategyDTO | ||
| { | ||
| /** | ||
| * @var int | ||
| */ | ||
| private $initialDelay; | ||
|
|
||
| /** | ||
| * @var int | ||
| */ | ||
| private $factor; | ||
|
|
||
| public function __construct(int $initialDelay, int $factor) | ||
| { | ||
| $this->initialDelay = $initialDelay; | ||
| $this->factor = $factor; | ||
| } | ||
|
|
||
| /** | ||
| * @return int | ||
| */ | ||
| public function getFactor(): int | ||
| { | ||
| return $this->factor; | ||
| } | ||
|
|
||
| /** | ||
| * @return int | ||
| */ | ||
| public function getInitialDelay(): int | ||
| { | ||
| return $this->initialDelay; | ||
| } | ||
|
|
||
| public function toArray(): array | ||
| { | ||
| return [ | ||
| 'factor' => $this->getFactor(), | ||
| 'initial_delay' => $this->getInitialDelay(), | ||
| ]; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -385,7 +385,10 @@ Qless.config.defaults = { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ['histogram-history'] = 7, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ['jobs-history-count'] = 50000, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ['jobs-history'] = 604800, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ['jobs-failed-history'] = 604800 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ['jobs-failed-history'] = 604800, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| -- retries logic | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ['backoff-initial-delay'] = 0, -- Default delay in seconds. 0 means disabled. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ['backoff-factor'] = 3 -- Exponential factor. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Qless.config.get = function(key, default) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1584,19 +1587,25 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| redis.call('hincrby', 'ql:s:stats:' .. bin .. ':' .. self.name, 'failed' , -1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| redis.call('hmset', QlessJob.ns .. jid, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'jid' , jid, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'klass' , klass, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'data' , raw_data, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'priority' , priority, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'tags' , cjson.encode(tags), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'state' , ((delay > 0) and 'scheduled') or 'waiting', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'worker' , '', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'expires' , 0, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'queue' , self.name, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'retries' , retries, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'remaining', retries, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'time' , string.format("%.20f", now)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local job_fields = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'jid' , jid, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'klass' , klass, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'data' , raw_data, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'priority' , priority, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'tags' , cjson.encode(tags), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'state' , ((delay > 0) and 'scheduled') or 'waiting', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'worker' , '', | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'expires' , 0, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'queue' , self.name, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'retries' , retries, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'remaining', retries, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'time' , string.format("%.20f", now) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if options['backoff'] then | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.insert(job_fields, 'backoff') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.insert(job_fields, cjson.encode(options['backoff'])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+1604
to
+1607
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Avoid double-encoding and ignore empty backoff. options.backoff arrives JSON-encoded from PHP; encoding again stores a JSON string literal. Decode first and only persist non-empty tables. - if options['backoff'] then
- table.insert(job_fields, 'backoff')
- table.insert(job_fields, cjson.encode(options['backoff']))
- end
+ if options['backoff'] then
+ local raw = options['backoff']
+ local backoff = (type(raw) == 'string') and (cjson.decode(raw) or {}) or raw
+ if type(backoff) == 'table' and next(backoff) ~= nil then
+ table.insert(job_fields, 'backoff')
+ table.insert(job_fields, cjson.encode(backoff))
+ end
+ end📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| redis.call('hmset', QlessJob.ns .. jid, job_fields) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+1590
to
+1608
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix pipeline breaker: HMSET with a table (must unpack). This is the CI error (“Lua redis() command arguments must be strings or integers”). HMSET needs flat field/value pairs. - redis.call('hmset', QlessJob.ns .. jid, job_fields)
+ redis.call('hmset', QlessJob.ns .. jid, unpack(job_fields))📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for i, j in ipairs(depends) do | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local state = redis.call('hget', QlessJob.ns .. j, 'state') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1954,7 +1963,34 @@ function QlessQueue:invalidate_locks(now, count) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| redis.call('zadd', 'ql:failed-jobs-list', now, jid) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| clearOldFailedJobs(now) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.insert(jids, jid) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local backoff_json = redis.call('hget', Qless.ns .. jid, 'backoff') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local backoff_config = {} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if backoff_json then | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| backoff_config = cjson.decode(backoff_json) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+1966
to
+1971
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Read backoff from the correct job key. Using Qless.ns builds “ql:”, not the job hash. - local backoff_json = redis.call('hget', Qless.ns .. jid, 'backoff')
+ local backoff_json = redis.call('hget', QlessJob.ns .. jid, 'backoff')📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local initial_delay = tonumber(backoff_config['initial_delay']) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local backoff_factor = tonumber(backoff_config['factor']) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if initial_delay == nil then | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| initial_delay = tonumber(Qless.config.get('backoff-initial-delay', 0)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if backoff_factor == nil then | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| backoff_factor = tonumber(Qless.config.get('backoff-factor', 3)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if initial_delay == 0 then | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.insert(jids, jid) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local job = Qless.job(jid) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local job_history = job:history() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local retry_count = #job_history - 1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if retry_count < 0 then retry_count = 0 end | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local delay = initial_delay * (backoff_factor ^ retry_count) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+1972
to
+1989
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Derive retry_count from retries/remaining, not history length. History length is not equal to attempts and can explode delays. Use (retries - remaining), clamped to >= 0. - local job = Qless.job(jid)
- local job_history = job:history()
- local retry_count = #job_history - 1
- if retry_count < 0 then retry_count = 0 end
-
- local delay = initial_delay * (backoff_factor ^ retry_count)
+ local total_retries = tonumber(redis.call('hget', QlessJob.ns .. jid, 'retries')) or 0
+ local remaining = tonumber(redis.call('hget', QlessJob.ns .. jid, 'remaining')) or 0
+ local retry_count = math.max(0, total_retries - remaining)
+ local delay = initial_delay * (backoff_factor ^ retry_count)📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.locks.remove(jid) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.scheduled.add(now + delay, jid) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| redis.call('hset', QlessJob.ns .. jid, 'state', 'scheduled') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Validate inputs (non-negative delay, factor >= 1).
Protect against misconfiguration early.
public function __construct(int $initialDelay, int $factor) { + if ($initialDelay < 0) { + throw new \InvalidArgumentException('initialDelay must be >= 0'); + } + if ($factor < 1) { + throw new \InvalidArgumentException('factor must be >= 1'); + } $this->initialDelay = $initialDelay; $this->factor = $factor; }📝 Committable suggestion
🤖 Prompt for AI Agents