@@ -50,6 +50,7 @@ def __init__(
50
50
self .broker = broker
51
51
self .labels = labels
52
52
self .custom_task_id : Optional [str ] = None
53
+ self .custom_schedule_id : Optional [str ] = None
53
54
54
55
def with_labels (
55
56
self ,
@@ -77,6 +78,19 @@ def with_task_id(self, task_id: str) -> "AsyncKicker[_FuncParams, _ReturnType]":
77
78
self .custom_task_id = task_id
78
79
return self
79
80
81
+ def with_schedule_id (
82
+ self ,
83
+ schedule_id : str ,
84
+ ) -> "AsyncKicker[_FuncParams, _ReturnType]" :
85
+ """
86
+ Set schedule_id for current execution.
87
+
88
+ :param schedule_id: custom schedule id.
89
+ :return: kicker with custom schedule id.
90
+ """
91
+ self .custom_schedule_id = schedule_id
92
+ return self
93
+
80
94
def with_broker (
81
95
self ,
82
96
broker : "AsyncBroker" ,
@@ -166,7 +180,9 @@ async def schedule_by_cron(
166
180
167
181
:return: schedule id.
168
182
"""
169
- schedule_id = self .broker .id_generator ()
183
+ schedule_id = self .custom_schedule_id
184
+ if schedule_id is None :
185
+ schedule_id = self .broker .id_generator ()
170
186
message = self ._prepare_message (* args , ** kwargs )
171
187
cron_offset = None
172
188
if isinstance (cron , CronSpec ):
@@ -201,7 +217,9 @@ async def schedule_by_time(
201
217
:param args: function's args.
202
218
:param kwargs: function's kwargs.
203
219
"""
204
- schedule_id = self .broker .id_generator ()
220
+ schedule_id = self .custom_schedule_id
221
+ if schedule_id is None :
222
+ schedule_id = self .broker .id_generator ()
205
223
message = self ._prepare_message (* args , ** kwargs )
206
224
scheduled = ScheduledTask (
207
225
schedule_id = schedule_id ,
0 commit comments