Skip to content

Commit 42002f9

Browse files
committed
disable connection pooling in workers :/
1 parent 36386c0 commit 42002f9

5 files changed

Lines changed: 75 additions & 2 deletions

File tree

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,7 @@ steady_queue:
1111
.PHONY: lint
1212
lint:
1313
uv run pre-commit run --all-files
14+
15+
.PHONY: force-kill
16+
force-kill:
17+
ps | grep steady_queue | cut -f 1 -d ' ' | xargs kill -9 && rm -f tmp/pids/steady_queue_supervisor.pid

steady_queue/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from datetime import timedelta
22
from typing import Optional
33

4-
VERSION = (0, 1, "0b3")
4+
VERSION = (0, 1, "0b4")
55

66
__version__ = ".".join(map(str, VERSION))
77

steady_queue/processes/base.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,61 @@ def is_stopped(self) -> bool:
4646
def generate_name(self) -> str:
4747
return "-".join((self.kind, secrets.token_hex(10)))
4848

49+
def disable_connection_pooling(self):
50+
"""
51+
Disable connection pooling for steady_queue processes.
52+
53+
Connection pooling with psycopg doesn't work with forked processes.
54+
This method removes pool configuration from database settings to prevent
55+
pool-related errors in steady_queue workers.
56+
"""
57+
import logging
58+
59+
from django.conf import settings
60+
61+
logger = logging.getLogger("steady_queue")
62+
63+
# Disable pooling in database configuration
64+
if hasattr(settings, "DATABASES"):
65+
for alias, db_config in settings.DATABASES.items():
66+
if db_config.get("ENGINE") == "django.db.backends.postgresql":
67+
# Remove pool configuration if it exists
68+
options = db_config.setdefault("OPTIONS", {})
69+
if "pool" in options:
70+
logger.info(
71+
"PID %d disabling connection pooling for database '%s'",
72+
os.getpid(),
73+
alias,
74+
)
75+
del options["pool"]
76+
77+
# Also disable on any existing connections
78+
for alias in connections:
79+
connection = connections[alias]
80+
if hasattr(connection, "pool") and connection.pool is not None:
81+
try:
82+
connection.pool.close()
83+
connection.pool = None
84+
logger.debug(
85+
"PID %d removed existing pool for '%s'", os.getpid(), alias
86+
)
87+
except Exception as e:
88+
logger.debug(
89+
"PID %d failed to close existing pool for '%s': %s",
90+
os.getpid(),
91+
alias,
92+
e,
93+
)
94+
4995
def reset_database_connections(self):
96+
"""
97+
Reset database connections for forked processes.
98+
99+
This disables connection pooling and resets connection state to prevent
100+
issues with shared connections between parent and child processes.
101+
"""
102+
# First disable connection pooling for steady_queue processes
103+
self.disable_connection_pooling()
104+
105+
# Close all existing connections
50106
connections.close_all()

tests/dummy/tasks.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
1+
import time
2+
13
from django_tasks import task
24

3-
from steady_queue import recurring
5+
from steady_queue.recurring_task import recurring
46

57

68
@task()
79
def dummy_task():
810
print("dummy task")
911

1012

13+
@task()
14+
def long_running_task():
15+
print("long running task")
16+
time.sleep(10)
17+
print("long running task finished")
18+
19+
1120
@recurring(schedule="*/1 * * * *", key="dummy_recurring_task")
1221
@task()
1322
def dummy_recurring_task():

tests/settings.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
),
4040
}
4141

42+
if DATABASES["default"]["ENGINE"] == "django.db.backends.postgresql":
43+
print("Enabling connection pooling")
44+
DATABASES["default"]["OPTIONS"] = {"pool": {"min_size": 2, "max_size": 4}}
45+
4246

4347
# Tasks
4448
TASKS = {

0 commit comments

Comments
 (0)