@@ -1,4 +1,5 @@
 import concurrent
+import os
 import threading
 import time
 
@@ -19,7 +20,7 @@
 from memos.mem_scheduler.schemas.general_schemas import (
     DEFAULT_STOP_WAIT,
 )
-from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
+from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem, ScheduleMessageItem
 from memos.mem_scheduler.schemas.task_schemas import RunningTaskItem
 from memos.mem_scheduler.task_schedule_modules.orchestrator import SchedulerOrchestrator
 from memos.mem_scheduler.task_schedule_modules.redis_queue import SchedulerRedisQueue
@@ -200,6 +201,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
             if self.status_tracker:
                 for msg in messages:
                     self.status_tracker.task_completed(task_id=msg.item_id, user_id=msg.user_id)
+            self._maybe_emit_task_completion(messages)
             self.metrics.task_completed(user_id=m.user_id, task_type=m.label)
 
             emit_monitor_event(
@@ -284,6 +286,56 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
 
         return wrapped_handler
 
+    def _maybe_emit_task_completion(self, messages: list[ScheduleMessageItem]) -> None:
+        """If all item_ids under a business task are completed, emit a single completion log."""
+        if not self.submit_web_logs or not self.status_tracker:
+            return
+
+        # Messages in one batch can belong to different business task_ids; check each.
+        task_ids = {getattr(msg, "task_id", None) for msg in messages}
+        task_ids.discard(None)
+        if not task_ids:
+            return
+
+        # Use the first message only for shared fields; mem_cube_id is the same within a batch.
+        first = messages[0]
+        user_id = first.user_id
+        mem_cube_id = first.mem_cube_id
+
+        try:
+            is_cloud_env = (
+                os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
+            )
+            if not is_cloud_env:
+                return
+
+            for task_id in task_ids:
+                status_data = self.status_tracker.get_task_status_by_business_id(
+                    business_task_id=task_id, user_id=user_id
+                )
+                if not status_data or status_data.get("status") != "completed":
+                    continue
+
+                event = ScheduleLogForWebItem(
+                    task_id=task_id,
+                    user_id=user_id,
+                    mem_cube_id=mem_cube_id,
+                    label="taskStatus",
+                    from_memory_type="status",
+                    to_memory_type="status",
+                    log_content=f"Task {task_id} completed",
+                    status="completed",
+                )
+                self.submit_web_logs(event)
+        except Exception:
+            logger.warning(
+                "Failed to emit task completion log. user_id=%s mem_cube_id=%s task_ids=%s",
+                user_id,
+                mem_cube_id,
+                list(task_ids),
+                exc_info=True,
+            )
+
     def get_running_tasks(
         self, filter_func: Callable[[RunningTaskItem], bool] | None = None
     ) -> dict[str, RunningTaskItem]:
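
For readers skimming the diff, here is a minimal, self-contained sketch of the batching behaviour the new helper implements: one dispatched batch can carry several business task_ids, and a completion event is emitted only for those task_ids the status tracker reports as fully completed. FakeMessage, FakeStatusTracker, and the dict-shaped event are illustrative stand-ins, not the real memos types or APIs.

from dataclasses import dataclass


@dataclass
class FakeMessage:
    # Illustrative stand-in for ScheduleMessageItem; the real class has more fields.
    item_id: str
    task_id: str | None
    user_id: str
    mem_cube_id: str


class FakeStatusTracker:
    # Illustrative stand-in: pretends business task "task-A" is fully completed.
    def get_task_status_by_business_id(self, business_task_id: str, user_id: str) -> dict:
        if business_task_id == "task-A":
            return {"status": "completed"}
        return {"status": "running"}


def maybe_emit_task_completion(messages, status_tracker, submit) -> None:
    # Collect the distinct business task ids present in this batch.
    task_ids = {m.task_id for m in messages}
    task_ids.discard(None)
    if not task_ids:
        return
    first = messages[0]  # shared fields (user_id, mem_cube_id) come from the first message
    for task_id in task_ids:
        status = status_tracker.get_task_status_by_business_id(
            business_task_id=task_id, user_id=first.user_id
        )
        if status and status.get("status") == "completed":
            submit({"task_id": task_id, "user_id": first.user_id,
                    "mem_cube_id": first.mem_cube_id, "status": "completed"})


if __name__ == "__main__":
    emitted = []
    batch = [
        FakeMessage("i1", "task-A", "u1", "cube-1"),
        FakeMessage("i2", "task-A", "u1", "cube-1"),
        FakeMessage("i3", "task-B", "u1", "cube-1"),
    ]
    maybe_emit_task_completion(batch, FakeStatusTracker(), emitted.append)
    print(emitted)  # exactly one event, for task-A; task-B is still running

In the dispatcher itself the emitted event is a ScheduleLogForWebItem and the emit is additionally gated on the MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME environment variable, as shown in the hunk above.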