1
1
from dataclasses import asdict , is_dataclass
2
2
from logging import getLogger
3
- from typing import (
3
+ from typing import ( # noqa: WPS235
4
4
TYPE_CHECKING ,
5
5
Any ,
6
6
Coroutine ,
7
7
Dict ,
8
8
Generic ,
9
+ Optional ,
9
10
TypeVar ,
10
11
Union ,
11
12
overload ,
@@ -42,6 +43,7 @@ def __init__(
42
43
self .task_name = task_name
43
44
self .broker = broker
44
45
self .labels = labels
46
+ self .custom_task_id : Optional [str ] = None
45
47
46
48
def with_labels (
47
49
self ,
@@ -56,6 +58,19 @@ def with_labels(
56
58
self .labels .update (labels )
57
59
return self
58
60
61
+ def with_task_id (self , task_id : str ) -> "AsyncKicker[_FuncParams, _ReturnType]" :
62
+ """
63
+ Set task_id for current execution.
64
+
65
+ Please use this method with caution,
66
+ because it may brake the logic of getting results.
67
+
68
+ :param task_id: custom task id.
69
+ :return: kicker with custom task id.
70
+ """
71
+ self .custom_task_id = task_id
72
+ return self
73
+
59
74
def with_broker (
60
75
self ,
61
76
broker : "AsyncBroker" ,
@@ -200,8 +215,12 @@ def _prepare_message( # noqa: WPS210
200
215
for label , label_val in self .labels .items ():
201
216
labels [label ] = str (label_val )
202
217
218
+ task_id = self .custom_task_id
219
+ if task_id is None :
220
+ task_id = self .broker .id_generator ()
221
+
203
222
return TaskiqMessage (
204
- task_id = self . broker . id_generator () ,
223
+ task_id = task_id ,
205
224
task_name = self .task_name ,
206
225
labels = labels ,
207
226
args = formatted_args ,
0 commit comments