Improve transactional integrity for starting controller jobs in dispatcherd#16300
fosterseth wants to merge 2 commits into ansible:devel
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings. Use the following commands to manage reviews.
📝 Walkthrough
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Dispatcher as Dispatcher
    participant DB as Database
    participant Worker as Worker
    rect rgba(200,200,255,0.5)
        Dispatcher->>DB: SELECT waiting jobs (include start_args)
    end
    rect rgba(200,255,200,0.5)
        Dispatcher->>DB: Atomic UPDATE WHERE status='waiting' -> status='running', start_args=''
        DB-->>Dispatcher: success / no-op (if raced)
        alt success
            Dispatcher->>Worker: enqueue/run job (pk)
        else no-op
            Note right of Dispatcher: Skip due to concurrent change
        end
    end
    rect rgba(200,200,255,0.5)
        Worker->>DB: LOAD instance by pk
        alt instance.status == 'waiting'
            Worker->>DB: UPDATE instance SET status='running'
            Worker->>DB: RELOAD instance
        end
        Worker->>Worker: verify status == 'running' then proceed or abort
    end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks: ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
awx/main/tasks/jobs.py (2)
**568-572: ⚠️ Potential issue | 🟡 Minor**: Missing `None` guard before accessing `self.instance.status`

`update_model(pk)` can return `None` when the job is deleted between dispatch and execution (the existing guard at line 752 confirms this). If `None` is returned at line 569, `self.instance.status` at line 570 raises `AttributeError`.

🐛 Proposed fix

```diff
 self.instance = self.update_model(pk)
+if not self.instance:
+    logger.error(f'Not starting task pk={pk} because the job appears to have been deleted')
+    return
 if self.instance.status != 'running':
     logger.error(f'Not starting {self.instance.status} task pk={pk} because its status "{self.instance.status}" is not expected')
     return
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@awx/main/tasks/jobs.py` around lines 568 - 572, After calling self.instance = self.update_model(pk) in the job start logic, guard against update_model returning None before accessing self.instance.status: check if self.instance is None and if so log a clear message (including pk) and return early to avoid AttributeError; update the block around update_model(pk) / status checks in the method (referencing update_model and self.instance.status) to perform this None check just prior to using .status.
**202-215: ⚠️ Potential issue | 🟠 Major**: `start_args` is cleared before `get_start_kwargs()` reads it — launch-time kwargs lost

`start_args` is not included in the `.only()` projection at lines 202–204, making it a deferred field. The `.update()` call at line 208 clears `start_args=''` in the database. When `uj.get_start_kwargs()` executes at line 212 and accesses `self.start_args`, Django triggers a fresh `SELECT` query, retrieving the now-empty value from the database. Since `get_start_kwargs()` returns `None` for empty `start_args`, `binder.control(...)` is always invoked with empty kwargs, losing any launch-time data (passwords, extra variables) needed to run the job.

In the previous code, `dispatch_waiting_jobs` would have read `start_args` before the database clear, preserving the value through `get_start_kwargs()`.

Fix: add `'start_args'` to the `.only()` projection to cache the field value before the `.update()` call.

Proposed fix

```diff
 for uj in UnifiedJob.objects.filter(status='waiting', controller_node=settings.CLUSTER_HOST_ID).only(
-    'id', 'status', 'polymorphic_ctype', 'celery_task_id', 'cancel_flag'
+    'id', 'status', 'polymorphic_ctype', 'celery_task_id', 'cancel_flag', 'start_args'
 ):
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@awx/main/tasks/jobs.py` around lines 202 - 215, The issue is that start_args is deferred (not included in the UnifiedJob.objects.only(...) projection) and gets cleared by the subsequent update call, so uj.get_start_kwargs() re-reads the emptied DB value; fix by adding 'start_args' to the .only(...) projection used when iterating UnifiedJob so the field is loaded into uj before UnifiedJob.objects.filter(...).update(status='running', start_args='') runs, ensuring get_start_kwargs() returns the original launch-time kwargs.
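The deferred-field behavior at the heart of this finding can be reproduced outside Django. Below is a minimal standalone sketch (sqlite3 in place of the ORM; `LazyJob` is a hypothetical stand-in for an instance loaded via `.only()`), showing how a lazily re-fetched field reads the post-`UPDATE` value rather than the launch-time one:

```python
import sqlite3

# Standalone illustration, not AWX code: a field omitted from the initial
# projection is re-fetched lazily, so it observes the cleared DB value.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE unified_job (pk INTEGER PRIMARY KEY, status TEXT, start_args TEXT)")
conn.execute("INSERT INTO unified_job VALUES (1, 'waiting', 'secret-launch-kwargs')")

class LazyJob:
    """Mimics a Django instance loaded via .only(): deferred fields re-query on access."""
    def __init__(self, pk):
        self.pk = pk

    @property
    def start_args(self):
        row = conn.execute("SELECT start_args FROM unified_job WHERE pk = ?", (self.pk,)).fetchone()
        return row[0]

uj = LazyJob(1)  # start_args is deferred, not cached on the instance
conn.execute("UPDATE unified_job SET status = 'running', start_args = '' WHERE pk = 1")
print(repr(uj.start_args))  # -> '' (the original launch kwargs are gone)
```

Including `'start_args'` in the projection corresponds to caching the value on the instance before the clearing `UPDATE` runs.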
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@awx/main/tasks/jobs.py`:
- Around line 205-207: The cancel path currently uses a raw queryset update
(UnifiedJob.objects.filter(...).update(...)) which bypasses model lifecycle
hooks; replace this with the model-level cancellation flow by calling
uj.update_model(status='canceled') (or at minimum set uj.status='canceled',
compute/set uj.finished and uj.elapsed, call uj.save() so save() hooks run, then
call uj.websocket_emit_status('canceled') and trigger notifications) instead of
the direct .update(); ensure the logic only transitions jobs that are still
waiting (guard on uj.status or the filter) to avoid races.
---
Outside diff comments:
In `@awx/main/tasks/jobs.py`:
- Around line 568-572: After calling self.instance = self.update_model(pk) in
the job start logic, guard against update_model returning None before accessing
self.instance.status: check if self.instance is None and if so log a clear
message (including pk) and return early to avoid AttributeError; update the
block around update_model(pk) / status checks in the method (referencing
update_model and self.instance.status) to perform this None check just prior to
using .status.
- Around line 202-215: The issue is that start_args is deferred (not included in
the UnifiedJob.objects.only(...) projection) and gets cleared by the subsequent
update call, so uj.get_start_kwargs() re-reads the emptied DB value; fix by
adding 'start_args' to the .only(...) projection used when iterating UnifiedJob
so the field is loaded into uj before
UnifiedJob.objects.filter(...).update(status='running', start_args='') runs,
ensuring get_start_kwargs() returns the original launch-time kwargs.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (1)
awx/main/tasks/jobs.py
awx/main/tasks/jobs.py
Outdated
```python
if uj.cancel_flag:
    UnifiedJob.objects.filter(pk=uj.pk, status='waiting').update(status='canceled', start_args='')
    continue
```
Raw .update() on the cancel path skips model lifecycle hooks
`UnifiedJob.objects.filter(pk=uj.pk, status='waiting').update(status='canceled', start_args='')` is a raw SQL UPDATE that bypasses Django's `save()` and all downstream effects:

- `finished` and `elapsed` timestamps are never set → broken job duration display and stats
- the `failed` derived field is not updated
- `websocket_emit_status('canceled')` is never called → the UI is not notified, and the job appears stuck until a manual refresh
- Notification templates are never triggered

Before this PR, a waiting job with `cancel_flag=True` would be dispatched, reach `run()`, and be properly cancelled at lines 598–607 via `update_model()`, which invokes `save()` and all lifecycle hooks. The new path short-circuits this.

At a minimum, after a successful `update()`, call `uj.websocket_emit_status('canceled')` and set `finished`/`elapsed` (or delegate to a helper). A more robust fix reuses the existing cancellation machinery (e.g. calling `uj.update_model(status='canceled')` with a guard) rather than a bare `update()`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@awx/main/tasks/jobs.py` around lines 205 - 207, The cancel path currently
uses a raw queryset update (UnifiedJob.objects.filter(...).update(...)) which
bypasses model lifecycle hooks; replace this with the model-level cancellation
flow by calling uj.update_model(status='canceled') (or at minimum set
uj.status='canceled', compute/set uj.finished and uj.elapsed, call uj.save() so
save() hooks run, then call uj.websocket_emit_status('canceled') and trigger
notifications) instead of the direct .update(); ensure the logic only
transitions jobs that are still waiting (guard on uj.status or the filter) to
avoid races.
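For illustration, here is a minimal standalone sketch of why the raw UPDATE leaves the job half-canceled. The `Job` class and its `save()` hook are hypothetical stand-ins for the Django model, not AWX code:

```python
import datetime

# Standalone illustration: a column-only write changes nothing but the status,
# while the model-level path runs the lifecycle side effects on save().
emitted = []

class Job:
    def __init__(self):
        self.status = 'waiting'
        self.finished = None
        self.elapsed = None

    def save(self):
        # Stand-in for Django save() hooks: timestamps, websocket emit, notifications.
        if self.status == 'canceled' and self.finished is None:
            self.finished = datetime.datetime.now(datetime.timezone.utc)
            self.elapsed = 0.0
            emitted.append('canceled')  # analog of websocket_emit_status('canceled')

job = Job()

# Raw queryset-style update: only the status column changes, no hooks run.
job.status = 'canceled'
assert job.finished is None and emitted == []

# Model-level path: save() runs the downstream effects.
job.save()
print(job.finished is not None, emitted)  # -> True ['canceled']
```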
@AlanCoding
Yes, I believe that would be true.
There's a form of accounting here to worry about. Consider the topic of "reaping" jobs in the running status. This probably needs some updating to account for the special state where the dispatcher main process is still running looking for a worker to hand the RunJob.run task to, or something like that. To make this complete and trackable, I believe that the task submission to the
I would say "no". Because the best thing we can do for a canceled job is to submit `RunJob.run`, and let that handle it.
Compare: 7852d73 → 00cb8e1
Actionable comments posted: 1
♻️ Duplicate comments (1)
awx/main/tasks/jobs.py (1)
**205-207: ⚠️ Potential issue | 🟠 Major**: Cancel path still bypasses all model lifecycle hooks

`UnifiedJob.objects.filter(...).update(status='canceled', start_args='')` is a raw SQL UPDATE that skips `save()`, leaving `finished`/`elapsed` unset, suppressing `websocket_emit_status('canceled')` (UI shows the job as stuck), and preventing notification templates from firing.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@awx/main/tasks/jobs.py` around lines 205 - 207, The cancel branch is using a queryset.update which bypasses model lifecycle hooks (UnifiedJob.objects.filter(...).update(...)), leaving finished/elapsed unset and preventing websocket_emit_status('canceled') and notification templates from running; instead, load the instance (uj), set uj.status = 'canceled' and uj.start_args = '' and then call the model's proper save/cancel path (e.g., call uj.cancel() if a cancel method exists, or set finished/elapsed as appropriate and call uj.save()) so that save() signals and websocket_emit_status('canceled') and notification handlers are executed.
🧹 Nitpick comments (1)
awx/main/tasks/jobs.py (1)
**568-572**: Redundant `self.instance.status` in error message

The format string embeds `self.instance.status` twice. Minor wording issue.

✏️ Suggested fix

```diff
- logger.error(f'Not starting {self.instance.status} task pk={pk} because its status "{self.instance.status}" is not expected')
+ logger.error(f'Skipping task pk={pk}: unexpected status "{self.instance.status}"')
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@awx/main/tasks/jobs.py` around lines 568 - 572, The error message in the job start path redundantly interpolates self.instance.status twice; update the logger.error call in the block after update_model(pk) (where self.instance is set and checked) to use a concise message that mentions the pk and the unexpected status only once (e.g., "Not starting task pk={pk} because its status \"{status}\" is not expected" or include the expected value 'running'), ensuring you reference self.instance.status and pk via their variables and keep logger.error on the same call site.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@awx/main/tasks/jobs.py`:
- Around line 211-215: The job status is set to 'running' before calling
binder.control (UnifiedJob update at pk=uj.pk), so if binder.control(...) raises
the loop exits and the job is left stranded; wrap the call to
binder.control(serialize_task(...), args..., uuid=uj.celery_task_id) in a
try/except, and on exception revert the DB change by updating the same
UnifiedJob row (filter by pk and status='running') back to 'waiting' and
restoring start_args (or clearing it the same way), log the exception, and
continue the loop so remaining jobs are processed; ensure the revert uses a
conditional update (the same pattern as the original filter) to avoid race
conditions.
---
Duplicate comments:
In `@awx/main/tasks/jobs.py`:
- Around line 205-207: The cancel branch is using a queryset.update which
bypasses model lifecycle hooks (UnifiedJob.objects.filter(...).update(...)),
leaving finished/elapsed unset and preventing websocket_emit_status('canceled')
and notification templates from running; instead, load the instance (uj), set
uj.status = 'canceled' and uj.start_args = '' and then call the model's proper
save/cancel path (e.g., call uj.cancel() if a cancel method exists, or set
finished/elapsed as appropriate and call uj.save()) so that save() signals and
websocket_emit_status('canceled') and notification handlers are executed.
---
Nitpick comments:
In `@awx/main/tasks/jobs.py`:
- Around line 568-572: The error message in the job start path redundantly
interpolates self.instance.status twice; update the logger.error call in the
block after update_model(pk) (where self.instance is set and checked) to use a
concise message that mentions the pk and the unexpected status only once (e.g.,
"Not starting task pk={pk} because its status \"{status}\" is not expected" or
include the expected value 'running'), ensuring you reference
self.instance.status and pk via their variables and keep logger.error on the
same call site.
Compare: 00cb8e1 → ff1e0d4
Actionable comments posted: 1
♻️ Duplicate comments (1)
awx/main/tasks/jobs.py (1)
**200-207: ⚠️ Potential issue | 🟠 Major**: `binder.control()` exception still exits the loop, skipping remaining waiting jobs

The stranding-in-`running` concern from the prior review is resolved (the DB update is now after the dispatch). However, the loop-exit issue is unchanged: any exception from `binder.control()` or the subsequent `.update()` propagates out of the `for` loop, leaving all remaining waiting jobs unprocessed until the next `dispatch_waiting_jobs` cycle.

🛡️ Proposed fix

```diff
 for uj in UnifiedJob.objects.filter(status='waiting', controller_node=settings.CLUSTER_HOST_ID).only('id', 'status', 'polymorphic_ctype', 'celery_task_id'):
     kwargs = uj.get_start_kwargs()
     if not kwargs:
         kwargs = {}
-    binder.control('run', data={'task': serialize_task(uj._get_task_class()), 'args': [uj.id], 'kwargs': kwargs, 'uuid': uj.celery_task_id})
-    UnifiedJob.objects.filter(pk=uj.pk, status='waiting').update(status='running', start_args='')
+    try:
+        binder.control('run', data={'task': serialize_task(uj._get_task_class()), 'args': [uj.id], 'kwargs': kwargs, 'uuid': uj.celery_task_id})
+        UnifiedJob.objects.filter(pk=uj.pk, status='waiting').update(status='running', start_args='')
+    except Exception:
+        logger.exception('Failed to dispatch job pk=%s, skipping', uj.pk)
+        continue
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@awx/main/tasks/jobs.py` around lines 200 - 207, The loop in dispatch_waiting_jobs can exit if binder.control() or the subsequent UnifiedJob.objects.filter(...).update(...) raises, so wrap the per-job dispatch and update in a try/except that catches exceptions from binder.control and the update, logs the error (including uj.id and exception details), and then continues to the next job; reference the dispatch_waiting_jobs function, the binder.control(...) call, and the UnifiedJob.objects.filter(pk=uj.pk, status='waiting').update(...) call so you handle exceptions separately for dispatch and DB update and avoid breaking the entire for-loop.
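A minimal standalone sketch of the per-job error-handling pattern the fix proposes; the `dispatch` function is a hypothetical stand-in for the `binder.control` call, not AWX code:

```python
# Standalone illustration: one failing dispatch no longer aborts the loop
# for the remaining waiting jobs.
dispatched, skipped = [], []

def dispatch(job_id):
    if job_id == 2:
        raise RuntimeError("broker unavailable")  # simulated binder.control failure
    dispatched.append(job_id)

for job_id in [1, 2, 3]:
    try:
        dispatch(job_id)
        # the conditional status UPDATE would go here, inside the same try
    except Exception:
        skipped.append(job_id)  # log and move on instead of breaking the loop
        continue

print(dispatched, skipped)  # -> [1, 3] [2]
```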
🧹 Nitpick comments (1)
awx/main/tasks/jobs.py (1)
**202-203**: `start_args` missing from `.only()` → N+1 queries per dispatch cycle

`get_start_kwargs()` reads `start_args`, which is not in the `.only()` projection. Django will issue a separate `SELECT` for `start_args` on every loop iteration.

♻️ Proposed fix

```diff
- for uj in UnifiedJob.objects.filter(status='waiting', controller_node=settings.CLUSTER_HOST_ID).only('id', 'status', 'polymorphic_ctype', 'celery_task_id'):
+ for uj in UnifiedJob.objects.filter(status='waiting', controller_node=settings.CLUSTER_HOST_ID).only('id', 'status', 'polymorphic_ctype', 'celery_task_id', 'start_args'):
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@awx/main/tasks/jobs.py` around lines 202 - 203, The loop loads UnifiedJob objects with a limited projection but omits the start_args field which get_start_kwargs() accesses, causing N+1 queries; modify the query that builds UnifiedJob.objects.filter(...).only('id', 'status', 'polymorphic_ctype', 'celery_task_id') to also include 'start_args' (or switch to an appropriate .defer/.select_related strategy) so that uj.get_start_kwargs() does not trigger a separate SELECT per iteration.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@awx/main/tasks/jobs.py`:
- Around line 563-565: In run(), the atomic update against UnifiedJob
(filter(pk=pk, status='waiting').update(...)) must have its return value checked
to ensure this worker actually claimed the job; change the block in
UnifiedJob.run() to capture the integer returned by
UnifiedJob.objects.filter(pk=pk, status='waiting').update(...) and only call
self.update_model(pk) and continue processing when that value == 1 (otherwise
abort/return or raise a controlled error so the job is not executed twice);
refer to the update call, the run() method, and self.instance/update_model(pk)
to locate and guard the reload/continuation logic.
---
Duplicate comments:
In `@awx/main/tasks/jobs.py`:
- Around line 200-207: The loop in dispatch_waiting_jobs can exit if
binder.control() or the subsequent UnifiedJob.objects.filter(...).update(...)
raises, so wrap the per-job dispatch and update in a try/except that catches
exceptions from binder.control and the update, logs the error (including uj.id
and exception details), and then continues to the next job; reference the
dispatch_waiting_jobs function, the binder.control(...) call, and the
UnifiedJob.objects.filter(pk=uj.pk, status='waiting').update(...) call so you
handle exceptions separately for dispatch and DB update and avoid breaking the
entire for-loop.
---
Nitpick comments:
In `@awx/main/tasks/jobs.py`:
- Around line 202-203: The loop loads UnifiedJob objects with a limited
projection but omits the start_args field which get_start_kwargs() accesses,
causing N+1 queries; modify the query that builds
UnifiedJob.objects.filter(...).only('id', 'status', 'polymorphic_ctype',
'celery_task_id') to also include 'start_args' (or switch to an appropriate
.defer/.select_related strategy) so that uj.get_start_kwargs() does not trigger
a separate SELECT per iteration.
awx/main/tasks/jobs.py
Outdated
```python
if self.instance.status == 'waiting':
    UnifiedJob.objects.filter(pk=pk, status='waiting').update(status='running', start_args='')
    self.instance = self.update_model(pk)
```
Missing ownership check on the conditional .update() in run() — potential double-execution
If the same job is dispatched twice (e.g., the DB update at line 207 failed on a prior `dispatch_waiting_jobs` run), two workers can both load `status='waiting'` and reach line 564. The conditional update is atomic — only one gets 1 row — but neither checks the return value. Both then reload the instance as `'running'` and proceed to execute the job.
🛡️ Proposed fix

```diff
 if self.instance.status == 'waiting':
-    UnifiedJob.objects.filter(pk=pk, status='waiting').update(status='running', start_args='')
-    self.instance = self.update_model(pk)
+    updated = UnifiedJob.objects.filter(pk=pk, status='waiting').update(status='running', start_args='')
+    if not updated:
+        logger.warning('Job pk=%s was already claimed by another worker; skipping', pk)
+        return
+    self.instance = self.update_model(pk)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@awx/main/tasks/jobs.py` around lines 563 - 565, In run(), the atomic update
against UnifiedJob (filter(pk=pk, status='waiting').update(...)) must have its
return value checked to ensure this worker actually claimed the job; change the
block in UnifiedJob.run() to capture the integer returned by
UnifiedJob.objects.filter(pk=pk, status='waiting').update(...) and only call
self.update_model(pk) and continue processing when that value == 1 (otherwise
abort/return or raise a controlled error so the job is not executed twice);
refer to the update call, the run() method, and self.instance/update_model(pk)
to locate and guard the reload/continuation logic.
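The claim-by-rowcount pattern this fix relies on can be sketched with plain sqlite3 (a standalone illustration, not AWX code): the conditional UPDATE is atomic, and the row count tells each caller whether it won the race.

```python
import sqlite3

# Standalone illustration: a conditional UPDATE claims a row atomically,
# and rowcount distinguishes the winner from a caller that raced and lost.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE unified_job (pk INTEGER PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO unified_job VALUES (1, 'waiting')")

def try_claim(pk):
    # Mirrors UnifiedJob.objects.filter(pk=pk, status='waiting').update(status='running')
    cur = conn.execute(
        "UPDATE unified_job SET status = 'running' WHERE pk = ? AND status = 'waiting'",
        (pk,),
    )
    return cur.rowcount  # 1 if this caller claimed the job, 0 if it lost the race

first = try_claim(1)   # wins: row was still 'waiting'
second = try_claim(1)  # loses: row is already 'running'
print(first, second)   # -> 1 0
```

Django's `QuerySet.update()` returns the matched-row count the same way, which is why checking its return value is enough to guard against double execution.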
Compare: ff1e0d4 → f18c4fe

Compare: f18c4fe → b57abf5

Compare: b57abf5 → 7d7938b
♻️ Duplicate comments (2)
awx/main/tasks/jobs.py (2)
**568-569: ⚠️ Potential issue | 🟠 Major**: Return value of `.update()` is unchecked — double-execution race still present.

If two workers both load the instance as `waiting` and both reach line 568, only one will get `updated=1` from the DB, but neither checks the return value. Both then set `self.instance.status = 'running'` locally and proceed to execute the job.

🛡️ Proposed fix

```diff
- UnifiedJob.objects.filter(pk=pk, status='waiting').update(status='running', start_args='')
- self.instance.status = 'running'
+ updated = UnifiedJob.objects.filter(pk=pk, status='waiting').update(status='running', start_args='')
+ if not updated:
+     logger.warning('Job pk=%s already claimed by another worker; skipping', pk)
+     return
+ self.instance.status = 'running'
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@awx/main/tasks/jobs.py` around lines 568 - 569, The update call on UnifiedJob.objects.filter(pk=pk, status='waiting').update(...) is not checked, so two workers can both proceed after a race; change the logic in the job start path to capture the integer return value from that update, and only set self.instance.status = 'running' and continue when updated == 1 (otherwise abort/raise/return indicating another worker won the race). Locate the UnifiedJob.objects.filter(...).update(...) call and the subsequent self.instance.status assignment (referencing pk and start_args) and gate the status change + execution on the update result, handling the non-1 case by stopping further execution.
**563-566: ⚠️ Potential issue | 🟠 Major**: Raw `.update()` on cancel path still bypasses model lifecycle hooks.

`finished`/`elapsed` timestamps are never set, `websocket_emit_status('canceled')` is never called, and notification templates are never triggered — the job will appear permanently stuck in the UI until a manual refresh, and duration metrics will be broken.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@awx/main/tasks/jobs.py` around lines 563 - 566, The cancel branch is using a raw queryset update (UnifiedJob.objects.filter(pk=pk, status='waiting').update(...)) which bypasses model lifecycle hooks so finished/elapsed timestamps, websocket_emit_status('canceled'), and notification templates never run; replace the raw update with the model-level cancel flow: set self.instance.status = 'canceled', call the model method that handles cancellation (or call self.instance.save() after updating fields and explicitly set finished and elapsed), then invoke websocket_emit_status('canceled') and trigger the job notification/path that sends templates (or call the existing UnifiedJob.cancel() or post-save hook) so all lifecycle hooks and metrics run as expected.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@awx/main/tasks/jobs.py`:
- Around line 568-569: The update call on UnifiedJob.objects.filter(pk=pk,
status='waiting').update(...) is not checked, so two workers can both proceed
after a race; change the logic in the job start path to capture the integer
return value from that update, and only set self.instance.status = 'running' and
continue when updated == 1 (otherwise abort/raise/return indicating another
worker won the race). Locate the UnifiedJob.objects.filter(...).update(...) call
and the subsequent self.instance.status assignment (referencing pk and
start_args) and gate the status change + execution on the update result,
handling the non-1 case by stopping further execution.
- Around line 563-566: The cancel branch is using a raw queryset update
(UnifiedJob.objects.filter(pk=pk, status='waiting').update(...)) which bypasses
model lifecycle hooks so finished/elapsed timestamps,
websocket_emit_status('canceled'), and notification templates never run; replace
the raw update with the model-level cancel flow: set self.instance.status =
'canceled', call the model method that handles cancellation (or call
self.instance.save() after updating fields and explicitly set finished and
elapsed), then invoke websocket_emit_status('canceled') and trigger the job
notification/path that sends templates (or call the existing UnifiedJob.cancel()
or post-save hook) so all lifecycle hooks and metrics run as expected.
Compare: c31a87f → 632cd6b
awx/main/tasks/jobs.py
Outdated
```python
self.instance = self.update_model(pk)

if self.instance.status == 'waiting':
    UnifiedJob.objects.filter(pk=pk).update(status="running", start_args='')
```
Missing status condition in filter allows status overwrite
Low Severity
The update in `BaseTask.run` uses `UnifiedJob.objects.filter(pk=pk).update(status="running")` without a `status='waiting'` condition in the filter. In contrast, `dispatch_waiting_jobs` correctly uses `filter(pk=uj.pk, status='waiting')` for an atomic conditional update. If any other process changes the job's status between the `update_model` read and this `filter().update()` call, the unconditional update would overwrite that status back to `'running'`.
Additional Locations (1)
…backs

Move status transition from `BaseTask.transition_status` (which used `SELECT FOR UPDATE` inside `transaction.atomic()`) into `dispatch_waiting_jobs`. The new approach uses a conditional `filter().update()` which is atomic at the database level without requiring explicit row locks, reducing transaction contention and rollbacks observed in perfscale testing.

The `transition_status` method was an artifact of the feature flag era where we needed to support both old and new code paths. Since `dispatch_waiting_jobs` is already a singleton (`on_duplicate='queue_one'`) scoped to the local node, the de-duplication logic is unnecessary.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Seth Foster <fosterbseth@gmail.com>
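The overwrite hazard flagged above can be sketched with plain sqlite3 (a standalone illustration, not AWX code): the unconditional UPDATE resurrects a concurrently-canceled job, while the conditional form is a harmless no-op.

```python
import sqlite3

# Standalone illustration of the status-overwrite hazard.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE unified_job (pk INTEGER PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO unified_job VALUES (1, 'waiting')")

# Another process cancels the job between our read and our write.
conn.execute("UPDATE unified_job SET status = 'canceled' WHERE pk = 1")

# Unconditional update (like filter(pk=pk)): silently resurrects the canceled job.
conn.execute("UPDATE unified_job SET status = 'running' WHERE pk = 1")
after_unconditional = conn.execute("SELECT status FROM unified_job WHERE pk = 1").fetchone()[0]

# Reset, then use the conditional form (like filter(pk=pk, status='waiting')): no-op.
conn.execute("UPDATE unified_job SET status = 'canceled' WHERE pk = 1")
cur = conn.execute("UPDATE unified_job SET status = 'running' WHERE pk = 1 AND status = 'waiting'")
after_conditional = conn.execute("SELECT status FROM unified_job WHERE pk = 1").fetchone()[0]

print(after_unconditional, after_conditional, cur.rowcount)  # -> running canceled 0
```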
Compare: 2cad30c → 8ba1966
```python
try:
    task.run(job.id)
except Exception:
    pass
```
Test assertion silently swallowed by exception handler
Medium Severity
The assertion at line 44 inside `check_status_is_running` will never cause the test to fail. The `side_effect` function is invoked from `self.pre_run_hook()` at line 600 of `run()`, which is inside a `try` block. If the assertion fails, the resulting `AssertionError` (a subclass of `Exception`) is caught by the `except Exception:` handler at line 735 of `run()`. Since `run()` swallows the error internally, and the test has no top-level assertions after the `try`/`except` block, this test always passes regardless of the actual job status.
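A minimal standalone sketch of the swallowed-assertion mechanism; the `run` and `check_status_is_running` functions here are simplified hypothetical stand-ins for the actual task code and mock `side_effect`:

```python
# Standalone illustration: a broad `except Exception` inside the code under
# test also swallows AssertionError raised by a mock side_effect, so the
# failing assertion never reaches the test runner.
observed = []

def check_status_is_running(job_status):
    observed.append(job_status)
    assert job_status == 'running'  # raises AssertionError if still 'waiting'

def run(job_status, pre_run_hook):
    try:
        pre_run_hook(job_status)   # the assertion fires in here...
    except Exception:
        pass                       # ...and is silently discarded, like run()'s handler

run('waiting', check_status_is_running)  # no error escapes; the "test" would pass
print(observed)  # -> ['waiting']
```

Asserting on a value recorded by the hook after `run()` returns (rather than inside the hook) is one way to make such a test actually fail.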
Verify that when a worker picks up a job before `dispatch_waiting_jobs` updates the status, `RunJob.run()` transitions it from waiting to running before `pre_run_hook` is called.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Seth Foster <fosterbseth@gmail.com>
Compare: 8ba1966 → 466bbc4
Summary
- Removes `SELECT FOR UPDATE`/`transaction.atomic()` from `BaseTask.transition_status`, which was causing increased database transaction rollbacks observed in perfscale testing
- Moves the status transition into `dispatch_waiting_jobs`, using a conditional `filter().update()` that is atomic at the DB level without explicit row locks
- Removes the `transition_status` method entirely — it was an artifact of the feature flag era and unnecessary given `dispatch_waiting_jobs` is already a singleton (`on_duplicate='queue_one'`) scoped to the local node
- Jobs are marked `running` at dispatch time — this prevents the reaper from incorrectly reaping jobs during the handoff window
- `BaseTask.run` handles the race where a worker picks up the task before the status update lands by accepting `waiting` and transitioning it to `running` itself

🤖 Generated with Claude Code
Note

Medium Risk

Changes the job start/status transition flow between `dispatch_waiting_jobs` and worker `BaseTask.run`, which can affect job lifecycle and cancellation edge cases under concurrency, but is localized to controller job dispatch/execution.

Overview

Moves the unified job `waiting` → `running` status transition out of `BaseTask.run`'s transactional `SELECT FOR UPDATE` logic and into `dispatch_waiting_jobs` via a conditional `UPDATE`, reducing lock contention/rollbacks when starting jobs.

Updates `BaseTask.run` to tolerate the race where a worker starts before the dispatch status update lands by self-promoting `waiting` to `running`, and adds an early-cancel path that immediately marks jobs `canceled` before execution begins. Adds a functional test ensuring `RunJob.run()` transitions a `waiting` job to `running` before `pre_run_hook` executes.

Written by Cursor Bugbot for commit 466bbc4. This will update automatically on new commits. Configure here.
Summary by CodeRabbit
Refactor
Breaking Changes