@@ -132,9 +132,10 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
132132 start_time = time .time ()
133133 start_iso = datetime .fromtimestamp (start_time , tz = timezone .utc ).isoformat ()
134134 if self .status_tracker :
135- self .status_tracker .task_started (
136- task_id = task_item .item_id , user_id = task_item .user_id
137- )
135+ for msg in messages :
136+ self .status_tracker .task_started (
137+ task_id = msg .item_id , user_id = msg .user_id
138+ )
138139 try :
139140 first_msg = messages [0 ]
140141 trace_id = getattr (first_msg , "trace_id" , None ) or generate_trace_id ()
@@ -197,9 +198,10 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
197198 duration = finish_time - start_time
198199 self .metrics .observe_task_duration (duration , m .user_id , m .label )
199200 if self .status_tracker :
200- self .status_tracker .task_completed (
201- task_id = task_item .item_id , user_id = task_item .user_id
202- )
201+ for msg in messages :
202+ self .status_tracker .task_completed (
203+ task_id = msg .item_id , user_id = msg .user_id
204+ )
203205 self .metrics .task_completed (user_id = m .user_id , task_type = m .label )
204206
205207 emit_monitor_event (
@@ -229,9 +231,10 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
229231 finish_time = time .time ()
230232 self .metrics .task_failed (m .user_id , m .label , type (e ).__name__ )
231233 if self .status_tracker :
232- self .status_tracker .task_failed (
233- task_id = task_item .item_id , user_id = task_item .user_id , error_message = str (e )
234- )
234+ for msg in messages :
235+ self .status_tracker .task_failed (
236+ task_id = msg .item_id , user_id = msg .user_id , error_message = str (e )
237+ )
235238 emit_monitor_event (
236239 "finish" ,
237240 m ,
0 commit comments