Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from ami.base.models import BaseModel
from ami.base.schemas import ConfigurableStage, ConfigurableStageParam
from ami.jobs.tasks import run_job
from ami.jobs.tasks import cleanup_async_job_if_needed, run_job
from ami.main.models import Deployment, Project, SourceImage, SourceImageCollection
from ami.ml.models import Pipeline
from ami.ml.post_processing.registry import get_postprocessing_task
Expand Down Expand Up @@ -970,14 +970,16 @@ def cancel(self):
"""
self.status = JobState.CANCELING
self.save()

cleanup_async_job_if_needed(self)
if self.task_id:
task = run_job.AsyncResult(self.task_id)
if task:
task.revoke(terminate=True)
self.save()
else:
self.status = JobState.REVOKED
self.save()
# For sync jobs the task revoke will update the job status. However, for async jobs we need to set the status
# to revoked here since the task already finished (it only queues the images)
self.status = JobState.REVOKED
self.save()
Comment on lines +979 to +982
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unconditional status change to REVOKED on lines 981-982 could create a race condition with the task_postrun signal handler. For sync jobs with task_id, task.revoke(terminate=True) triggers the task_postrun signal which calls update_job_status and sets status to REVOKED. However, if the signal handler runs between line 978 and line 981, this code would still overwrite any status the signal handler set. Consider checking the dispatch mode or current status before unconditionally setting to REVOKED, or removing the duplicate status update for jobs with task_id.

Suggested change
# For sync jobs the task revoke will update the job status. However, for async jobs we need to set the status
# to revoked here since the task already finished (it only queues the images)
self.status = JobState.REVOKED
self.save()
# For sync jobs the task revoke will update the job status via the task_postrun signal.
# However, for async jobs we need to set the status to revoked here since the task already
# finished (it only queues the images).
if not self.task_id or self.dispatch_mode == JobDispatchMode.ASYNC_API:
self.status = JobState.REVOKED
self.save()

Copilot uses AI. Check for mistakes.

def update_status(self, status=None, save=True):
"""
Expand Down
17 changes: 12 additions & 5 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from config import celery_app

if TYPE_CHECKING:
from ami.jobs.models import JobState
from ami.jobs.models import Job, JobState

logger = logging.getLogger(__name__)
# Minimum success rate. Jobs with fewer than this fraction of images
Expand Down Expand Up @@ -94,6 +94,13 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
)
raise self.retry(countdown=5, max_retries=10)

if progress_info.unknown:
logger.warning(
f"Progress info is unknown for job {job_id} when processing results. Job may be cancelled."
f"Or this could be a transient Redis error and the NATS task will be retried."
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space between sentences in the log message. The second f-string should start with a space: " Or this could be a transient Redis error..." to properly separate it from the previous sentence.

Suggested change
f"Or this could be a transient Redis error and the NATS task will be retried."
f" Or this could be a transient Redis error and the NATS task will be retried."

Copilot uses AI. Check for mistakes.
)
return
Comment on lines +97 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing space between implicitly concatenated f-strings.

Lines 99–100 use adjacent f-strings that Python concatenates without any separator. The result is "...cancelled.Or this could..." — missing a space after the period.

