Skip to content

Commit 1dc8ffb

Browse files
feat(scheduler): report task failure to web logs and fix exception handling
1 parent 47b03e0 commit 1dc8ffb

File tree

1 file changed

+31
-13
lines changed

1 file changed

+31
-13
lines changed

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
239239
self.status_tracker.task_failed(
240240
task_id=msg.item_id, user_id=msg.user_id, error_message=str(e)
241241
)
242+
self._maybe_emit_task_completion(messages, error=e)
242243
emit_monitor_event(
243244
"finish",
244245
m,
@@ -286,7 +287,9 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
286287

287288
return wrapped_handler
288289

289-
def _maybe_emit_task_completion(self, messages: list[ScheduleMessageItem]) -> None:
290+
def _maybe_emit_task_completion(
291+
self, messages: list[ScheduleMessageItem], error: Exception | None = None
292+
) -> None:
290293
"""If all item_ids under a business task are completed, emit a single completion log."""
291294
if not self.submit_web_logs or not self.status_tracker:
292295
return
@@ -311,20 +314,35 @@ def _maybe_emit_task_completion(self, messages: list[ScheduleMessageItem]) -> No
311314
status_data = self.status_tracker.get_task_status_by_business_id(
312315
business_task_id=task_id, user_id=user_id
313316
)
314-
if not status_data or status_data.get("status") != "completed":
317+
if not status_data:
315318
continue
316319

317-
event = ScheduleLogForWebItem(
318-
task_id=task_id,
319-
user_id=user_id,
320-
mem_cube_id=mem_cube_id,
321-
label="taskStatus",
322-
from_memory_type="status",
323-
to_memory_type="status",
324-
log_content=f"Task {task_id} completed",
325-
status="completed",
326-
)
327-
self.submit_web_logs(event)
320+
status = status_data.get("status")
321+
322+
if status == "completed" and error is None:
323+
event = ScheduleLogForWebItem(
324+
task_id=task_id,
325+
user_id=user_id,
326+
mem_cube_id=mem_cube_id,
327+
label="taskStatus",
328+
from_memory_type="status",
329+
to_memory_type="status",
330+
log_content=f"Task {task_id} completed",
331+
status="completed",
332+
)
333+
self.submit_web_logs(event)
334+
elif status == "failed" and error is not None:
335+
event = ScheduleLogForWebItem(
336+
task_id=task_id,
337+
user_id=user_id,
338+
mem_cube_id=mem_cube_id,
339+
label="taskStatus",
340+
from_memory_type="status",
341+
to_memory_type="status",
342+
log_content=f"Task {task_id} failed: {error!s}",
343+
status="failed",
344+
)
345+
self.submit_web_logs(event)
328346
except Exception:
329347
logger.warning(
330348
"Failed to emit task completion log. user_id=%s mem_cube_id=%s task_ids=%s",

0 commit comments

Comments
 (0)