File tree Expand file tree Collapse file tree 1 file changed +9
-3
lines changed Expand file tree Collapse file tree 1 file changed +9
-3
lines changed Original file line number Diff line number Diff line change 33import datetime
44import random
55from logging import getLogger
6+ from types import NoneType
67from typing import Any
78
89from taskiq import ScheduleSource
@@ -61,6 +62,11 @@ def __init__(
6162 self .max_delay_exponent = max_delay_exponent
6263 self .schedule_source = schedule_source
6364
65+ if not isinstance (schedule_source , (ScheduleSource , NoneType )):
66+ raise TypeError (
67+ "schedule_source must be an instance of ScheduleSource or None" ,
68+ )
69+
6470 def is_retry_on_error (self , message : TaskiqMessage ) -> bool :
6571 """
6672 Check if retry is enabled for this task.
@@ -103,7 +109,9 @@ async def on_send(
103109 delay : float ,
104110 ) -> None :
105111 """Execute the task with a delay."""
106- if isinstance (self .schedule_source , ScheduleSource ):
112+ if self .schedule_source is None :
113+ await kicker .with_labels (delay = delay ).kiq (* message .args , ** message .kwargs )
114+ else :
107115 target_time = datetime .datetime .now (datetime .UTC ) + datetime .timedelta (
108116 seconds = delay ,
109117 )
@@ -113,8 +121,6 @@ async def on_send(
113121 * message .args ,
114122 ** message .kwargs ,
115123 )
116- else :
117- await kicker .with_labels (delay = delay ).kiq (* message .args , ** message .kwargs )
118124
119125 async def on_error (
120126 self ,
You can’t perform that action at this time.
0 commit comments