Proposed fix
     if progress_info.unknown:
         logger.warning(
             f"Progress info is unknown for job {job_id} when processing results. Job may be cancelled."
-            f"Or this could be a transient Redis error and the NATS task will be retried."
+            f" Or this could be a transient Redis error and the NATS task will be retried."
         )
         return
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/tasks.py` around lines 97 - 102, The logger.warning message inside
the progress_info.unknown branch concatenates two adjacent f-strings and drops
the space between sentences; update the warning in the block that checks
progress_info.unknown (where job_id is referenced) so the two sentences are
separated (e.g., add a trailing space to the first f-string, a leading space on
the second, or merge into one f-string) ensuring the logged text reads
"...cancelled. Or this could..." when calling logger.warning.


try:
complete_state = JobState.SUCCESS
if progress_info.total > 0 and (progress_info.failed / progress_info.total) > FAILURE_THRESHOLD:
Expand Down Expand Up @@ -272,10 +279,10 @@ def _update_job_progress(
# Clean up async resources for completed jobs that use NATS/Redis
if job.progress.is_complete():
job = Job.objects.get(pk=job_id) # Re-fetch outside transaction
_cleanup_job_if_needed(job)
cleanup_async_job_if_needed(job)


def _cleanup_job_if_needed(job) -> None:
def cleanup_async_job_if_needed(job: "Job") -> None:
"""
Clean up async resources (NATS/Redis) if this job uses them.

Expand Down Expand Up @@ -330,7 +337,7 @@ def update_job_status(sender, task_id, task, state: str, retval=None, **kwargs):

# Clean up async resources for revoked jobs
if state == JobState.REVOKED:
_cleanup_job_if_needed(job)
cleanup_async_job_if_needed(job)


@task_failure.connect(sender=run_job, retry=False)
Expand All @@ -345,7 +352,7 @@ def update_job_failure(sender, task_id, exception, *args, **kwargs):
job.save()

# Clean up async resources for failed jobs
_cleanup_job_if_needed(job)
cleanup_async_job_if_needed(job)


def log_time(start: float = 0, msg: str | None = None) -> tuple[float, Callable]:
Expand Down
5 changes: 4 additions & 1 deletion ami/ml/orchestration/async_job_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class JobStateProgress:
may need to be generalized.
"""

unknown: bool = False # True if progress cannot be determined (e.g. missing Redis keys)
remaining: int = 0 # source images not yet processed in this stage
total: int = 0 # total source images in the job
processed: int = 0 # source images completed (success + failed)
Expand Down Expand Up @@ -169,7 +170,8 @@ def _commit_update(
pending_images = cache.get(self._get_pending_key(stage))
total_images = cache.get(self._total_key)
if pending_images is None or total_images is None:
return None
# important that this is distinguishable from not getting the lock
return JobStateProgress(unknown=True)
remaining_images = [img_id for img_id in pending_images if img_id not in processed_image_ids]
assert len(pending_images) >= len(remaining_images)
cache.set(self._get_pending_key(stage), remaining_images, timeout=self.TIMEOUT)
Expand Down Expand Up @@ -210,3 +212,4 @@ def cleanup(self) -> None:
cache.delete(self._get_pending_key(stage))
cache.delete(self._failed_key)
cache.delete(self._total_key)
cache.delete(_lock_key(self.job_id))
27 changes: 20 additions & 7 deletions ami/ml/orchestration/nats_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,31 @@ def _get_consumer_name(self, job_id: int) -> str:
"""Get consumer name from job_id."""
return f"job-{job_id}-consumer"

async def _ensure_stream(self, job_id: int):
"""Ensure stream exists for the given job."""
async def _stream_exists(self, job_id: int) -> bool:
"""Check if stream exists for the given job."""
if self.js is None:
raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.")

stream_name = self._get_stream_name(job_id)
subject = self._get_subject(job_id)

try:
await asyncio.wait_for(self.js.stream_info(stream_name), timeout=NATS_JETSTREAM_TIMEOUT)
logger.debug(f"Stream {stream_name} already exists")
return True
except asyncio.TimeoutError:
raise # NATS unreachable — let caller handle it rather than creating a stream blindly
except Exception as e:
logger.warning(f"Stream {stream_name} does not exist: {e}")
except Exception:
return False
Comment on lines +98 to +111
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check what exception nats.py raises for missing streams
cd /tmp && pip install nats-py >/dev/null 2>&1

# Try to find the exception types in nats-py
python3 << 'EOF'
try:
    # Try different import paths
    try:
        from nats.js.errors import NotFoundError
        print("Found: nats.js.errors.NotFoundError")
    except ImportError:
        pass
    
    try:
        from nats.errors import NotFoundError
        print("Found: nats.errors.NotFoundError")
    except ImportError:
        pass
    
    # List all exceptions in nats module
    import nats
    import inspect
    
    # Check nats.js module
    try:
        import nats.js
        print("\n=== nats.js exceptions ===")
        for name in dir(nats.js):
            obj = getattr(nats.js, name)
            if inspect.isclass(obj) and issubclass(obj, BaseException):
                print(f"  {name}")
    except Exception as e:
        print(f"Error inspecting nats.js: {e}")
    
    # Check for errors submodule
    try:
        import nats.js.errors as errors
        print("\n=== nats.js.errors exceptions ===")
        for name in dir(errors):
            if not name.startswith('_'):
                obj = getattr(errors, name)
                if inspect.isclass(obj) and issubclass(obj, BaseException):
                    print(f"  {name}")
    except Exception as e:
        print(f"Error inspecting nats.js.errors: {e}")
        
except Exception as e:
    print(f"Error: {e}")
    import traceback
    traceback.print_exc()
EOF

Repository: RolnickLab/antenna

Length of output: 712


🏁 Script executed:

# Also check the actual file to see the context
cat -n ami/ml/orchestration/nats_queue.py | sed -n '95,135p'

Repository: RolnickLab/antenna

Length of output: 2088


🏁 Script executed:

# Check for other usages of stream_info or exception handling in the file
rg "stream_info|except" ami/ml/orchestration/nats_queue.py -n

Repository: RolnickLab/antenna

Length of output: 665


Narrow exception handling to avoid masking transient NATS errors.

The nats.js library provides nats.js.errors.NotFoundError for missing streams. The current broad except Exception catch treats all non-timeout errors as "stream doesn't exist", which could mask transient NATS failures like ServiceUnavailableError or ServerError. This could cause reserve_tasks to silently return an empty list or _ensure_stream to unnecessarily retry stream creation when the real issue is a temporary NATS connectivity problem.

Catch nats.js.errors.NotFoundError specifically and let other exceptions propagate to surface real NATS issues.

🧰 Tools
🪛 Ruff (0.15.2)

[warning] 101-101: Avoid specifying long messages outside the exception class

(TRY003)


[warning] 107-107: Consider moving this statement to an else block

(TRY300)


[warning] 110-110: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/orchestration/nats_queue.py` around lines 98 - 111, The _stream_exists
method currently catches all exceptions and treats them as "stream missing",
which masks real NATS errors; update the error handling in _stream_exists (which
calls self.js.stream_info and uses _get_stream_name and NATS_JETSTREAM_TIMEOUT)
to specifically catch the NotFoundError from the nats client
(nats.js.errors.NotFoundError) and return False only in that case, keep the
existing asyncio.TimeoutError re-raise, and let any other exceptions propagate
(i.e., remove the broad except Exception and replace it with except
NotFoundError: return False).


