import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta
from enum import Enum
from functools import wraps
from typing import Any, Callable, Dict, Optional

from app.core.config import settings
from app.core.redis import redis_client

logger = logging.getLogger(__name__)
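# Note (assumption): redis_client is taken here to be a thin wrapper from
# app.core.redis that JSON-serializes values in get()/set(key, value, expire=...)
# and exposes the raw redis-py client as redis_client.client; everything below
# relies on that contract.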
class JobStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

class BackgroundJobService:
    """Background job processing service using Redis queues."""

    def __init__(self):
        self.queue_key = "background_jobs:queue"
        self.processing_key = "background_jobs:processing"
        self.results_key = "background_jobs:results"
        self.max_retries = 3
        self.retry_delay = 60  # seconds (base delay for exponential backoff)

    async def enqueue_job(
        self,
        job_type: str,
        job_data: Dict[str, Any],
        delay: int = 0,
        priority: int = 0
    ) -> str:
        """
        Enqueue a background job.

        Args:
            job_type: Type of job to execute
            job_data: Data required for the job
            delay: Delay in seconds before the job becomes available
            priority: Job priority (higher = more important)

        Returns:
            Job ID
        """
        job_id = str(uuid.uuid4())
        job = {
            'id': job_id,
            'type': job_type,
            'data': job_data,
            'status': JobStatus.PENDING,
            'created_at': datetime.utcnow().isoformat(),
            'delay_until': (datetime.utcnow() + timedelta(seconds=delay)).isoformat() if delay > 0 else None,
            'priority': priority,
            'retry_count': 0,
            'error_message': None
        }

        # Store the job record (expires after 24 hours)
        redis_client.set(f"{self.results_key}:{job_id}", job, expire=86400)

        # Add to the sorted-set queue, scored by availability timestamp so
        # that get_next_job() can range-query for jobs that are due. Priority
        # is carried in the payload for consumers to inspect.
        queue_data = {
            'job_id': job_id,
            'priority': priority,
            'available_at': datetime.utcnow().timestamp() + delay
        }

        redis_client.client.zadd(self.queue_key, {json.dumps(queue_data): queue_data['available_at']})

        return job_id

    async def get_job_status(self, job_id: str) -> Optional[Dict[str, Any]]:
        """
        Get job status and details.

        Args:
            job_id: Job ID to check

        Returns:
            Job data or None if not found
        """
        return redis_client.get(f"{self.results_key}:{job_id}")

    async def update_job_status(
        self,
        job_id: str,
        status: JobStatus,
        result: Optional[Dict[str, Any]] = None,
        error_message: Optional[str] = None
    ) -> bool:
        """
        Update job status.

        Args:
            job_id: Job ID to update
            status: New job status
            result: Job result data
            error_message: Error message if the job failed

        Returns:
            True if updated successfully
        """
        job_data = redis_client.get(f"{self.results_key}:{job_id}")
        if not job_data:
            return False

        job_data['status'] = status
        job_data['updated_at'] = datetime.utcnow().isoformat()

        if result is not None:  # "is not None" so an empty result dict is kept
            job_data['result'] = result

        if error_message:
            job_data['error_message'] = error_message

        # Persist the updated record
        redis_client.set(f"{self.results_key}:{job_id}", job_data, expire=86400)

        # Terminal states should no longer sit in the queue
        if status in (JobStatus.COMPLETED, JobStatus.FAILED):
            self._remove_from_queue(job_id)

        return True

    async def increment_retry(self, job_id: str) -> bool:
        """
        Increment job retry count.

        Args:
            job_id: Job ID to retry

        Returns:
            True if a retry was scheduled, False if max retries were exceeded
        """
        job_data = redis_client.get(f"{self.results_key}:{job_id}")
        if not job_data:
            return False

        job_data['retry_count'] += 1

        if job_data['retry_count'] >= self.max_retries:
            # Max retries exceeded, mark as failed
            await self.update_job_status(
                job_id,
                JobStatus.FAILED,
                error_message=f"Max retries ({self.max_retries}) exceeded"
            )
            return False

        # Persist the new retry count
        redis_client.set(f"{self.results_key}:{job_id}", job_data, expire=86400)

        # Re-queue with exponential backoff: retry_delay, then 2x, 4x, ...
        delay = self.retry_delay * (2 ** (job_data['retry_count'] - 1))
        await self.requeue_job(job_id, delay)

        return True

    async def requeue_job(self, job_id: str, delay: int = 0) -> bool:
        """
        Re-queue an existing job.

        Args:
            job_id: Job ID to re-queue
            delay: Delay in seconds before the job becomes available

        Returns:
            True if re-queued successfully
        """
        job_data = redis_client.get(f"{self.results_key}:{job_id}")
        if not job_data:
            return False

        # Reset status to pending
        job_data['status'] = JobStatus.PENDING
        job_data['delay_until'] = (datetime.utcnow() + timedelta(seconds=delay)).isoformat() if delay > 0 else None

        redis_client.set(f"{self.results_key}:{job_id}", job_data, expire=86400)

        # Add back to the queue, scored by availability timestamp to match
        # enqueue_job()
        queue_data = {
            'job_id': job_id,
            'priority': job_data.get('priority', 0),
            'available_at': datetime.utcnow().timestamp() + delay
        }

        redis_client.client.zadd(self.queue_key, {json.dumps(queue_data): queue_data['available_at']})

        return True

    def _remove_from_queue(self, job_id: str):
        """Remove a job from the active queue (O(n) scan over queue items)."""
        queue_items = redis_client.client.zrange(self.queue_key, 0, -1)
        for item in queue_items:
            try:
                queue_data = json.loads(item.decode('utf-8'))
                if queue_data.get('job_id') == job_id:
                    redis_client.client.zrem(self.queue_key, item)
                    break
            except (json.JSONDecodeError, UnicodeDecodeError):
                continue

    async def get_next_job(self, timeout: int = 30) -> Optional[Dict[str, Any]]:
        """
        Get the next available job from the queue, polling until timeout.

        Args:
            timeout: Maximum time in seconds to wait for a job

        Returns:
            Job data, or None if no job became available within the timeout
        """
        deadline = datetime.utcnow().timestamp() + timeout

        while True:
            current_time = datetime.utcnow().timestamp()

            # Jobs are scored by availability timestamp, so this range query
            # returns only jobs that are due to run.
            # Note: zrangebyscore followed by zrem is not atomic; with several
            # concurrent workers a Lua script or ZPOPMIN-based scheme would be
            # needed to avoid double delivery.
            available_jobs = redis_client.client.zrangebyscore(
                self.queue_key,
                0,
                current_time,
                start=0,
                num=1,
                withscores=True
            )

            if available_jobs:
                job_json, _score = available_jobs[0]
                try:
                    queue_data = json.loads(job_json.decode('utf-8'))
                    job_id = queue_data.get('job_id')

                    # Remove from the queue and mark as processing
                    redis_client.client.zrem(self.queue_key, job_json)

                    # Fetch the full job record
                    job_data = redis_client.get(f"{self.results_key}:{job_id}")
                    if job_data:
                        await self.update_job_status(job_id, JobStatus.PROCESSING)
                        return job_data

                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    # Remove malformed entries from the queue
                    redis_client.client.zrem(self.queue_key, job_json)
                    logger.warning(f"Removed malformed job from queue: {e}")

            if datetime.utcnow().timestamp() >= deadline:
                return None
            await asyncio.sleep(1)

    async def cleanup_old_jobs(self, days: int = 7) -> int:
        """
        Clean up old completed/failed jobs.

        Args:
            days: Minimum age in days of jobs to clean up

        Returns:
            Number of jobs cleaned up
        """
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        cleaned_count = 0

        # Scan all job result keys. KEYS is O(n) and blocks Redis; SCAN would
        # be preferable on a large keyspace.
        result_keys = redis_client.client.keys(f"{self.results_key}:*")
        for key in result_keys:
            try:
                job_data = redis_client.get(key.decode('utf-8'))
                if not job_data:
                    continue

                # Only completed/failed jobs older than the cutoff are removed
                if job_data.get('status') in (JobStatus.COMPLETED, JobStatus.FAILED):
                    updated_at = job_data.get('updated_at', job_data.get('created_at'))
                    if updated_at:
                        job_date = datetime.fromisoformat(updated_at.replace('Z', '+00:00'))
                        if job_date < cutoff_date:
                            redis_client.delete(key.decode('utf-8'))
                            cleaned_count += 1

            except Exception as e:
                logger.warning(f"Error cleaning up job {key}: {e}")
                continue

        return cleaned_count

    def get_queue_stats(self) -> Dict[str, int]:
        """
        Get queue statistics.

        Returns:
            Dictionary with the active queue length and per-status job counts
        """
        stats = {
            'queued': 0,
            'pending': 0,
            'processing': 0,
            'completed': 0,
            'failed': 0
        }

        try:
            # Length of the active queue, reported separately so queued
            # pending jobs are not double-counted via their stored records
            stats['queued'] = redis_client.client.zcard(self.queue_key)

            # Count stored job records by status
            result_keys = redis_client.client.keys(f"{self.results_key}:*")
            for key in result_keys:
                try:
                    job_data = redis_client.get(key.decode('utf-8'))
                    if job_data:
                        status = job_data.get('status', JobStatus.PENDING)
                        if status in stats:
                            stats[status] += 1
                except Exception:
                    continue

        except Exception as e:
            logger.error(f"Error getting queue stats: {e}")

        return stats

# Global background job service instance
background_service = BackgroundJobService()

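# Illustrative sketch (not part of the original service): one way application
# code might enqueue a job and poll its status. The "send_email" job type and
# its payload fields are hypothetical examples.
async def _example_enqueue_and_check() -> Optional[Dict[str, Any]]:
    job_id = await background_service.enqueue_job(
        job_type="send_email",                                 # hypothetical job type
        job_data={"to": "user@example.com", "subject": "Hi"},  # hypothetical payload
        delay=5,      # becomes available 5 seconds from now
        priority=1,
    )
    # The stored record starts in "pending" status
    return await background_service.get_job_status(job_id)
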
# Decorator for creating background jobs
def background_job(job_type: str, delay: int = 0):
    """
    Decorator that turns a function call into a background job.

    Instead of executing, the wrapped function enqueues a job and returns the
    job ID. Arguments must be JSON-serializable, since they are stored in
    Redis for a worker to replay later.

    Args:
        job_type: Type identifier for the job
        delay: Delay in seconds before the job starts
    """
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Record enough information for a worker to locate and call the
            # original function
            job_data = {
                'function': func.__name__,
                'args': args,
                'kwargs': kwargs,
                'module': func.__module__
            }

            # Enqueue the job instead of running the function
            job_id = await background_service.enqueue_job(
                job_type=job_type,
                job_data=job_data,
                delay=delay
            )

            return job_id

        return wrapper
    return decorator
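
# Illustrative sketch (not part of the original service): a minimal worker
# loop draining the queue. A real worker would dispatch job["type"] to
# registered handler functions; that step is left as a comment here.
async def _example_worker() -> None:
    while True:
        job = await background_service.get_next_job(timeout=30)
        if job is None:
            continue  # timed out waiting; poll again

        try:
            # ... dispatch job["type"] / job["data"] to an application-defined
            # handler here ...
            await background_service.update_job_status(job["id"], JobStatus.COMPLETED)
        except Exception as exc:
            logger.warning(f"Job {job['id']} failed: {exc}")
            # Schedules a retry with exponential backoff, or marks the job
            # failed once max_retries is reached
            await background_service.increment_retry(job["id"])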