@@ -51,12 +51,7 @@ async def enqueue_job(self, job_id: str, file_path: str, task_data: dict | None
5151 pipeline .expire (task_key , settings .REDIS_JOB_TIMEOUT )
5252
5353 # Add to stream for instant delivery
54- stream_data = {
55- "task_id" : task_id ,
56- "job_id" : job_id ,
57- "priority" : str (priority ),
58- "timestamp" : str (time .time ())
59- }
54+ stream_data = {"task_id" : task_id , "job_id" : job_id , "priority" : str (priority ), "timestamp" : str (time .time ())}
6055 pipeline .xadd (self .job_stream , stream_data )
6156
6257 # Publish event for instant notification
@@ -78,9 +73,7 @@ async def get_redis(self) -> aioredis.Redis:
7873 )
7974 # Create consumer group if it doesn't exist
8075 try :
81- await self .redis .xgroup_create (
82- self .job_stream , self .consumer_group , id = "0" , mkstream = True
83- )
76+ await self .redis .xgroup_create (self .job_stream , self .consumer_group , id = "0" , mkstream = True )
8477 except aioredis .ResponseError as e :
8578 if "BUSYGROUP" not in str (e ):
8679 raise
@@ -97,7 +90,7 @@ async def consume_jobs(self) -> AsyncIterator[QueuedTask]:
9790 self .consumer_name ,
9891 {self .job_stream : ">" },
9992 count = 1 ,
100- block = 0 # Block indefinitely until message arrives
93+ block = 0 , # Block indefinitely until message arrives
10194 )
10295
10396 if not messages :
@@ -111,9 +104,7 @@ async def consume_jobs(self) -> AsyncIterator[QueuedTask]:
111104 job_data = await redis_client .hgetall (task_key )
112105 if job_data :
113106 # Acknowledge message
114- await redis_client .xack (
115- self .job_stream , self .consumer_group , msg_id
116- )
107+ await redis_client .xack (self .job_stream , self .consumer_group , msg_id )
117108 yield QueuedTask .from_redis_dict (job_data )
118109
119110 except aioredis .ConnectionError :
@@ -127,10 +118,9 @@ async def mark_job_processing(self, task_id: str, job_id: str) -> bool:
127118
128119 pipeline = redis_client .pipeline ()
129120 pipeline .hset (task_key , "status" , "processing" )
130- pipeline .publish (f"cv:job:{ job_id } :status" , json .dumps ({
131- "status" : "processing" ,
132- "timestamp" : datetime .now ().isoformat ()
133- }))
121+ pipeline .publish (
122+ f"cv:job:{ job_id } :status" , json .dumps ({"status" : "processing" , "timestamp" : datetime .now ().isoformat ()})
123+ )
134124 results = await pipeline .execute ()
135125 return bool (results [0 ] > 0 )
136126
@@ -145,11 +135,10 @@ async def mark_job_completed(self, task_id: str, job_id: str, result: dict[str,
145135 pipeline .expire (task_key , 3600 )
146136
147137 # Publish completion event for real-time updates
148- pipeline .publish (f"cv:job:{ job_id } :completed" , json .dumps ({
149- "status" : "completed" ,
150- "job_id" : job_id ,
151- "timestamp" : datetime .now ().isoformat ()
152- }))
138+ pipeline .publish (
139+ f"cv:job:{ job_id } :completed" ,
140+ json .dumps ({"status" : "completed" , "job_id" : job_id , "timestamp" : datetime .now ().isoformat ()}),
141+ )
153142
154143 await pipeline .execute ()
155144 return True
@@ -187,11 +176,10 @@ async def mark_job_failed(self, task_id: str, job_id: str, error: str, retry: bo
187176 pipeline .expire (task_key , 86400 )
188177
189178 # Publish failure event
190- pipeline .publish (f"cv:job:{ job_id } :failed" , json .dumps ({
191- "status" : "failed" ,
192- "error" : error ,
193- "timestamp" : datetime .now ().isoformat ()
194- }))
179+ pipeline .publish (
180+ f"cv:job:{ job_id } :failed" ,
181+ json .dumps ({"status" : "failed" , "error" : error , "timestamp" : datetime .now ().isoformat ()}),
182+ )
195183
196184 await pipeline .execute ()
197185 return False
@@ -226,12 +214,15 @@ async def listen_for_retries(self):
226214 task_key = f"{ self .task_key_prefix } { task_id } "
227215 job_data = await redis_client .hgetall (task_key )
228216 if job_data :
229- await redis_client .xadd (self .job_stream , {
230- "task_id" : task_id ,
231- "job_id" : job_data .get ("job_id" , "" ),
232- "priority" : "1" , # Higher priority for retries
233- "timestamp" : str (time .time ())
234- })
217+ await redis_client .xadd (
218+ self .job_stream ,
219+ {
220+ "task_id" : task_id ,
221+ "job_id" : job_data .get ("job_id" , "" ),
222+ "priority" : "1" , # Higher priority for retries
223+ "timestamp" : str (time .time ()),
224+ },
225+ )
235226
236227 async def cleanup_expired_jobs (self , max_age_hours : int = 24 ) -> int :
237228 redis_client = await self .get_redis ()
0 commit comments