Skip to content

Commit 4ed2eec

Browse files
feat(scheduler): implement status-driven failure logging and fix redis_queue status_tracker init
1 parent fbf82ad commit 4ed2eec

File tree

3 files changed

+23
-4
lines changed

3 files changed

+23
-4
lines changed

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,10 @@ def _maybe_emit_task_completion(
319319

320320
status = status_data.get("status")
321321

322-
if status == "completed" and error is None:
322+
if status == "completed":
323+
# Only emit success log if we didn't just catch an exception locally
324+
# (Although if status is 'completed', local error shouldn't happen theoretically,
325+
# unless status update lags or is inconsistent. We trust status_tracker here.)
323326
event = ScheduleLogForWebItem(
324327
task_id=task_id,
325328
user_id=user_id,
@@ -331,15 +334,26 @@ def _maybe_emit_task_completion(
331334
status="completed",
332335
)
333336
self.submit_web_logs(event)
334-
elif status == "failed" and error is not None:
337+
338+
elif status == "failed":
339+
# Construct error message
340+
error_msg = str(error) if error else None
341+
if not error_msg:
342+
# Try to get errors from status_tracker aggregation
343+
errors = status_data.get("errors", [])
344+
if errors:
345+
error_msg = "; ".join(errors)
346+
else:
347+
error_msg = "Unknown error (check system logs)"
348+
335349
event = ScheduleLogForWebItem(
336350
task_id=task_id,
337351
user_id=user_id,
338352
mem_cube_id=mem_cube_id,
339353
label="taskStatus",
340354
from_memory_type="status",
341355
to_memory_type="status",
342-
log_content=f"Task {task_id} failed: {error!s}",
356+
log_content=f"Task {task_id} failed: {error_msg}",
343357
status="failed",
344358
)
345359
self.submit_web_logs(event)

src/memos/mem_scheduler/task_schedule_modules/redis_queue.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
DEFAULT_STREAM_KEY_PREFIX,
2222
DEFAULT_STREAM_KEYS_REFRESH_INTERVAL_SEC,
2323
)
24-
from memos.mem_scheduler.utils.monitor_event_utils import emit_monitor_event, to_iso
24+
from memos.mem_scheduler.task_schedule_modules.orchestrator import SchedulerOrchestrator
2525
from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker
26+
from memos.mem_scheduler.webservice_modules.redis_service import RedisSchedulerModule
2627

2728

2829
logger = get_logger(__name__)

src/memos/mem_scheduler/utils/status_tracker.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,14 @@ def get_task_status_by_business_id(self, business_task_id: str, user_id: str) ->
142142
# Get statuses for all items
143143
key = self._get_key(user_id)
144144
item_statuses = []
145+
errors = []
145146
for item_id in item_ids:
146147
item_data_json = self.redis.hget(key, item_id)
147148
if item_data_json:
148149
item_data = json.loads(item_data_json)
149150
item_statuses.append(item_data["status"])
151+
if item_data.get("status") == "failed" and "error" in item_data:
152+
errors.append(item_data["error"])
150153

151154
if not item_statuses:
152155
return None
@@ -167,6 +170,7 @@ def get_task_status_by_business_id(self, business_task_id: str, user_id: str) ->
167170
"business_task_id": business_task_id,
168171
"item_count": len(item_ids),
169172
"item_statuses": item_statuses,
173+
"errors": errors,
170174
}
171175

172176
def get_all_tasks_global(self) -> dict[str, dict[str, dict]]:

0 commit comments

Comments
 (0)