Fix job retry#154

Closed
Lucius1274 wants to merge 4 commits into main from fix_job_retry

Conversation

@Lucius1274
Collaborator

@Lucius1274 Lucius1274 commented Feb 4, 2026

The new fix-job-retry logic improves on the previous one by:

  • finding stuck procrastinate jobs
  • (new) finding the corresponding Dicom/AnalysisTask for each procrastinate job and resetting its state
  • (new) resetting the state of the job the task belongs to, if needed
  • retrying the procrastinate job (the task and job states now adapt their status accordingly)

In the event of a server failure (or container restart), all in-progress tasks are now correctly retried.
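The recovery flow described above can be sketched framework-free. The classes and helpers below are stand-ins that mirror the names used in this PR (reset_task_state, update_job_status), not the actual Django/procrastinate implementation:

```python
from dataclasses import dataclass, field

@dataclass
class Task:
    id: int
    queued_job_id: int
    status: str = "IN_PROGRESS"
    attempts: int = 0

@dataclass
class Job:
    id: int
    status: str = "IN_PROGRESS"
    tasks: list = field(default_factory=list)

def reset_task_state(task: Task) -> None:
    # Put the task back to PENDING and count the recovery attempt,
    # mirroring the helper described in the PR summary.
    task.status = "PENDING"
    task.attempts += 1

def update_job_status(job: Job) -> None:
    # If none of the job's tasks is still running, the parent job
    # itself goes back to PENDING.
    if job.status == "IN_PROGRESS" and not any(
        t.status == "IN_PROGRESS" for t in job.tasks
    ):
        job.status = "PENDING"

def recover(stalled_job_ids: set, tasks: list) -> tuple:
    # Reset every task bound to a stalled procrastinate job and report
    # (recovered jobs, orphaned jobs that have no matching task).
    recovered = set()
    for task in tasks:
        if task.queued_job_id in stalled_job_ids:
            reset_task_state(task)
            recovered.add(task.queued_job_id)
    return len(recovered), len(stalled_job_ids - recovered)
```

In the real command, the reset and the job-status update run inside one transaction before the procrastinate job is retried.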

Summary by CodeRabbit

  • New Features

    • Improved stalled job recovery with more robust state management and enhanced error handling.
    • Added identification and reporting of orphaned jobs during recovery operations.
  • Improvements

    • Enhanced logging for successful and failed recovery attempts.
    • Provides consolidated results summary showing number of recovered tasks and orphaned jobs.

@coderabbitai

coderabbitai bot commented Feb 4, 2026

📝 Walkthrough

Walkthrough

The change introduces async handling for retrying stalled procrastinate jobs in a Django management command with helper utilities for task identification and state management.

Changes

Cohort / File(s) Summary
Async Job Retry Orchestration
adit_radis_shared/common/management/commands/retry_stalled_jobs.py
Adds three public helper functions (get_task_models, reset_task_state, update_job_status) to manage task state. Introduces async handle_retry_stalled_jobs() method returning recovered and orphaned task counts. Implements _reset_task_state() with transaction boundary to reset task state and update job status atomically. Refactors handle() to execute async logic via asyncio.run() and display consolidated results with improved logging.

Sequence Diagram(s)

sequenceDiagram
    participant CMD as Management Command
    participant TASK as Task Models (ORM)
    participant JOB as Procrastinate Job Manager
    participant TX as Transaction

    CMD->>CMD: handle() invokes asyncio.run()
    CMD->>CMD: handle_retry_stalled_jobs() starts async flow
    CMD->>JOB: Query stalled procrastinate jobs
    JOB-->>CMD: Return stalled jobs list
    
    loop For each stalled job
        CMD->>TASK: get_task_models() identify matching task
        TASK-->>CMD: Task model instance
        
        CMD->>TX: Begin `@transaction.atomic`
        CMD->>TASK: _reset_task_state() reset task state
        TASK-->>TASK: Update task ORM record
        CMD->>TASK: update_job_status() sync job status
        TASK-->>TASK: Update job ORM record
        TX-->>CMD: Commit transaction
        
        CMD->>JOB: app.job_manager.retry() with priority
        JOB-->>CMD: Job requeued
        
        CMD->>CMD: Log success or failure
    end
    
    CMD-->>CMD: Return (recovered_count, orphaned_count)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐰 A stalled job waits in the queue,
Async magic makes it anew,
Transactions protect each state,
While procrastinate jobs migrate,
Recovered dreams hop right through! ✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
  • Docstring Coverage — ⚠️ Warning: docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.
  • Title Check — ❓ Inconclusive: the title "Fix job retry" is vague and generic and doesn't convey the specific improvements made to the job retry mechanism. Resolution: consider a more descriptive title such as "Improve stalled job retry with task state reset" or "Add asynchronous job recovery for procrastinate tasks".
✅ Passed checks (1 passed)
  • Description Check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.




@gemini-code-assist

Summary of Changes

Hello @Lucius1274, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors the retry_stalled_jobs management command to provide a more comprehensive and robust recovery mechanism for jobs that become stuck. The core improvement lies in its ability to not only retry the underlying procrastinate job but also to identify and correctly reset the state of associated application-specific tasks and their parent jobs. This ensures that after an unexpected interruption, such as a server failure, all in-progress tasks and their corresponding jobs are properly reset to a pending state and retried, preventing data inconsistencies and improving system resilience.

Highlights

  • Enhanced Stalled Job Recovery: The command now intelligently identifies and resets the state of application-level tasks (like Dicom/AnalysisTask) associated with stalled procrastinate jobs, rather than just retrying the procrastinate job itself.
  • Atomic State Management: Task and job state resets are now performed within a database transaction, ensuring data consistency even if an error occurs during the recovery process.
  • Dynamic Task Model Detection: The command can dynamically discover relevant task models across the application by checking for specific attributes, making it more flexible and adaptable to different task types.
  • Improved Job State Synchronization: Parent job statuses are updated to reflect the new state of their constituent tasks, preventing jobs from being stuck in an IN_PROGRESS state if all their tasks are reset.
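The attribute-based model discovery highlighted above can be illustrated with a minimal sketch; `candidates` stands in for Django's `apps.get_models()`, and the qualifying attributes (`queued_job_id`, a `Status` enum with `PENDING`) follow the description in this thread:

```python
def get_task_models(candidates):
    # A model qualifies as a retryable task if it links to a
    # procrastinate job via queued_job_id and carries a Status
    # enum that has a PENDING member.
    selected = []
    for model in candidates:
        status = getattr(model, "Status", None)
        if (
            hasattr(model, "queued_job_id")
            and status is not None
            and hasattr(status, "PENDING")
        ):
            selected.append(model)
    return selected
```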
Changelog
  • adit_radis_shared/common/management/commands/retry_stalled_jobs.py
    • Added helper functions (get_task_models, reset_task_state, update_job_status) to encapsulate logic for identifying task models and resetting their states.
    • Modified handle_retry_stalled_jobs to iterate through identified task models, find tasks linked to stalled procrastinate jobs, and orchestrate their state reset and job retry.
    • Introduced _reset_task_state method, decorated with @transaction.atomic, to ensure atomic updates of task and associated job states.
    • Updated the handle method to provide more detailed output regarding the number of recovered tasks and orphaned jobs.
    • Added logging for recovery attempts and failures.
    • Imported necessary Django modules (apps, transaction, sync_to_async) and Python's logging.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a more sophisticated logic for retrying stalled jobs by linking them back to application-level tasks. The overall direction is good, but there are a few critical issues in the implementation that need to be addressed. The most critical issue is that the current logic will retry a job multiple times if it's associated with multiple tasks, which can lead to unexpected behavior. The calculation of orphaned jobs is also incorrect in this scenario. I've recommended refactoring the main loop to be job-centric to fix this. Additionally, there's a critical bug in how job statuses are determined, which could lead to incorrect state transitions. I've also pointed out some areas where the code can be made safer and more maintainable. Please see the detailed comments for specifics.

Comment on lines +44 to +45
pending_job_status = getattr(job.Status, "PENDING", "PENDING")
in_progress_status = getattr(job.Status, "IN_PROGRESS", "PENDING")

critical

The use of getattr with default string values here is dangerous. Specifically, getattr(job.Status, "IN_PROGRESS", "PENDING") is a likely bug. If job.Status.IN_PROGRESS does not exist, in_progress_status will be "PENDING", which will corrupt the logic of this function. It's safer to access these attributes directly, assuming they exist on the job's Status enum (which is a reasonable assumption given the context). This avoids potential bugs and makes the code's intent clearer.

Suggested change:
  - pending_job_status = getattr(job.Status, "PENDING", "PENDING")
  - in_progress_status = getattr(job.Status, "IN_PROGRESS", "PENDING")
  + pending_job_status = job.Status.PENDING
  + in_progress_status = job.Status.IN_PROGRESS

Comment on lines +70 to +97
for task_model in task_models:
tasks = await sync_to_async(list)(
task_model.objects.filter(queued_job_id__in=job_ids.keys()).select_related("job")
)

for task in tasks:
stalled_job = job_ids.get(getattr(task, "queued_job_id", None))
if not stalled_job:
continue

try:
# Reset task state in transaction
await sync_to_async(self._reset_task_state)(task)

# Retry the job after state is committed (already in async context)
retry_priority = getattr(settings, "STALLED_JOBS_RETRY_PRIORITY", 0)
await app.job_manager.retry_job(stalled_job, priority=retry_priority)
logger.info(f"Retried stalled job {stalled_job.id}")

tasks_recovered += 1
except Exception as e:
logger.error(f"Failed to recover task {task.id} (job {stalled_job.id}): {e}")
failed_tasks += 1

# Check if any procrastinate jobs are still stalled without corresponding tasks
orphaned = len(job_ids) - tasks_recovered - failed_tasks

stalled_jobs_num = 0
for job in stalled_jobs:
await app.job_manager.retry_job(job, priority=priority)
stalled_jobs_num += 1
return tasks_recovered, orphaned

critical

The current implementation has a critical flaw: it retries a procrastinate job for every associated task it finds. If a single stalled job is linked to multiple tasks, it will be retried multiple times, leading to unintended multiple executions. Additionally, the calculation for orphaned jobs is incorrect when a job is associated with more than one task, which can result in a negative number.

The logic should be refactored to be job-centric instead of task-centric. A better approach would be:

  1. Build a map of job_id to a list of its associated tasks.
  2. Iterate through the stalled jobs from this map.
  3. For each job, reset all its associated tasks in a transaction.
  4. After successfully resetting all tasks for a job, retry the procrastinate job once.
  5. Jobs without any associated tasks in the map are the orphaned jobs.
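The job-centric loop suggested above might look like the following minimal sketch; `reset_task` and `retry_job` are injected stand-ins for the command's transactional reset and `app.job_manager.retry_job`, not the repository's actual code:

```python
from collections import defaultdict

def recover_job_centric(stalled_jobs, tasks, reset_task, retry_job):
    # Group tasks by their stalled job so each job is retried exactly
    # once, no matter how many tasks reference it.
    tasks_by_job = defaultdict(list)
    for task in tasks:
        tasks_by_job[task.queued_job_id].append(task)

    recovered = orphaned = 0
    for job in stalled_jobs:
        job_tasks = tasks_by_job.get(job.id)
        if not job_tasks:
            orphaned += 1  # stalled job with no application-level task
            continue
        for task in job_tasks:
            reset_task(task)  # would run inside one transaction in Django
        retry_job(job)  # single retry per job
        recovered += 1
    return recovered, orphaned
```

With this shape, the orphaned count can never go negative, and a job with many tasks is requeued only once.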

Comment on lines +25 to +26
pending_status = getattr(task.Status, "PENDING", "PENDING")
task.status = pending_status

medium

Using getattr with a string default value "PENDING" can be misleading. Since get_task_models already ensures that task.Status.PENDING exists, you can directly access it. This makes the code clearer and safer, and also more concise.

Suggested change:
  - pending_status = getattr(task.Status, "PENDING", "PENDING")
  - task.status = pending_status
  + task.status = task.Status.PENDING

Comment on lines +41 to +51
if hasattr(job, "update_job_state"): # RADIS
job.update_job_state()
else: # ADIT
pending_job_status = getattr(job.Status, "PENDING", "PENDING")
in_progress_status = getattr(job.Status, "IN_PROGRESS", "PENDING")
if (
job.status == in_progress_status
and not job.tasks.filter(status=in_progress_status).exists()
):
job.status = pending_job_status
job.save()

medium

This logic branches based on the existence of update_job_state, with comments indicating it's for different systems ('RADIS'/'ADIT'). While this works, it makes this shared command less generic and harder to maintain as it's tightly coupled to the implementation details of different systems. Consider defining a common interface (e.g., an abstract method on a base Job model) that all job types must implement. This would allow for a more robust and extensible design.
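One way to realize the common-interface suggestion, sketched with a plain ABC; `RecoverableJob` and `sync_job_after_recovery` are hypothetical names, not part of the current codebase:

```python
from abc import ABC, abstractmethod

class RecoverableJob(ABC):
    # Hypothetical base that both ADIT and RADIS job models could
    # implement, removing the hasattr() branching from the shared command.
    @abstractmethod
    def update_job_state(self) -> None:
        """Recompute this job's status from its tasks' statuses."""

def sync_job_after_recovery(job: RecoverableJob) -> None:
    # The shared command then calls one method unconditionally,
    # and each system supplies its own state-transition rules.
    job.update_job_state()
```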

@claude

claude bot commented Feb 4, 2026

Pull Request Review: Fix job retry

Summary

This PR significantly improves the retry_stalled_jobs management command by adding comprehensive task state recovery. The changes properly reset Django task states and their parent job states when retrying stalled Procrastinate jobs.

Strengths

  1. Comprehensive State Recovery: The new logic properly handles the full lifecycle - finding stalled jobs, resetting task states, updating parent job states, and then retrying the Procrastinate job.

  2. Good Error Handling: The try-except block around task recovery (lines 80-92) properly logs failures while continuing to process other tasks.

  3. Better Reporting: The command now provides more detailed feedback about recovered tasks and orphaned jobs.

  4. Atomic Transactions: Using transaction.atomic on _reset_task_state (line 99) ensures database consistency.

  5. Flexible Model Discovery: The get_task_models() function dynamically finds task models across ADIT and RADIS projects.

Critical Issues

1. Transaction Boundary Problem (Lines 82, 99-102)

The current code has a critical race condition. The transaction decorator on _reset_task_state doesn't actually protect the workflow. If retry_job() fails or the process crashes between lines 82 and 86, you'll have task reset to PENDING, job potentially reset to PENDING, but the Procrastinate job is still stalled (not retried). This leaves the system in an inconsistent state.

Recommendation: Either move retry_job() inside the transaction, reverse the order (retry first, then update state), or add idempotency checks.
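The idempotency-check option could be sketched as follows; `is_pending`, `reset_in_transaction`, and `retry_job` are injected stand-ins, and the point is that a crash between "reset" and "retry" leaves a state that is safe to re-run:

```python
def recover_one(job, tasks, reset_in_transaction, retry_job, is_pending):
    # Idempotent recovery: tasks already reset by a previous run are
    # skipped, so re-running after a crash between "reset" and "retry"
    # cannot double-reset state, and the retry still happens.
    to_reset = [t for t in tasks if not is_pending(t)]
    if to_reset:
        reset_in_transaction(to_reset)  # one atomic block per job
    retry_job(job)  # retried even if a prior run already reset the tasks
```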

2. Missing Type Annotations (Line 15)

get_task_models() should have return type annotation: list[type]. According to CLAUDE.md, this project uses pyright type checking.

3. Inefficient Query Pattern (Lines 70-73)

select_related("job") assumes all task models have a job relation, but get_task_models() only checks for queued_job_id. If a task model lacks a job relation, this will raise FieldError.

Recommendation: Check for job relation before select_related.

Medium Priority Issues

  1. Silent Failures in Job Status Update (Lines 44-51): Using getattr() with fallback values might hide configuration errors.

  2. Inconsistent Attribute Access (Lines 29-34): Hardcoded ADIT vs RADIS differences should be documented or abstracted.

  3. Comment Style (Line 14): Should be single # per Google Python Style Guide.

  4. Logging Could Be More Detailed: Add task ID and model name to log messages.

Test Coverage

Major Gap: This PR has no tests. Given the complexity of the changes, I strongly recommend adding tests for happy path, multiple task models, transaction rollback scenarios, orphaned jobs, job status updates, and edge cases.

Security and Performance

No critical security issues identified. Performance is acceptable for a disaster recovery command that runs periodically.

Recommendations Summary

Must Fix Before Merge:

  1. Fix the transaction boundary/race condition (lines 82-86)
  2. Add comprehensive test coverage
  3. Fix the select_related assumption (line 72)

Should Fix Before Merge:
  4. Add type annotations
  5. Add safer error handling in update_job_status()
  6. Improve logging detail

Conclusion

This is a significant improvement over the previous implementation and addresses real production issues with server failures. The core logic is sound, but the transaction boundary issue is critical and must be fixed before merging. With proper testing and the transaction fix, this will be production-ready.

Great work on improving the resiliency of the job retry system!

@Lucius1274 Lucius1274 marked this pull request as draft February 4, 2026 16:13

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@adit_radis_shared/common/management/commands/retry_stalled_jobs.py`:
- Around line 24-35: The code in reset_task_state uses getattr(task.Status,
"PENDING", "PENDING") which can silently assign an invalid raw string; instead,
ensure you fetch the real enum member and fail loudly: check that task.Status
has attribute "PENDING" (or use getattr without a default and catch
AttributeError) and raise a clear exception if it's missing, then assign that
enum value to task.status, increment attempts and clear timestamps as currently
done, and call task.save(); reference the reset_task_state function and
task.Status to locate where to change the fallback behavior.
- Around line 44-45: The code sets in_progress_status with an incorrect fallback
("PENDING") which can mask missing enum values; update the retrieval of
job.Status values by removing misleading string fallbacks — use
getattr(job.Status, "PENDING", None) and getattr(job.Status, "IN_PROGRESS",
None) (or raise/early-return if either is None) and adjust the check that
compares against in_progress_status so it only runs when both pending_job_status
and in_progress_status are present; refer to the variables pending_job_status,
in_progress_status and the job.Status enum in the retry_stalled_jobs logic.
🧹 Nitpick comments (2)
adit_radis_shared/common/management/commands/retry_stalled_jobs.py (2)

14-21: Consider adding return type hint and refining comment style.

The function works correctly for dynamic model discovery, but could benefit from a type hint and a more conventional docstring instead of the informal ## some helper functions comment.

💡 Suggested improvement
-## some helper functions
-def get_task_models():
+def get_task_models() -> list:
+    """Return all Django models that represent retryable tasks."""
     task_models = []
     for model in apps.get_models():

90-92: Use logger.exception to preserve traceback information.

Catching bare Exception and using logger.error loses valuable debugging information. Using logger.exception will automatically include the traceback in the log output, making it easier to diagnose failures during recovery.

♻️ Suggested fix
-                except Exception as e:
-                    logger.error(f"Failed to recover task {task.id} (job {stalled_job.id}): {e}")
+                except Exception:
+                    logger.exception(f"Failed to recover task {task.id} (job {stalled_job.id})")
                     failed_tasks += 1

Comment on lines +24 to +35
def reset_task_state(task) -> None:
pending_status = getattr(task.Status, "PENDING", "PENDING")
task.status = pending_status
task.attempts = (task.attempts or 0) + 1
task.message = "Recovered from stalled state"
if hasattr(task, "start"):
task.start = None
task.end = None
elif hasattr(task, "started_at"):
task.started_at = None
task.ended_at = None
task.save()

⚠️ Potential issue | 🟡 Minor

Fallback value may cause incorrect status assignment.

The fallback "PENDING" string in getattr(task.Status, "PENDING", "PENDING") would assign an incorrect raw string instead of the actual enum value (e.g., "PE" based on the codebase's Status enum). While get_task_models() validates that PENDING exists, a defensive approach would be to raise an error rather than silently use an invalid fallback.

🛡️ Suggested fix
 def reset_task_state(task) -> None:
-    pending_status = getattr(task.Status, "PENDING", "PENDING")
+    pending_status = task.Status.PENDING
     task.status = pending_status

Comment on lines +44 to +45
pending_job_status = getattr(job.Status, "PENDING", "PENDING")
in_progress_status = getattr(job.Status, "IN_PROGRESS", "PENDING")

⚠️ Potential issue | 🟡 Minor

Incorrect fallback for in_progress_status.

The fallback value "PENDING" for in_progress_status is incorrect. If IN_PROGRESS doesn't exist on job.Status, the comparison on line 47 would use the wrong value, causing the condition to always fail when the job is actually in progress. This could silently prevent job status updates.

🛡️ Suggested fix - remove unnecessary fallbacks
-            pending_job_status = getattr(job.Status, "PENDING", "PENDING")
-            in_progress_status = getattr(job.Status, "IN_PROGRESS", "PENDING")
+            pending_job_status = job.Status.PENDING
+            in_progress_status = job.Status.IN_PROGRESS

If the job model might not have these status choices, consider adding validation or early return instead of using misleading fallbacks.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change:
  - pending_job_status = getattr(job.Status, "PENDING", "PENDING")
  - in_progress_status = getattr(job.Status, "IN_PROGRESS", "PENDING")
  + pending_job_status = job.Status.PENDING
  + in_progress_status = job.Status.IN_PROGRESS

@Lucius1274 Lucius1274 closed this Mar 12, 2026