File tree Expand file tree Collapse file tree 1 file changed +8
-7
lines changed Expand file tree Collapse file tree 1 file changed +8
-7
lines changed Original file line number Diff line number Diff line change
1
+ from copy import deepcopy
1
2
from logging import getLogger
2
3
from typing import Any
3
4
@@ -40,19 +41,19 @@ async def on_error(
40
41
# Check if retrying is enabled for the task.
41
42
if retry_on_error != "True" :
42
43
return
43
-
44
+ new_msg = deepcopy ( message )
44
45
# Getting number of previous retries.
45
- retries = int (message .labels .get ("_retries" , 0 )) + 1
46
- message .labels ["_retries" ] = str (retries )
47
- max_retries = int (message .labels .get ("max_retries" , self .default_retry_count ))
46
+ retries = int (new_msg .labels .get ("_retries" , 0 )) + 1
47
+ new_msg .labels ["_retries" ] = str (retries )
48
+ max_retries = int (new_msg .labels .get ("max_retries" , self .default_retry_count ))
48
49
if retries < max_retries :
49
50
logger .info (
50
51
"Task '%s' invocation failed. Retrying." ,
51
52
message .task_name ,
52
53
)
53
- message .labels ["_parent" ] = message .task_id
54
- message .task_id = self .broker .id_generator ()
55
- broker_message = self .broker .formatter .dumps (message = message )
54
+ new_msg .labels ["_parent" ] = message .task_id
55
+ new_msg .task_id = self .broker .id_generator ()
56
+ broker_message = self .broker .formatter .dumps (message = new_msg )
56
57
await self .broker .kick (broker_message )
57
58
else :
58
59
logger .warning (
You can’t perform that action at this time.
0 commit comments