Skip to content

Commit 045a51a

Browse files
committed
Wrap tasks in app executor
1 parent b674ce5 commit 045a51a

16 files changed

Lines changed: 142 additions & 24 deletions

Makefile

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
.PHONY: test
12
test:
3+
tests/bin/run-docker-postgres
24
uv run python runtests.py
35

4-
test.postgres:
6+
.PHONY: steady_queue
7+
steady_queue:
58
tests/bin/run-docker-postgres
6-
DB_URL=postgres://steady_queue:steady_queue@localhost:5432/steady_queue uv run python runtests.py
9+
uv run python manage.py steady_queue
710

11+
.PHONY: lint
812
lint:
913
uv run pre-commit run --all-files

steady_queue/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from datetime import timedelta
22
from typing import Optional
33

4-
VERSION = (0, 1, "0-beta2")
4+
VERSION = (0, 1, "0b3")
55

66
__version__ = ".".join(map(str, VERSION))
77

steady_queue/app_executor.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import logging
2+
import threading
3+
from contextlib import contextmanager
4+
5+
from django.db import connections
6+
7+
logger = logging.getLogger("steady_queue")
8+
9+
10+
class AppExecutor:
11+
@staticmethod
12+
@contextmanager
13+
def wrap_in_app_executor():
14+
"""
15+
Django equivalent of Rails' wrap_in_app_executor.
16+
Ensures proper database connection handling in background threads.
17+
"""
18+
# Ensure we have a database connection for this thread
19+
connection = connections["default"] # or your specific database alias
20+
21+
try:
22+
# Ensure connection is established
23+
connection.ensure_connection()
24+
25+
# Clear any existing queries from connection (equivalent to Rails query cache reset)
26+
# if hasattr(connection, "queries_logged"):
27+
# connection.queries_logged = 0
28+
# connection.queries = []
29+
30+
yield
31+
32+
except Exception as e:
33+
# Handle any database-related errors
34+
# Close connection on error to prevent connection leaks
35+
logger.exception("Error in AppExecutor: %s", e)
36+
if connection.connection:
37+
connection.close()
38+
raise
39+
finally:
40+
# Django automatically manages connection lifecycle,
41+
# but we can explicitly close if needed for long-running processes
42+
if threading.current_thread() != threading.main_thread():
43+
# Only close connections in background threads
44+
# Main thread connections are handled by Django's request/response cycle
45+
connection.close()

steady_queue/processes/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import socket
44
from typing import Any
55

6+
from django.db import connections
7+
68

79
class Base:
810
name: str
@@ -43,3 +45,6 @@ def is_stopped(self) -> bool:
4345

4446
def generate_name(self) -> str:
4547
return "-".join((self.kind, secrets.token_hex(10)))
48+
49+
def reset_database_connections(self):
50+
connections.close_all()

steady_queue/processes/maintenance.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22

33
import steady_queue
4+
from steady_queue.app_executor import AppExecutor
45
from steady_queue.models.claimed_execution import ClaimedExecution
56
from steady_queue.models.process import Process
67
from steady_queue.processes.errors import ProcessMissingError
@@ -23,8 +24,10 @@ def stop_maintenance_task(self):
2324
self.maintenance_task.stop()
2425

2526
def fail_orphaned_executions(self):
26-
ClaimedExecution.objects.orphaned().fail_all_with(ProcessMissingError())
27+
with AppExecutor.wrap_in_app_executor():
28+
ClaimedExecution.objects.orphaned().fail_all_with(ProcessMissingError())
2729

2830
def prune_dead_processes(self):
2931
logger.debug("pruning dead processes")
30-
Process.objects.exclude(pk=self.process.pk).prune()
32+
with AppExecutor.wrap_in_app_executor():
33+
Process.objects.exclude(pk=self.process.pk).prune()

steady_queue/processes/poller.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from datetime import timedelta
33
from typing import Any
44

5+
from steady_queue.app_executor import AppExecutor
56
from steady_queue.processes.base import Base
67
from steady_queue.processes.interruptible import Interruptible
78
from steady_queue.processes.registrable import Registrable
@@ -38,7 +39,9 @@ def start_loop(self):
3839
if self.is_shutting_down:
3940
break
4041

41-
delay = self.poll()
42+
with AppExecutor.wrap_in_app_executor():
43+
delay = self.poll()
44+
4245
self.interruptible_sleep(delay)
4346
finally:
4447
self.shutdown()

steady_queue/processes/pool.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from threading import Lock
44
from typing import Callable
55

6+
from steady_queue.app_executor import AppExecutor
67
from steady_queue.models.claimed_execution import ClaimedExecution
78
from steady_queue.processes.concurrent import AtomicInteger
89

@@ -24,7 +25,8 @@ def post(self, execution: ClaimedExecution):
2425

2526
def wrapped_execution():
2627
try:
27-
execution.perform()
28+
with AppExecutor.wrap_in_app_executor():
29+
execution.perform()
2830
finally:
2931
self.available_threads.increment()
3032
with self.mutex:

steady_queue/processes/registrable.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Optional
33

44
import steady_queue
5+
from steady_queue.app_executor import AppExecutor
56
from steady_queue.models import Process
67
from steady_queue.processes.base import Base
78
from steady_queue.processes.timer import TimerTask
@@ -67,12 +68,13 @@ def stop_heartbeat(self):
6768
logger.debug("stopped heartbeat for %s", self.name)
6869

6970
def heartbeat(self):
70-
try:
71-
logger.debug("heartbeat from %s", self.name)
72-
self.process.heartbeat()
73-
except Process.DoesNotExist:
74-
self.process = None
75-
self.wake_up()
71+
with AppExecutor.wrap_in_app_executor():
72+
try:
73+
logger.debug("heartbeat from %s", self.name)
74+
self.process.heartbeat()
75+
except Process.DoesNotExist:
76+
self.process = None
77+
self.wake_up()
7678

7779
@property
7880
def process_id(self):

steady_queue/processes/runnable.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import logging
22
import os
33

4-
from django.db import connections
54
from steady_queue.processes.supervised import Supervised
65

76
logger = logging.getLogger("steady_queue")
@@ -57,9 +56,6 @@ def is_all_work_completed(self) -> bool:
5756
def set_procline(self):
5857
pass
5958

60-
def reset_database_connections(self):
61-
connections.close_all()
62-
6359
@property
6460
def is_running_inline(self) -> bool:
6561
return self.mode == "inline"

steady_queue/processes/signals.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def handle_signal(self, sig):
4141
if sig in (signal.SIGTERM, signal.SIGINT):
4242
self.stop()
4343
self.terminate_gracefully()
44-
elif sig == self.SIGQUIT:
44+
elif sig == signal.SIGQUIT:
4545
self.stop()
4646
self.terminate_immediately()
4747
else:

0 commit comments

Comments
 (0)