diff --git a/landoapi/models/landing_job.py b/landoapi/models/landing_job.py
index 91708e91..f9e5ae24 100644
--- a/landoapi/models/landing_job.py
+++ b/landoapi/models/landing_job.py
@@ -56,6 +56,16 @@ class LandingJobStatus(enum.Enum):
     # Manually cancelled state.
     CANCELLED = "CANCELLED"
 
+
+# Statuses of landing jobs that `job_queue_query` selects as part of the queue.
+# Assigned after the class body so that the values are real enum members, and so
+# that no `@classmethod`/`@property` chain is needed (removed in Python 3.13).
+LandingJobStatus.ACTIVE_STATUSES = (
+    LandingJobStatus.SUBMITTED,
+    LandingJobStatus.DEFERRED,
+    LandingJobStatus.IN_PROGRESS,
+)
+
 
 @enum.unique
 class LandingJobAction(enum.Enum):
@@ -233,12 +243,7 @@ def job_queue_query(
             grace_seconds (int): Ignore landing jobs that were submitted after this
                 many seconds ago.
         """
-        applicable_statuses = (
-            LandingJobStatus.SUBMITTED,
-            LandingJobStatus.IN_PROGRESS,
-            LandingJobStatus.DEFERRED,
-        )
-        q = cls.query.filter(cls.status.in_(applicable_statuses))
+        q = cls.query.filter(cls.status.in_(LandingJobStatus.ACTIVE_STATUSES))
 
         if repositories:
             q = q.filter(cls.repository_name.in_(repositories))
diff --git a/landoapi/workers/base.py b/landoapi/workers/base.py
index ce9b75b9..4b1db006 100644
--- a/landoapi/workers/base.py
+++ b/landoapi/workers/base.py
@@ -152,7 +152,6 @@ def refresh_enabled_repos(self):
             for r in self.applicable_repos
             if treestatus_subsystem.client.is_open(repo_clone_subsystem.repos[r].tree)
         ]
-        logger.info(f"{len(self.enabled_repos)} enabled repos: {self.enabled_repos}")
 
     def start(self, max_loops: int | None = None):
         """Run setup sequence and start the event loop."""
diff --git a/landoapi/workers/landing_worker.py b/landoapi/workers/landing_worker.py
index 67ef99e4..eac60554 100644
--- a/landoapi/workers/landing_worker.py
+++ b/landoapi/workers/landing_worker.py
@@ -27,7 +27,11 @@
     TreeClosed,
 )
 from landoapi.models.configuration import ConfigurationKey
-from landoapi.models.landing_job import LandingJob, LandingJobAction, LandingJobStatus
+from landoapi.models.landing_job import (
+    LandingJob,
+    LandingJobAction,
+    LandingJobStatus,
+)
 from landoapi.notifications import (
     notify_user_of_bug_update_failure,
     notify_user_of_landing_failure,
@@ -71,6 +75,11 @@ def job_processing(worker: LandingWorker, job: LandingJob, db: SQLAlchemy):
 
 
 class LandingWorker(Worker):
+    # Minimum `attempts` value at which an active job is reported as a runaway.
+    TOO_MANY_ATTEMPTS_THRESHOLD = 10
+    # Minimum number of active jobs at which the queue size warning is logged.
+    QUEUE_SIZE_THRESHOLD = 20
+
     @property
     def STOP_KEY(self) -> ConfigurationKey:
         """Return the configuration key that prevents the worker from starting."""
@@ -86,11 +95,38 @@ def __init__(self, *args, **kwargs):
         self.last_job_finished = None
         self.refresh_enabled_repos()
 
-    def loop(self):
-        logger.debug(
-            f"{len(self.applicable_repos)} applicable repos: {self.applicable_repos}"
+    def check_landing_worker_warnings(self):
+        """Log a warning if the queue is too long or a job has too many attempts.
+
+        Two independent checks are made against active landing jobs: the number
+        of jobs is compared with `QUEUE_SIZE_THRESHOLD`, and the first job found
+        with at least `TOO_MANY_ATTEMPTS_THRESHOLD` attempts is reported.
+        """
+        queue_size = LandingJob.query.filter(
+            LandingJob.status.in_(LandingJobStatus.ACTIVE_STATUSES)
+        ).count()
+        if queue_size >= self.QUEUE_SIZE_THRESHOLD:
+            logger.warning(
+                f"The landing queue size of {queue_size} exceeds threshold of "
+                f"{self.QUEUE_SIZE_THRESHOLD}."
+            )
+
+        runaway_jobs = LandingJob.query.filter(
+            LandingJob.status.in_(LandingJobStatus.ACTIVE_STATUSES),
+            LandingJob.attempts >= self.TOO_MANY_ATTEMPTS_THRESHOLD,
         )
 
+        # `first()` fetches at most one row, replacing a COUNT query followed by
+        # loading every matching job just to read the first one.
+        job = runaway_jobs.first()
+        if job is not None:
+            logger.warning(
+                f"Active landing job ({job}) has too many attempts ({job.attempts})"
+            )
+
+    def loop(self):
+        self.check_landing_worker_warnings()
+
         # Check if any closed trees reopened since the beginning of this iteration
         if len(self.enabled_repos) != len(self.applicable_repos):
             self.refresh_enabled_repos()