@@ -579,19 +579,25 @@ def _serialize(self, task: Any) -> str:
579579 def _deserialize (self , blob : str ) -> Any :
580580 return json .loads (blob )
581581
582- def reschedule (self , task_id : UUID ) -> None :
582+ def reschedule (
583+ self ,
584+ task_id : UUID ,
585+ decrease_ttl : Optional [bool ]= False
586+ ) -> None :
583587 """Move a task back from being processed to the task queue.
584588
585589 Workers can use this method to "drop" a work unit in case of
586590 eviction (because of an external issue like terminating a machine
587591 by aws and not because of a failure).
588592
589- This function does not modify the TTL.
593+ This function can optionally modify the TTL.
590594
591595 Parameters
592596 ----------
593597 task_id : UUID
594598 the task ID
599+ decrease_ttl : bool
600+ If True, decrease the TTL by one
595601
596602 Raises
597603 ------
@@ -603,13 +609,17 @@ def reschedule(self, task_id: UUID) -> None:
603609 if not isinstance (task_id , UUID ):
604610 raise ValueError ("task_id must be a UUID" )
605611 logger .info (f"Rescheduling task { task_id } .." )
612+ decrease_ttl_sql = ""
613+ if decrease_ttl :
614+ decrease_ttl_sql = "ttl = ttl - 1,"
615+
606616 conn = self .conn
607617 with conn .cursor () as cur :
608618 cur .execute (
609619 sql .SQL (
610620 """
611621 UPDATE {}
612- SET started_at = NULL
622+ SET {} started_at = NULL
613623 WHERE id = (
614624 SELECT id
615625 FROM {}
@@ -620,6 +630,7 @@ def reschedule(self, task_id: UUID) -> None:
620630 RETURNING id;"""
621631 ).format (
622632 sql .Identifier (self ._table_name ),
633+ sql .SQL (decrease_ttl_sql ),
623634 sql .Identifier (self ._table_name ),
624635 ),
625636 (task_id ,),
0 commit comments