Skip to content

Commit 3d5a6e5

Browse files
glin93friday and chunyu li authored
Feat/task message (#656)
* Add task completion log event for cloud tasks * Style: reformat dispatcher.py with ruff * feat(scheduler): report task failure to web logs and fix exception handling * fix(scheduler): fix SchedulerRedisQueue status_tracker missing attribute error * feat(scheduler): implement status-driven failure logging and fix redis_queue status_tracker init * fix(scheduler): propagate status_tracker to SchedulerRedisQueue in ScheduleTaskQueue * fix(scheduler): propagate status_tracker via setter to ensure proper initialization * fix: remove redundant task completion status update in redis queue ack --------- Co-authored-by: [email protected] <> Co-authored-by: chunyu li <[email protected]>
1 parent ad97277 commit 3d5a6e5

File tree

5 files changed

+107
-3
lines changed

5 files changed

+107
-3
lines changed

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ def initialize_modules(
224224
if self.dispatcher:
225225
self.dispatcher.status_tracker = self.status_tracker
226226
if self.memos_message_queue:
227-
self.memos_message_queue.status_tracker = self.status_tracker
227+
# Use the setter to propagate to the inner queue (e.g. SchedulerRedisQueue)
228+
self.memos_message_queue.set_status_tracker(self.status_tracker)
228229
# initialize submodules
229230
self.chat_llm = chat_llm
230231
self.process_llm = process_llm

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import concurrent
2+
import os
23
import threading
34
import time
45

@@ -19,7 +20,7 @@
1920
from memos.mem_scheduler.schemas.general_schemas import (
2021
DEFAULT_STOP_WAIT,
2122
)
22-
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
23+
from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem, ScheduleMessageItem
2324
from memos.mem_scheduler.schemas.task_schemas import RunningTaskItem
2425
from memos.mem_scheduler.task_schedule_modules.orchestrator import SchedulerOrchestrator
2526
from memos.mem_scheduler.task_schedule_modules.redis_queue import SchedulerRedisQueue
@@ -200,6 +201,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
200201
if self.status_tracker:
201202
for msg in messages:
202203
self.status_tracker.task_completed(task_id=msg.item_id, user_id=msg.user_id)
204+
self._maybe_emit_task_completion(messages)
203205
self.metrics.task_completed(user_id=m.user_id, task_type=m.label)
204206

205207
emit_monitor_event(
@@ -237,6 +239,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
237239
self.status_tracker.task_failed(
238240
task_id=msg.item_id, user_id=msg.user_id, error_message=str(e)
239241
)
242+
self._maybe_emit_task_completion(messages, error=e)
240243
emit_monitor_event(
241244
"finish",
242245
m,
@@ -284,6 +287,85 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
284287

285288
return wrapped_handler
286289

290+
def _maybe_emit_task_completion(
    self, messages: list[ScheduleMessageItem], error: Exception | None = None
) -> None:
    """Emit one web log per business task whose items have all finished.

    A batch of ``messages`` may span several business ``task_id``s. For each
    distinct id, the status tracker is consulted; when it reports the task as
    ``completed`` or ``failed``, a single ``ScheduleLogForWebItem`` is pushed
    through ``self.submit_web_logs``. Only active in the cloud environment,
    detected via the ``MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME`` env var.

    Args:
        messages: Batch of scheduler messages that were just processed.
        error: Exception caught while handling the batch, if any; used to
            enrich the failure log message when the tracker has no errors.
    """
    if not self.submit_web_logs or not self.status_tracker:
        return

    # Cloud-only feature: bail out before doing any per-batch work.
    if os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") != "memos-memory-change":
        return

    # messages in one batch can belong to different business task_ids; check each
    task_ids = {getattr(msg, "task_id", None) for msg in messages}
    task_ids.discard(None)
    if not task_ids:
        return

    # Use the first message only for shared fields; mem_cube_id is same within a batch
    first = messages[0]
    user_id = first.user_id
    mem_cube_id = first.mem_cube_id

    try:
        for task_id in task_ids:
            status_data = self.status_tracker.get_task_status_by_business_id(
                business_task_id=task_id, user_id=user_id
            )
            if not status_data:
                continue

            status = status_data.get("status")
            if status == "completed":
                # Even if a local exception was passed in, trust the tracker's
                # aggregate view: status update lag is handled on its side.
                log_content = f"Task {task_id} completed"
            elif status == "failed":
                # Prefer the locally caught error; fall back to the tracker's
                # per-item error aggregation, then to a generic message.
                error_msg = str(error) if error else None
                if not error_msg:
                    errors = status_data.get("errors", [])
                    if errors:
                        error_msg = "; ".join(errors)
                    else:
                        error_msg = "Unknown error (check system logs)"
                log_content = f"Task {task_id} failed: {error_msg}"
            else:
                # Task still in progress; nothing to report yet.
                continue

            self.submit_web_logs(
                ScheduleLogForWebItem(
                    task_id=task_id,
                    user_id=user_id,
                    mem_cube_id=mem_cube_id,
                    label="taskStatus",
                    from_memory_type="status",
                    to_memory_type="status",
                    log_content=log_content,
                    status=status,
                )
            )
    except Exception:
        # Web-log emission is best-effort; never break the scheduling pipeline.
        logger.warning(
            "Failed to emit task completion log. user_id=%s mem_cube_id=%s task_ids=%s",
            user_id,
            mem_cube_id,
            list(task_ids),
            exc_info=True,
        )
368+
287369
def get_running_tasks(
288370
self, filter_func: Callable[[RunningTaskItem], bool] | None = None
289371
) -> dict[str, RunningTaskItem]:

src/memos/mem_scheduler/task_schedule_modules/redis_queue.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
DEFAULT_STREAM_KEYS_REFRESH_INTERVAL_SEC,
2323
)
2424
from memos.mem_scheduler.task_schedule_modules.orchestrator import SchedulerOrchestrator
25+
from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker
2526
from memos.mem_scheduler.webservice_modules.redis_service import RedisSchedulerModule
2627

2728

@@ -51,6 +52,7 @@ def __init__(
5152
consumer_name: str | None = "scheduler_consumer",
5253
max_len: int | None = None,
5354
auto_delete_acked: bool = True, # Whether to automatically delete acknowledged messages
55+
status_tracker: TaskStatusTracker | None = None,
5456
):
5557
"""
5658
Initialize the Redis queue.
@@ -62,6 +64,7 @@ def __init__(
6264
max_len: Maximum length of the stream (for memory management)
6365
maxsize: Maximum size of the queue (for Queue compatibility, ignored)
6466
auto_delete_acked: Whether to automatically delete acknowledged messages from stream
67+
status_tracker: TaskStatusTracker instance for tracking task status
6568
"""
6669
super().__init__()
6770
# Stream configuration
@@ -101,6 +104,7 @@ def __init__(
101104
self.message_pack_cache = deque()
102105

103106
self.orchestrator = SchedulerOrchestrator() if orchestrator is None else orchestrator
107+
self.status_tracker = status_tracker
104108

105109
# Cached stream keys and refresh control
106110
self._stream_keys_cache: list[str] = []
@@ -354,7 +358,6 @@ def ack_message(
354358
self._redis_conn.xack(stream_key, self.consumer_group, redis_message_id)
355359

356360
if message:
357-
self.status_tracker.task_completed(task_id=message.item_id, user_id=message.user_id)
358361
logger.info(
359362
f"Message {message.item_id} | {message.label} | {message.content} has been acknowledged."
360363
)

src/memos/mem_scheduler/task_schedule_modules/task_queue.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,26 @@ def __init__(
4242
consumer_group="scheduler_group",
4343
consumer_name="scheduler_consumer",
4444
orchestrator=self.orchestrator,
45+
status_tracker=self.status_tracker, # Propagate status_tracker
4546
)
4647
else:
4748
self.memos_message_queue = SchedulerLocalQueue(maxsize=self.maxsize)
4849

4950
self.disabled_handlers = disabled_handlers
5051

52+
def set_status_tracker(self, status_tracker: TaskStatusTracker) -> None:
    """Attach a status tracker to this queue and forward it to the inner queue.

    Supports late injection (e.g. once a Redis connection becomes available):
    the tracker is stored on this wrapper and, when the underlying queue
    implementation exposes a ``status_tracker`` attribute, assigned there too.
    """
    self.status_tracker = status_tracker

    inner_queue = self.memos_message_queue
    if not inner_queue:
        return

    # SchedulerRedisQueue declares status_tracker; SchedulerLocalQueue accepts
    # it dynamically as long as it does not define __slots__.
    if hasattr(inner_queue, "status_tracker"):
        inner_queue.status_tracker = status_tracker
        logger.info("Propagated status_tracker to underlying message queue")
64+
5165
def ack_message(
5266
self,
5367
user_id: str,

src/memos/mem_scheduler/utils/status_tracker.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,14 @@ def get_task_status_by_business_id(self, business_task_id: str, user_id: str) ->
142142
# Get statuses for all items
143143
key = self._get_key(user_id)
144144
item_statuses = []
145+
errors = []
145146
for item_id in item_ids:
146147
item_data_json = self.redis.hget(key, item_id)
147148
if item_data_json:
148149
item_data = json.loads(item_data_json)
149150
item_statuses.append(item_data["status"])
151+
if item_data.get("status") == "failed" and "error" in item_data:
152+
errors.append(item_data["error"])
150153

151154
if not item_statuses:
152155
return None
@@ -167,6 +170,7 @@ def get_task_status_by_business_id(self, business_task_id: str, user_id: str) ->
167170
"business_task_id": business_task_id,
168171
"item_count": len(item_ids),
169172
"item_statuses": item_statuses,
173+
"errors": errors,
170174
}
171175

172176
def get_all_tasks_global(self) -> dict[str, dict[str, dict]]:

0 commit comments

Comments (0)