Skip to content

Commit 28e1368

Browse files
Fix scheduler task tracking
1 parent 5ba44d6 commit 28e1368

File tree

1 file changed

+17
-15
lines changed

1 file changed

+17
-15
lines changed

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -180,14 +180,13 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
180180
)
181181

182182
# Mark task as completed and remove from tracking
183-
if isinstance(self.memos_message_queue, SchedulerLocalQueue):
184-
with self._task_lock:
185-
if task_item.item_id in self._running_tasks:
186-
task_item.mark_completed(result)
187-
del self._running_tasks[task_item.item_id]
188-
self._completed_tasks.append(task_item)
189-
if len(self._completed_tasks) > self.completed_tasks_max_show_size:
190-
self._completed_tasks.pop(0)
183+
with self._task_lock:
184+
if task_item.item_id in self._running_tasks:
185+
task_item.mark_completed(result)
186+
del self._running_tasks[task_item.item_id]
187+
self._completed_tasks.append(task_item)
188+
if len(self._completed_tasks) > self.completed_tasks_max_show_size:
189+
self._completed_tasks.pop(0)
191190
logger.info(f"Task completed: {task_item.get_execution_info()}")
192191
return result
193192

@@ -199,13 +198,12 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
199198
task_id=task_item.item_id, user_id=task_item.user_id, error_message=str(e)
200199
)
201200
# Mark task as failed and remove from tracking
202-
if isinstance(self.memos_message_queue, SchedulerLocalQueue):
203-
with self._task_lock:
204-
if task_item.item_id in self._running_tasks:
205-
task_item.mark_failed(str(e))
206-
del self._running_tasks[task_item.item_id]
207-
if len(self._completed_tasks) > self.completed_tasks_max_show_size:
208-
self._completed_tasks.pop(0)
201+
with self._task_lock:
202+
if task_item.item_id in self._running_tasks:
203+
task_item.mark_failed(str(e))
204+
del self._running_tasks[task_item.item_id]
205+
if len(self._completed_tasks) > self.completed_tasks_max_show_size:
206+
self._completed_tasks.pop(0)
209207
logger.error(f"Task failed: {task_item.get_execution_info()}, Error: {e}")
210208

211209
raise
@@ -395,6 +393,10 @@ def dispatch(self, msg_list: list[ScheduleMessageItem]):
395393
messages=msgs,
396394
)
397395

396+
# Track running task for status/monitoring
397+
with self._task_lock:
398+
self._running_tasks[task_item.item_id] = task_item
399+
398400
# Create wrapped handler for task tracking
399401
wrapped_handler = self._create_task_wrapper(handler, task_item)
400402

0 commit comments

Comments
 (0)