Skip to content

Commit ca2e0cc

Browse files
authored
Simple heartbeat system for the task queue (#374)
* Add missing __init__.py files * Add a heartbeat system for django_q
1 parent da48bad commit ca2e0cc

File tree

25 files changed

+156
-8
lines changed

25 files changed

+156
-8
lines changed

backend/donations/management/__init__.py

Whitespace-only changes.

backend/donations/management/commands/__init__.py

Whitespace-only changes.

backend/partners/management/__init__.py

Whitespace-only changes.

backend/partners/management/commands/__init__.py

Whitespace-only changes.

backend/q_heartbeat/__init__.py

Whitespace-only changes.

backend/q_heartbeat/admin.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
2+
# Register your models here.

backend/q_heartbeat/apps.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from django.apps import AppConfig
2+
3+
4+
class QHeartbeatConfig(AppConfig):
5+
default_auto_field = "django.db.models.BigAutoField"
6+
name = "q_heartbeat"

backend/q_heartbeat/management/__init__.py

Whitespace-only changes.

backend/q_heartbeat/management/commands/__init__.py

Whitespace-only changes.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import logging
2+
3+
import psutil
4+
from django_q.models import Schedule, Success
5+
from django.core.management.base import BaseCommand
6+
from django.utils import timezone
7+
8+
9+
logger = logging.getLogger(__name__)
10+
11+
SCHEDULE_NAME = "QHEARTBEAT"
12+
13+
14+
class Command(BaseCommand):
15+
help = "Without args, this command is a no-op just for registering that the task queue is alive"
16+
17+
def add_arguments(self, parser):
18+
parser.add_argument(
19+
"--hard",
20+
action="store_true",
21+
default=False,
22+
help="Kill the worker processes instead of terminating them",
23+
)
24+
parser.add_argument(
25+
"--check-minutes", # this will be stored in options["check_minutes"]
26+
type=int,
27+
default=0,
28+
help="Check for at least one successful task in the past N minutes",
29+
)
30+
31+
def handle(self, *args, **options):
32+
check_minutes = options.get("check_minutes", 0)
33+
34+
# Do we have to check for past heartbeats, or just mark a new one?
35+
if not check_minutes:
36+
logger.info("Just a task queue heartbeat")
37+
return
38+
39+
# If there is no heartbeat schedule set up, then there is nothing to check
40+
if not Schedule.objects.filter(name=SCHEDULE_NAME).exists():
41+
logger.info("Did not check for queue heartbeat because there is no heartbeat schedule")
42+
return
43+
44+
# Check if there were any successful tasks started in the past N minutes
45+
cutoff = timezone.now() - timezone.timedelta(minutes=check_minutes)
46+
if not Success.objects.filter(started__gte=cutoff).exists():
47+
# If there are no successful tasks, then try to terminate the workers
48+
logger.error("The task queue seems to be stuck, attempting to terminate it")
49+
self.terminate_workers(options.get("hard", False))
50+
else:
51+
logger.info("The task queue seems to be working")
52+
53+
def terminate_workers(self, hard_attempt: False) -> None:
54+
"""
55+
Terminate or kill all cluster workers
56+
"""
57+
needle_name = "python3"
58+
needle_cmd = [needle_name, "manage.py", "qcluster"]
59+
worker_count = 0
60+
for proc in psutil.process_iter():
61+
if proc.name() == needle_name and proc.cmdline() == needle_cmd:
62+
worker_count += 1
63+
proc.kill() if hard_attempt else proc.terminate()
64+
logging.warning("%s %d qcluster workers", "Killed" if hard_attempt else "Terminated", worker_count)

0 commit comments

Comments
 (0)