@@ -156,15 +156,14 @@ def add(
156156 job dies.
157157 can_start_at : datetime
158158 The earliest time the task can be started.
159- If None, set current time. A task will not be started before this
160- time.
159+ If None, set current time. For consistency the time is
160+ from the database clock. A task will not be started before
161+ this time.
161162 Returns
162163 -------
163164 task_id :
164165 The random UUID that was generated for this task
165166 """
166- if can_start_at is None :
167- can_start_at = datetime .now (UTC )
168167 # make sure the timeout is an actual number, otherwise we'll run
169168 # into problems later when we calculate the actual deadline
170169 lease_timeout = float (lease_timeout )
@@ -186,7 +185,7 @@ def add(
186185 lease_timeout,
187186 can_start_at
188187 )
189- VALUES (%s, %s, %s, %s, %s, %s )
188+ VALUES (%s, %s, %s, %s, %s, COALESCE(%s, current_timestamp) )
190189 """
191190 ).format (sql .Identifier (self ._table_name )),
192191 (
@@ -222,16 +221,15 @@ def add_many(
222221 job dies.
223222 can_start_at : datetime
224223 The earliest time the task can be started.
225- If None, set current time. A task will not be started before this
224+ If None, set current time. For consistency the time is
225+ from the database clock. A task will not be started before this
226226 time.
227227 Returns
228228 -------
229229 task_ids :
230230 List of random UUIDs that were generated for this task.
231231 The order is the same of the given tasks
232232 """
233- if can_start_at is None :
234- can_start_at = datetime .now (UTC )
235233 # make sure the timeout is an actual number, otherwise we'll run
236234 # into problems later when we calculate the actual deadline
237235 lease_timeout = float (lease_timeout )
@@ -253,7 +251,9 @@ def add_many(
253251 lease_timeout,
254252 can_start_at
255253 )
256- VALUES (%s, %s, %s, %s, %s, %s)
254+ VALUES (
255+ %s, %s, %s, %s, %s, COALESCE(%s, current_timestamp)
256+ )
257257 """
258258 ).format (sql .Identifier (self ._table_name )),
259259 (
@@ -273,8 +273,8 @@ def get(self) -> Tuple[
273273 """Get a task from the task queue (non-blocking).
274274
275275 This statement marks the next available task in the queue as
276- "processing" and returns its ID and task details. The query
277- uses a FOR UPDATE SKIP LOCKED clause to lock the selected
276+ started (being processed) and returns its ID and task details.
277+ The query uses a FOR UPDATE SKIP LOCKED clause to lock the selected
278278 task so that other workers can't select the same task simultaneously.
279279
280280 After executing the query, the method fetches the result using
@@ -291,7 +291,7 @@ def get(self) -> Tuple[
291291 >>> taskqueue.complete(task_id)
292292
293293 After some time (i.e. `lease_timeout`) tasks expire and are
294- marked as not processing and the TTL is decreased by
294+ marked as not being processed and the TTL is decreased by
295295 one. If TTL is still > 0 the task will be retried.
296296
297297 Note, this method is non-blocking, i.e. it returns immediately
@@ -525,7 +525,7 @@ def get_updated_expired_task(
525525 ) -> Tuple [Optional [str ], Optional [int ]]:
526526 """
527527 Given the id of an expired task, it tries to reschedule it by
528- marking it as not processing , resetting the deadline
528+ marking it as not started , resetting the deadline
529529 and decreasing TTL by one. It returns None if the task is
530530 already updated (or being updated) by another worker.
531531
@@ -579,18 +579,29 @@ def _serialize(self, task: Any) -> str:
579579 def _deserialize (self , blob : str ) -> Any :
580580 return json .loads (blob )
581581
582- def reschedule (self , task_id : Optional [UUID ]) -> None :
583- """Move a task back from the processing- to the task queue.
582+ def reschedule (
583+ self ,
584+ task_id : UUID ,
585+ decrease_ttl : Optional [bool ] = False ,
586+ ) -> None :
587+ """Move a task back from being processed to the task queue.
584588
585589 Workers can use this method to "drop" a work unit in case of
586- eviction.
590+ eviction (because of an external issue like terminating a machine
591+ by AWS and not because of a failure).
592+ Rescheduled work units are immediately available for processing again,
593+ and unless decrease_ttl is set to True, the TTL is not modified.
587594
588- This function does not modify the TTL.
595+ This function can optionally modify the TTL, setting decrease_ttl to
596+ True. This allows to handle a failure quickly without waiting the
597+ lease_timeout.
589598
590599 Parameters
591600 ----------
592- task_id : str
601+ task_id : UUID
593602 the task ID
603+ decrease_ttl : bool
604+ If True, decrease the TTL by one
594605
595606 Raises
596607 ------
@@ -602,13 +613,17 @@ def reschedule(self, task_id: Optional[UUID]) -> None:
602613 if not isinstance (task_id , UUID ):
603614 raise ValueError ("task_id must be a UUID" )
604615 logger .info (f"Rescheduling task { task_id } .." )
616+ decrease_ttl_sql = ""
617+ if decrease_ttl :
618+ decrease_ttl_sql = "ttl = ttl - 1,"
619+
605620 conn = self .conn
606621 with conn .cursor () as cur :
607622 cur .execute (
608623 sql .SQL (
609624 """
610625 UPDATE {}
611- SET started_at = NULL
626+ SET {} started_at = NULL
612627 WHERE id = (
613628 SELECT id
614629 FROM {}
@@ -619,6 +634,7 @@ def reschedule(self, task_id: Optional[UUID]) -> None:
619634 RETURNING id;"""
620635 ).format (
621636 sql .Identifier (self ._table_name ),
637+ sql .SQL (decrease_ttl_sql ),
622638 sql .Identifier (self ._table_name ),
623639 ),
624640 (task_id ,),
0 commit comments