async def _ensure_stream(self, job_id: int):
"""Ensure stream exists for the given job."""
if self.js is None:
raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.")

if not await self._stream_exists(job_id):
stream_name = self._get_stream_name(job_id)
subject = self._get_subject(job_id)

logger.warning(f"Stream {stream_name} does not exist")
# Stream doesn't exist, create it
await asyncio.wait_for(
self.js.add_stream(
Expand Down Expand Up @@ -207,7 +217,10 @@ async def reserve_tasks(self, job_id: int, count: int, timeout: float = 5) -> li
raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.")

try:
await self._ensure_stream(job_id)
if not await self._stream_exists(job_id):
logger.debug(f"Stream for job '{job_id}' does not exist when reserving task")
return []

await self._ensure_consumer(job_id)

consumer_name = self._get_consumer_name(job_id)
Expand Down
4 changes: 2 additions & 2 deletions ami/ml/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ def test_small_size_filter_assigns_not_identifiable(self):
)


class TestTaskStateManager(TestCase):
class TestAsyncJobStateManager(TestCase):
"""Test TaskStateManager for job progress tracking."""
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring still refers to "TaskStateManager" but the class has been renamed to "TestAsyncJobStateManager". Update the docstring to match the new class name for consistency.

Suggested change
"""Test TaskStateManager for job progress tracking."""
"""Test AsyncJobStateManager for job progress tracking."""

Copilot uses AI. Check for mistakes.

def setUp(self):
Expand Down Expand Up @@ -1025,7 +1025,7 @@ def test_cleanup(self):

# Verify keys are gone
progress = self.manager._commit_update(set(), "process")
self.assertIsNone(progress)
self.assertTrue(progress is None or progress.unknown)

def test_failed_image_tracking(self):
"""Test basic failed image tracking with no double-counting on retries."""
Expand Down