|
22 | 22 | ) |
23 | 23 |
|
24 | 24 | # Detect if running as a worker process (via run_huey command) |
25 | | -# Workers always need Huey configured regardless of DETACHED_WORKER setting |
26 | 25 | IS_WORKER_PROCESS = any("run_huey" in arg for arg in sys.argv) |
27 | 26 |
|
28 | | -# Skip Huey configuration only if: |
29 | | -# 1. Running in detached worker mode (DETACHED_WORKER=True) |
30 | | -# 2. AND not running as a worker process (IS_WORKER_PROCESS=False) |
31 | | -# This ensures workers always get Huey configured, but API servers don't when detached |
32 | | -if DETACHED_WORKER and not IS_WORKER_PROCESS: |
33 | | - # API server in detached mode - skip Huey configuration |
34 | | - HUEY = None |
35 | | -else: |
36 | | - # Either: worker process, or API server with embedded workers |
37 | | - # Redis configuration - REDIS_URL takes precedence and supersedes granular env vars |
38 | | - REDIS_URL = decouple.config("REDIS_URL", default=None) |
39 | | - |
40 | | - # Parse REDIS_URL or construct from individual parameters |
41 | | - if REDIS_URL is not None: |
42 | | - from urllib.parse import urlparse |
43 | | - |
44 | | - parsed = urlparse(REDIS_URL) |
45 | | - |
46 | | - # Extract values from REDIS_URL (these supersede granular env vars) |
47 | | - REDIS_HOST = parsed.hostname |
48 | | - REDIS_PORT = parsed.port or 6379 |
49 | | - REDIS_USERNAME = parsed.username or "default" |
50 | | - REDIS_PASSWORD = parsed.password |
51 | | - |
52 | | - # Determine SSL from URL scheme (rediss:// means SSL is enabled) |
53 | | - REDIS_SCHEME = ( |
54 | | - parsed.scheme if parsed.scheme in ("redis", "rediss") else "redis" |
55 | | - ) |
56 | | - REDIS_SSL = REDIS_SCHEME == "rediss" |
57 | | - |
58 | | - else: |
59 | | - # Fall back to individual parameters |
60 | | - REDIS_HOST = decouple.config("REDIS_HOST", default=None) |
61 | | - REDIS_PORT = decouple.config("REDIS_PORT", default=None) |
62 | | - REDIS_PASSWORD = decouple.config("REDIS_PASSWORD", default=None) |
63 | | - REDIS_USERNAME = decouple.config("REDIS_USERNAME", default="default") |
64 | | - REDIS_SSL = decouple.config("REDIS_SSL", default=False, cast=bool) |
65 | | - |
66 | | - # Configure HUEY based on available Redis configuration |
67 | | - if REDIS_HOST is not None: |
68 | | - # Calculate max connections based on environment |
69 | | - # Each worker replica needs: (workers_per_replica + 1 scheduler) connections |
70 | | - # Formula: (worker_replicas * (threads_per_worker + 1)) + api_connections + buffer |
71 | | - # Example: 100 connections = (5 replicas * (8 workers + 1 scheduler)) + 40 API + 15 buffer |
72 | | - REDIS_MAX_CONNECTIONS = decouple.config( |
73 | | - "REDIS_MAX_CONNECTIONS", default=100, cast=int |
74 | | - ) |
75 | | - |
76 | | - # Connection pool configuration with timeouts |
77 | | - # Use BlockingConnectionPool to wait for available connections instead of failing immediately |
78 | | - pool_kwargs = { |
79 | | - "host": REDIS_HOST, |
80 | | - "port": REDIS_PORT, |
81 | | - "max_connections": REDIS_MAX_CONNECTIONS, |
82 | | - "timeout": 20, # Wait up to 20 seconds for an available connection |
83 | | - # Timeout settings to prevent hung connections |
84 | | - "socket_timeout": 10, # Command execution timeout (seconds) |
85 | | - "socket_connect_timeout": 10, # Connection establishment timeout (seconds) |
86 | | - # Keep connections alive to prevent closure by firewalls/load balancers |
87 | | - "socket_keepalive": True, |
88 | | - # Retry on transient failures |
89 | | - "retry_on_timeout": True, |
90 | | - } |
| 27 | +# Always configure Huey for both API servers and workers |
| 28 | +# API servers need to enqueue tasks even when DETACHED_WORKER=True |
| 29 | +# Only the worker pods will consume tasks |
| 30 | +# Redis configuration - REDIS_URL takes precedence and supersedes granular env vars |
| 31 | +REDIS_URL = decouple.config("REDIS_URL", default=None) |
| 32 | + |
| 33 | +# Parse REDIS_URL or construct from individual parameters |
| 34 | +if REDIS_URL is not None: |
| 35 | + from urllib.parse import urlparse |
| 36 | + |
| 37 | + parsed = urlparse(REDIS_URL) |
91 | 38 |
|
92 | | - # Add TCP keepalive options if available (Linux/Unix only) |
93 | | - try: |
94 | | - pool_kwargs["socket_keepalive_options"] = { |
95 | | - socket.TCP_KEEPIDLE: 60, # Start keepalive after 60s idle |
96 | | - socket.TCP_KEEPINTVL: 10, # Keepalive interval |
97 | | - socket.TCP_KEEPCNT: 3, # Keepalive probes before timeout |
98 | | - } |
99 | | - except AttributeError: |
100 | | - # TCP keepalive constants not available on this platform |
101 | | - pass |
102 | | - |
103 | | - # Add authentication if provided |
104 | | - if REDIS_PASSWORD: |
105 | | - pool_kwargs["password"] = REDIS_PASSWORD |
106 | | - if REDIS_USERNAME: |
107 | | - pool_kwargs["username"] = REDIS_USERNAME |
108 | | - |
109 | | - # Add SSL/TLS configuration if enabled |
110 | | - if REDIS_SSL: |
111 | | - # Use SSLConnection class for SSL/TLS connections |
112 | | - pool_kwargs["connection_class"] = redis.SSLConnection |
113 | | - pool_kwargs["ssl_cert_reqs"] = None # For Azure Redis compatibility |
114 | | - |
115 | | - # Use BlockingConnectionPool to wait for connections instead of raising errors immediately |
116 | | - pool = redis.BlockingConnectionPool(**pool_kwargs) |
117 | | - |
118 | | - HUEY = huey.RedisHuey( |
119 | | - "default", |
120 | | - connection_pool=pool, |
121 | | - **({"immediate": WORKER_IMMEDIATE_MODE} if WORKER_IMMEDIATE_MODE else {}), |
122 | | - ) |
123 | | - |
124 | | - else: |
125 | | - # No Redis configured, use SQLite |
126 | | - WORKER_DB_DIR = decouple.config("WORKER_DB_DIR", default=settings.WORK_DIR) |
127 | | - WORKER_DB_FILE_NAME = os.path.join(WORKER_DB_DIR, "tasks.sqlite3") |
128 | | - |
129 | | - settings.DATABASES["workers"] = { |
130 | | - "NAME": WORKER_DB_FILE_NAME, |
131 | | - "ENGINE": "django.db.backends.sqlite3", |
| 39 | + # Extract values from REDIS_URL (these supersede granular env vars) |
| 40 | + REDIS_HOST = parsed.hostname |
| 41 | + REDIS_PORT = parsed.port or 6379 |
| 42 | + REDIS_USERNAME = parsed.username or "default" |
| 43 | + REDIS_PASSWORD = parsed.password |
| 44 | + |
| 45 | + # Determine SSL from URL scheme (rediss:// means SSL is enabled) |
| 46 | + REDIS_SCHEME = ( |
| 47 | + parsed.scheme if parsed.scheme in ("redis", "rediss") else "redis" |
| 48 | + ) |
| 49 | + REDIS_SSL = REDIS_SCHEME == "rediss" |
| 50 | + |
| 51 | +else: |
| 52 | + # Fall back to individual parameters |
| 53 | + REDIS_HOST = decouple.config("REDIS_HOST", default=None) |
| 54 | + REDIS_PORT = decouple.config("REDIS_PORT", default=None) |
| 55 | + REDIS_PASSWORD = decouple.config("REDIS_PASSWORD", default=None) |
| 56 | + REDIS_USERNAME = decouple.config("REDIS_USERNAME", default="default") |
| 57 | + REDIS_SSL = decouple.config("REDIS_SSL", default=False, cast=bool) |
| 58 | + |
| 59 | +# Configure HUEY based on available Redis configuration |
| 60 | +if REDIS_HOST is not None: |
| 61 | + # Calculate max connections based on environment |
| 62 | + # Each worker replica needs: (workers_per_replica + 1 scheduler) connections |
| 63 | + # Formula: (worker_replicas * (threads_per_worker + 1)) + api_connections + buffer |
| 64 | + # Example: 100 connections = (5 replicas * (8 workers + 1 scheduler)) + 40 API + 15 buffer |
| 65 | + REDIS_MAX_CONNECTIONS = decouple.config( |
| 66 | + "REDIS_MAX_CONNECTIONS", default=100, cast=int |
| 67 | + ) |
| 68 | + |
| 69 | + # Connection pool configuration with timeouts |
| 70 | + # Use BlockingConnectionPool to wait for available connections instead of failing immediately |
| 71 | + pool_kwargs = { |
| 72 | + "host": REDIS_HOST, |
| 73 | + "port": REDIS_PORT, |
| 74 | + "max_connections": REDIS_MAX_CONNECTIONS, |
| 75 | + "timeout": 20, # Wait up to 20 seconds for an available connection |
| 76 | + # Timeout settings to prevent hung connections |
| 77 | + "socket_timeout": 10, # Command execution timeout (seconds) |
| 78 | + "socket_connect_timeout": 10, # Connection establishment timeout (seconds) |
| 79 | + # Keep connections alive to prevent closure by firewalls/load balancers |
| 80 | + "socket_keepalive": True, |
| 81 | + # Retry on transient failures |
| 82 | + "retry_on_timeout": True, |
| 83 | + } |
| 84 | + |
| 85 | + # Add TCP keepalive options if available (Linux/Unix only) |
| 86 | + try: |
| 87 | + pool_kwargs["socket_keepalive_options"] = { |
| 88 | + socket.TCP_KEEPIDLE: 60, # Start keepalive after 60s idle |
| 89 | + socket.TCP_KEEPINTVL: 10, # Keepalive interval |
| 90 | + socket.TCP_KEEPCNT: 3, # Keepalive probes before timeout |
132 | 91 | } |
| 92 | + except AttributeError: |
| 93 | + # TCP keepalive constants not available on this platform |
| 94 | + pass |
| 95 | + |
| 96 | + # Add authentication if provided |
| 97 | + if REDIS_PASSWORD: |
| 98 | + pool_kwargs["password"] = REDIS_PASSWORD |
| 99 | + if REDIS_USERNAME: |
| 100 | + pool_kwargs["username"] = REDIS_USERNAME |
| 101 | + |
| 102 | + # Add SSL/TLS configuration if enabled |
| 103 | + if REDIS_SSL: |
| 104 | + # Use SSLConnection class for SSL/TLS connections |
| 105 | + pool_kwargs["connection_class"] = redis.SSLConnection |
| 106 | + pool_kwargs["ssl_cert_reqs"] = None # For Azure Redis compatibility |
| 107 | + |
| 108 | + # Use BlockingConnectionPool to wait for connections instead of raising errors immediately |
| 109 | + pool = redis.BlockingConnectionPool(**pool_kwargs) |
| 110 | + |
| 111 | + HUEY = huey.RedisHuey( |
| 112 | + "default", |
| 113 | + connection_pool=pool, |
| 114 | + **({"immediate": WORKER_IMMEDIATE_MODE} if WORKER_IMMEDIATE_MODE else {}), |
| 115 | + ) |
133 | 116 |
|
134 | | - HUEY = huey.SqliteHuey( |
135 | | - name="default", |
136 | | - filename=WORKER_DB_FILE_NAME, |
137 | | - **({"immediate": WORKER_IMMEDIATE_MODE} if WORKER_IMMEDIATE_MODE else {}), |
138 | | - ) |
| 117 | +else: |
| 118 | + # No Redis configured, use SQLite |
| 119 | + WORKER_DB_DIR = decouple.config("WORKER_DB_DIR", default=settings.WORK_DIR) |
| 120 | + WORKER_DB_FILE_NAME = os.path.join(WORKER_DB_DIR, "tasks.sqlite3") |
| 121 | + |
| 122 | + settings.DATABASES["workers"] = { |
| 123 | + "NAME": WORKER_DB_FILE_NAME, |
| 124 | + "ENGINE": "django.db.backends.sqlite3", |
| 125 | + } |
| 126 | + |
| 127 | + # SQLite-specific Huey configuration |
| 128 | + # WAL mode (Write-Ahead Logging) enables better concurrency for multiple workers |
| 129 | + # Increased timeout helps handle lock contention under concurrent access |
| 130 | + HUEY = huey.SqliteHuey( |
| 131 | + name="default", |
| 132 | + filename=WORKER_DB_FILE_NAME, |
| 133 | + # Storage-specific kwargs for better concurrent access handling |
| 134 | + journal_mode="wal", # Enable Write-Ahead Logging for better concurrency |
| 135 | + timeout=30, # Increase timeout to 30s to handle lock contention with multiple workers |
| 136 | + cache_mb=16, # Increase cache size for better performance (default: 8MB) |
| 137 | + fsync=False, # Disable forced fsync for better performance (default: False) |
| 138 | + **({"immediate": WORKER_IMMEDIATE_MODE} if WORKER_IMMEDIATE_MODE else {}), |
| 139 | + ) |
0 commit comments