Support cancelling ML async jobs#1144
Conversation
👷 Deploy request for antenna-ssec pending review. Visit the deploys page to approve it.
✅ Deploy Preview for antenna-preview canceled.
📝 Walkthrough

This pull request refactors async job cleanup handling by making the cleanup function public with proper type annotations, enhancing progress tracking with an unknown-state indicator, and improving stream existence checks in the NATS queue manager to prevent unnecessary stream creation during task reservation.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant JobModel as Job Model
    participant TaskQueue as TaskQueue Manager
    participant AsyncState as AsyncJobState
    participant Redis
    Client->>JobModel: cancel()
    JobModel->>JobModel: set status to CANCELING
    JobModel->>TaskQueue: cleanup_async_job_if_needed(self)
    TaskQueue->>Redis: check task exists
    alt Task Found
        TaskQueue->>Redis: revoke task
    else No Task
        TaskQueue->>Redis: skip revocation
    end
    TaskQueue->>AsyncState: cleanup progress state
    AsyncState->>Redis: delete stage keys
    AsyncState->>Redis: delete lock key
    JobModel->>JobModel: set status to REVOKED
    JobModel->>JobModel: save
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
…o Revoked when cancelling
* fix: prevent NATS connection flooding and stale job task fetching
  - Add connect_timeout=5, allow_reconnect=False to NATS connections to prevent leaked reconnection loops from blocking Django's event loop
  - Guard /tasks endpoint against terminal-status jobs (return empty tasks instead of attempting NATS reserve)
  - IncompleteJobFilter now excludes jobs by top-level status in addition to progress JSON stages
  - Add stale worker cleanup to integration test script

  Found during PSv2 integration testing where stale ADC workers with default DataLoader parallelism overwhelmed the single uvicorn worker thread by flooding /tasks with concurrent NATS reserve requests. Co-Authored-By: Claude <noreply@anthropic.com>
* docs: PSv2 integration test session notes and NATS flooding findings. Session notes from 2026-02-16 integration test including root cause analysis of stale worker task competition and NATS connection issues. Findings doc tracks applied fixes and remaining TODOs with priorities. Co-Authored-By: Claude <noreply@anthropic.com>
* docs: update session notes with successful test run #3. PSv2 integration test passed end-to-end (job 1380, 20/20 images). Identified ack_wait=300s as cause of ~5min idle time when GPU processes race for NATS tasks. Co-Authored-By: Claude <noreply@anthropic.com>
* fix: batch NATS task fetch to prevent HTTP timeouts. Replace N×1 reserve_task() calls with a single reserve_tasks() batch fetch. The previous implementation created a new pull subscription per message (320 NATS round trips for batch=64), causing the /tasks endpoint to exceed HTTP client timeouts. The new approach uses one psub.fetch() call for the entire batch. Co-Authored-By: Claude <noreply@anthropic.com>
* docs: add next session prompt
* feat: add pipeline__slug__in filter for multi-pipeline job queries. Workers that handle multiple pipelines can now fetch jobs for all of them in a single request: ?pipeline__slug__in=slug1,slug2 Co-Authored-By: Claude <noreply@anthropic.com>
* chore: remove local-only docs and scripts from branch. These files are session notes, planning docs, and test scripts that should stay local rather than be part of the PR. Co-Authored-By: Claude <noreply@anthropic.com>
* feat: set job dispatch_mode at creation time based on project feature flags. ML jobs with a pipeline now get dispatch_mode set during setup() instead of waiting until run() is called by the Celery worker. This lets the UI show the correct mode immediately after job creation. Co-Authored-By: Claude <noreply@anthropic.com>
* fix: add timeouts to all JetStream operations and restore reconnect policy. Add NATS_JETSTREAM_TIMEOUT (10s) to all JetStream metadata operations via asyncio.wait_for() so a hung NATS connection fails fast instead of blocking the caller's thread indefinitely. Also restore the intended reconnect policy (2 attempts, 1s wait) that was lost in a prior force push. Co-Authored-By: Claude <noreply@anthropic.com>
* fix: propagate NATS timeouts as 503 instead of swallowing them. asyncio.TimeoutError from _ensure_stream() and _ensure_consumer() was caught by the broad `except Exception` in reserve_tasks(), silently returning [] and making NATS outages indistinguishable from empty queues. Workers would then poll immediately, recreating the flooding problem.
  - Add explicit `except asyncio.TimeoutError: raise` in reserve_tasks()
  - Catch TimeoutError and OSError in the /tasks view, return 503
  - Restore allow_reconnect=False (fail-fast on connection issues)
  - Add return type annotation to get_connection()

  Co-Authored-By: Claude <noreply@anthropic.com>
* fix: address review comments (log level, fetch timeout, docstring)
  - Downgrade reserve_tasks log to DEBUG when zero tasks reserved (avoid log spam from frequent polling)
  - Pass timeout=0.5 from /tasks endpoint to avoid blocking the worker for 5s on empty queues
  - Fix docstring examples using string 'job123' for int-typed job_id

  Co-Authored-By: Claude <noreply@anthropic.com>
* fix: catch nats.errors.Error in /tasks endpoint for proper 503 responses. NoServersError, ConnectionClosedError, and other NATS exceptions inherit from nats.errors.Error (not OSError), so they escaped the handler and returned 500 instead of 503. Co-Authored-By: Claude <noreply@anthropic.com>

---------
Co-authored-by: Claude <noreply@anthropic.com>

…olnickLab#1142)

* feat: configurable NATS tuning and gunicorn worker management. Rebase onto main after RolnickLab#1135 merge. Keep only the additions unique to this branch:
  - Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
  - Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
  - Local dev: switch to gunicorn+UvicornWorker by default for production parity, with USE_UVICORN=1 escape hatch for raw uvicorn
  - Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8) when not explicitly set in the environment

  Co-Authored-By: Claude <noreply@anthropic.com>
* fix: address PR review comments
  - Fix max_ack_pending falsy-zero guard (use `is not None` instead of `or`)
  - Update TaskQueueManager docstring with Args section
  - Simplify production WEB_CONCURRENCY fallback (just use nproc)

  Co-Authored-By: Claude <noreply@anthropic.com>

---------
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>

* fix: include pipeline_slug in MinimalJobSerializer (ids_only response). The ADC worker fetches jobs with ids_only=1 and expects pipeline_slug in the response to know which pipeline to run. Without it, Pydantic validation fails and the worker skips the job. Co-Authored-By: Claude <noreply@anthropic.com>
* Update ami/jobs/serializers.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
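The batch-fetch change described above (one `psub.fetch()` for the whole batch instead of one pull subscription per message) can be illustrated with a stub subscription. `FakePullSub` and the two reserve functions below are illustrative stand-ins, not the PR's actual code; a real nats-py pull subscription's `fetch` takes similar `batch` and `timeout` arguments.

```python
import asyncio

class FakePullSub:
    """Stand-in for a nats-py JetStream pull subscription."""
    def __init__(self, queued):
        self.queued = list(queued)
        self.round_trips = 0

    async def fetch(self, batch: int, timeout: float = 0.5):
        # One round trip returns up to `batch` messages.
        self.round_trips += 1
        out, self.queued = self.queued[:batch], self.queued[batch:]
        return out

async def reserve_one_at_a_time(sub, n: int):
    # Old approach: one fetch per message, so n network round trips.
    msgs = []
    for _ in range(n):
        msgs += await sub.fetch(batch=1)
    return msgs

async def reserve_batch(sub, n: int):
    # New approach: a single fetch for the whole batch, one round trip.
    return await sub.fetch(batch=n)
```

With a batch of 64, the old path costs 64 round trips versus 1 for the batched path, which is the difference behind the HTTP timeout fix.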
Pull request overview
This PR enhances the cancellation mechanism for asynchronous ML jobs that use NATS/Redis for distributed processing. It improves resource cleanup, adds better handling for edge cases when progress tracking state is unavailable, and refactors the cleanup function for better clarity.
Changes:
- Introduced an `unknown` flag in `JobStateProgress` to distinguish missing Redis keys from lock contention
- Enhanced job cancellation to call cleanup for async resources and properly handle status transitions
- Refactored `_cleanup_job_if_needed` to `cleanup_async_job_if_needed` with an explicit type signature
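The `unknown` flag pattern can be sketched as follows. This is a hypothetical minimal version: the field names besides `unknown`, `total`, `processed`, and `failed` (which appear in this review) and the `handle_progress` helper are assumptions for illustration.

```python
from dataclasses import dataclass

@dataclass
class JobStateProgress:
    # Counts default to zero, so callers must check `unknown` before
    # interpreting them as real progress.
    total: int = 0
    processed: int = 0
    failed: int = 0
    # True when the Redis keys backing the progress state are missing,
    # e.g. after cancellation cleanup or a transient Redis error.
    unknown: bool = False

def handle_progress(progress: JobStateProgress) -> str:
    if progress.unknown:
        # Job may have been cancelled, or Redis hiccuped; skip processing
        # and let the NATS task be retried.
        return "skip"
    return "process"
```

This is why the early-return check in `ami/jobs/tasks.py` matters: without it, a cancelled job's default-valued progress (total=0) would be treated as real data.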
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| ami/ml/tests.py | Renamed test class to TestAsyncJobStateManager and updated cleanup test assertion to handle unknown progress state |
| ami/ml/orchestration/nats_queue.py | Refactored stream existence checking into separate _stream_exists method and added early return in reserve_tasks when stream doesn't exist |
| ami/ml/orchestration/async_job_state.py | Added unknown field to JobStateProgress, updated _commit_update to return unknown state for missing keys, added lock key cleanup |
| ami/jobs/tasks.py | Added handling for unknown progress state with early return, renamed cleanup function and added type hint |
| ami/jobs/models.py | Updated cancel method to call cleanup before revoking task and unconditionally set status to REVOKED |
```python
# For sync jobs the task revoke will update the job status. However, for async jobs we need to set the status
# to revoked here since the task already finished (it only queues the images)
self.status = JobState.REVOKED
self.save()
```
The unconditional status change to REVOKED on lines 981-982 could create a race condition with the task_postrun signal handler. For sync jobs with task_id, task.revoke(terminate=True) triggers the task_postrun signal which calls update_job_status and sets status to REVOKED. However, if the signal handler runs between line 978 and line 981, this code would still overwrite any status the signal handler set. Consider checking the dispatch mode or current status before unconditionally setting to REVOKED, or removing the duplicate status update for jobs with task_id.
Suggested change:

```diff
-# For sync jobs the task revoke will update the job status. However, for async jobs we need to set the status
-# to revoked here since the task already finished (it only queues the images)
-self.status = JobState.REVOKED
-self.save()
+# For sync jobs the task revoke will update the job status via the task_postrun signal.
+# However, for async jobs we need to set the status to revoked here since the task already
+# finished (it only queues the images).
+if not self.task_id or self.dispatch_mode == JobDispatchMode.ASYNC_API:
+    self.status = JobState.REVOKED
+    self.save()
```
```diff
-class TestTaskStateManager(TestCase):
+class TestAsyncJobStateManager(TestCase):
     """Test TaskStateManager for job progress tracking."""
```
The docstring still refers to "TaskStateManager" but the class has been renamed to "TestAsyncJobStateManager". Update the docstring to match the new class name for consistency.
| """Test TaskStateManager for job progress tracking.""" | |
| """Test AsyncJobStateManager for job progress tracking.""" |
```python
if progress_info.unknown:
    logger.warning(
        f"Progress info is unknown for job {job_id} when processing results. Job may be cancelled."
        f"Or this could be a transient Redis error and the NATS task will be retried."
```
Missing space between sentences in the log message. The second f-string should start with a space: " Or this could be a transient Redis error..." to properly separate it from the previous sentence.
| f"Or this could be a transient Redis error and the NATS task will be retried." | |
| f" Or this could be a transient Redis error and the NATS task will be retried." |
```python
# update complete state based on latest progress info after saving results
complete_state = JobState.SUCCESS
if progress_info.total > 0 and (progress_info.failed / progress_info.total) > FAILURE_THRESHOLD:
```
After the second update_state call for the "results" stage, there is no check for progress_info.unknown like there is for the "process" stage (lines 97-102). If the job is cancelled between the two update_state calls, progress_info could have unknown=True, and line 172 would attempt to access progress_info.total on a JobStateProgress with default values (total=0). While this might not crash, it could lead to incorrect behavior. Consider adding a similar check for progress_info.unknown after line 168.
Actionable comments posted: 2
🧹 Nitpick comments (3)

ami/ml/orchestration/nats_queue.py (1)

118-122: Consider downgrading the log level to `info`.

A stream not existing before creation is expected on the publish path (the first task for every new job). `logger.warning` suggests something abnormal, but this is normal first-publish behavior. `logger.info` would be more appropriate.

Proposed fix:

```diff
-        logger.warning(f"Stream {stream_name} does not exist")
+        logger.info(f"Stream {stream_name} does not exist, creating it")
```

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `ami/ml/orchestration/nats_queue.py` around lines 118-122, the log in the publish path uses logger.warning when a stream is absent, but that is expected behavior; change the call in the branch where _stream_exists(job_id) returns False to use logger.info instead of logger.warning (the block that computes stream_name via _get_stream_name(job_id) and subject via _get_subject(job_id)). Keep the same message text and context, only lowering the level to info so first-publish behavior is not signaled as a warning.

ami/ml/orchestration/async_job_state.py (1)

159-174: Return type annotation is now slightly inaccurate.

With the change at lines 173-174, `_commit_update` can no longer return `None`; it always returns a `JobStateProgress`. The signature `-> JobStateProgress | None` is stale. This is cosmetic, but keeping annotations accurate helps tooling and future readers.

Proposed fix:

```diff
 def _commit_update(
     self,
     processed_image_ids: set[str],
     stage: str,
     failed_image_ids: set[str] | None = None,
-) -> JobStateProgress | None:
+) -> JobStateProgress:
```

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `ami/ml/orchestration/async_job_state.py` around lines 159-174, the return type annotation of _commit_update is stale: it currently declares "-> JobStateProgress | None" but the implementation always returns a JobStateProgress (e.g., returns JobStateProgress(unknown=True) when pending_images or total_images is None). Update the function signature of _commit_update to "-> JobStateProgress" and ensure any associated docstring or type hints that reference a possible None are updated accordingly; locate the method named _commit_update and the JobStateProgress type to make this change.

ami/jobs/models.py (1)

967-982: Cancel flow looks correct for both sync and async jobs.

The ordering (CANCELING → cleanup → revoke → REVOKED) is sound. For async jobs, the Celery task is already done so cleanup runs first; for non-async jobs, `cleanup_async_job_if_needed` is a no-op.

One concern: if `cleanup_async_job_if_needed` raises an unexpected exception, the job stays stuck in `CANCELING` and never reaches `REVOKED`. Consider wrapping the cleanup call in a try/except to ensure the status always transitions to REVOKED.

Proposed defensive wrapper:

```diff
 self.status = JobState.CANCELING
 self.save()
-cleanup_async_job_if_needed(self)
+try:
+    cleanup_async_job_if_needed(self)
+except Exception:
+    logger.exception(f"Failed to clean up async resources for job {self.pk}, proceeding with cancellation")
 if self.task_id:
     task = run_job.AsyncResult(self.task_id)
     if task:
```

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `ami/jobs/models.py` around lines 967-982, the call to cleanup_async_job_if_needed(self) in cancel() can raise and leave the job stuck in CANCELING; wrap that call in a try/except Exception block so any exception is caught and logged (use the module logger or a suitable logging mechanism, e.g., logger.exception) and then continue with the revoke and final status update; ensure the code always sets self.status = JobState.REVOKED and calls self.save() in a finally-like path so REVOKED is persisted even if cleanup fails, while keeping the existing revoke logic using run_job.AsyncResult(self.task_id) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 97-102: The logger.warning message inside the
progress_info.unknown branch concatenates two adjacent f-strings and drops the
space between sentences; update the warning in the block that checks
progress_info.unknown (where job_id is referenced) so the two sentences are
separated (e.g., add a trailing space to the first f-string, a leading space on
the second, or merge into one f-string) ensuring the logged text reads
"...cancelled. Or this could..." when calling logger.warning.
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 98-111: The _stream_exists method currently catches all exceptions
and treats them as "stream missing", which masks real NATS errors; update the
error handling in _stream_exists (which calls self.js.stream_info and uses
_get_stream_name and NATS_JETSTREAM_TIMEOUT) to specifically catch the
NotFoundError from the nats client (nats.js.errors.NotFoundError) and return
False only in that case, keep the existing asyncio.TimeoutError re-raise, and
let any other exceptions propagate (i.e., remove the broad except Exception and
replace it with except NotFoundError: return False).
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- ami/jobs/models.py
- ami/jobs/tasks.py
- ami/ml/orchestration/async_job_state.py
- ami/ml/orchestration/nats_queue.py
- ami/ml/tests.py
```python
if progress_info.unknown:
    logger.warning(
        f"Progress info is unknown for job {job_id} when processing results. Job may be cancelled."
        f"Or this could be a transient Redis error and the NATS task will be retried."
    )
    return
```
Missing space between implicitly concatenated f-strings.
Lines 99–100 use adjacent f-strings that Python concatenates without any separator. The result is "...cancelled.Or this could..." — missing a space after the period.
Proposed fix:

```diff
 if progress_info.unknown:
     logger.warning(
         f"Progress info is unknown for job {job_id} when processing results. Job may be cancelled."
-        f"Or this could be a transient Redis error and the NATS task will be retried."
+        f" Or this could be a transient Redis error and the NATS task will be retried."
     )
     return
```
```diff
 async def _stream_exists(self, job_id: int) -> bool:
     """Check if stream exists for the given job."""
     if self.js is None:
         raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.")

     stream_name = self._get_stream_name(job_id)
     subject = self._get_subject(job_id)

     try:
         await asyncio.wait_for(self.js.stream_info(stream_name), timeout=NATS_JETSTREAM_TIMEOUT)
         logger.debug(f"Stream {stream_name} already exists")
         return True
     except asyncio.TimeoutError:
         raise  # NATS unreachable — let caller handle it rather than creating a stream blindly
-    except Exception as e:
-        logger.warning(f"Stream {stream_name} does not exist: {e}")
+    except Exception:
         return False
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check what exception nats.py raises for missing streams
cd /tmp && pip install nats-py >/dev/null 2>&1
# Try to find the exception types in nats-py
python3 << 'EOF'
try:
# Try different import paths
try:
from nats.js.errors import NotFoundError
print("Found: nats.js.errors.NotFoundError")
except ImportError:
pass
try:
from nats.errors import NotFoundError
print("Found: nats.errors.NotFoundError")
except ImportError:
pass
# List all exceptions in nats module
import nats
import inspect
# Check nats.js module
try:
import nats.js
print("\n=== nats.js exceptions ===")
for name in dir(nats.js):
obj = getattr(nats.js, name)
if inspect.isclass(obj) and issubclass(obj, BaseException):
print(f" {name}")
except Exception as e:
print(f"Error inspecting nats.js: {e}")
# Check for errors submodule
try:
import nats.js.errors as errors
print("\n=== nats.js.errors exceptions ===")
for name in dir(errors):
if not name.startswith('_'):
obj = getattr(errors, name)
if inspect.isclass(obj) and issubclass(obj, BaseException):
print(f" {name}")
except Exception as e:
print(f"Error inspecting nats.js.errors: {e}")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
EOF
Repository: RolnickLab/antenna
Length of output: 712
🏁 Script executed:
# Also check the actual file to see the context
cat -n ami/ml/orchestration/nats_queue.py | sed -n '95,135p'
Repository: RolnickLab/antenna
Length of output: 2088
🏁 Script executed:
# Check for other usages of stream_info or exception handling in the file
rg "stream_info|except" ami/ml/orchestration/nats_queue.py -n
Repository: RolnickLab/antenna
Length of output: 665
Narrow exception handling to avoid masking transient NATS errors.
The nats.js library provides nats.js.errors.NotFoundError for missing streams. The current broad except Exception catch treats all non-timeout errors as "stream doesn't exist", which could mask transient NATS failures like ServiceUnavailableError or ServerError. This could cause reserve_tasks to silently return an empty list or _ensure_stream to unnecessarily retry stream creation when the real issue is a temporary NATS connectivity problem.
Catch nats.js.errors.NotFoundError specifically and let other exceptions propagate to surface real NATS issues.
🧰 Tools
🪛 Ruff (0.15.2)
[warning] 101-101: Avoid specifying long messages outside the exception class
(TRY003)
[warning] 107-107: Consider moving this statement to an else block
(TRY300)
[warning] 110-110: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ami/ml/orchestration/nats_queue.py` around lines 98 - 111, The _stream_exists
method currently catches all exceptions and treats them as "stream missing",
which masks real NATS errors; update the error handling in _stream_exists (which
calls self.js.stream_info and uses _get_stream_name and NATS_JETSTREAM_TIMEOUT)
to specifically catch the NotFoundError from the nats client
(nats.js.errors.NotFoundError) and return False only in that case, keep the
existing asyncio.TimeoutError re-raise, and let any other exceptions propagate
(i.e., remove the broad except Exception and replace it with except
NotFoundError: return False).
Summary
This pull request introduces improvements to job cancellation and cleanup logic, particularly for asynchronous jobs using NATS/Redis. It adds better handling for unknown progress states, ensures cleanup routines are consistently invoked, and improves code clarity by renaming and refactoring functions.
Job cancellation and cleanup improvements:

- The `cancel` method in the `Job` model now calls `cleanup_async_job_if_needed` to ensure async resources are cleaned up when a job is cancelled.
- The cleanup function `_cleanup_job_if_needed` has been renamed to `cleanup_async_job_if_needed` and its type signature has been clarified; all references throughout the codebase have been updated to use the new name.

Handling unknown progress states:

- The `JobStateProgress` class now includes an `unknown` flag to indicate when progress cannot be determined, such as when Redis keys are missing.
- When progress information is unknown during NATS pipeline result processing, a warning is logged and the task returns early, so the NATS task can be retried instead of processing results against missing state.
- The `_commit_update` method returns a `JobStateProgress` with `unknown=True` when Redis keys are missing, making this state distinguishable from other errors.

Resource cleanup enhancements:

- The `cleanup` method for async job state now deletes the lock key in addition to the other cache keys, ensuring all Redis resources are properly released.

Testing
Checklist

- `test_cancel_job()` is currently just a stub

Summary by CodeRabbit
Release Notes
Bug Fixes
Refactor