
Commit 3f99afd

Authored by tangg555, fridayL, and glin93
Scheduler: new feat about orchestrator task schedule (#614)
* debug an incorrectly named function
* feat: Add DynamicCache compatibility for different transformers versions
  - Fix build_kv_cache method in hf.py to handle both old and new DynamicCache structures
  - Support new 'layers' attribute with key_cache/value_cache or keys/values
  - Maintain backward compatibility with direct key_cache/value_cache attributes
  - Add comprehensive error handling and logging for unsupported structures
  - Update move_dynamic_cache_htod function in kv.py for cross-version compatibility
  - Handle layers-based structure in newer transformers versions
  - Support alternative attribute names (keys/values vs key_cache/value_cache)
  - Preserve original functionality for older transformers versions
  - Add comprehensive tests for DynamicCache compatibility
  - Test activation memory update with mock DynamicCache layers
  - Verify layers attribute access across different transformers versions
  - Fix scheduler logger mock to include memory_manager attribute

  This resolves AttributeError issues when using different versions of the transformers library and ensures robust handling of DynamicCache objects.
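The cross-version handling described above comes down to probing which attributes the cache object exposes: newer transformers builds expose a `layers` list (each layer holding `keys`/`values` or `key_cache`/`value_cache`), while older builds expose flat `key_cache`/`value_cache` lists. A minimal sketch, with `iter_kv_pairs` as a hypothetical helper (the real code lives in `build_kv_cache` in hf.py and `move_dynamic_cache_htod` in kv.py):

```python
def iter_kv_pairs(cache):
    """Yield (key, value) tensors per layer from either DynamicCache layout."""
    if hasattr(cache, "layers"):  # newer layout: per-layer objects
        for layer in cache.layers:
            if hasattr(layer, "keys"):
                yield layer.keys, layer.values
            else:
                yield layer.key_cache, layer.value_cache
    elif hasattr(cache, "key_cache"):  # older layout: flat parallel lists
        yield from zip(cache.key_cache, cache.value_cache)
    else:
        # mirrors the commit's "error handling for unsupported structures"
        raise TypeError(f"Unsupported cache structure: {type(cache).__name__}")
```

Because the probe is attribute-based rather than version-based, the same helper keeps working if a future release renames attributes again, as long as one of the known shapes survives.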
* debug
* feat: implement APIAnalyzerForScheduler for memory operations
  - Add APIAnalyzerForScheduler class with search/add operations
  - Support requests and http.client with connection reuse
  - Include comprehensive error handling and dynamic configuration
  - Add English test suite with real-world conversation scenarios
* feat: Add search_ws API endpoint and enhance API analyzer functionality
  - Add search_ws endpoint in server_router.py for scheduler-enabled search
  - Fix missing imports: time module, SearchRequest class, and get_mos_product_instance function
  - Implement search_ws method in api_analyzer.py with HTTP client support
  - Add _search_ws_with_requests and _search_ws_with_http_client private methods
  - Include search_ws usage example in demonstration code
  - Enhance scheduler and dispatcher capabilities for improved memory management
  - Expand test coverage to ensure functionality stability

  This update primarily strengthens the memory scheduling system's search capabilities, providing users with more flexible API interface options.
* fix: resolve test failures and warnings in test suite
  - Fix Pydantic serialization warning in test_memos_chen_tang_hello_world
    * Add warnings filter to suppress UserWarning from Pydantic serialization
  - Fix KeyError: 'past_key_values' in test_build_kv_cache_and_generation
    * Update mock configuration to properly return forward_output with past_key_values
    * Add DynamicCache version compatibility handling in test mocks
    * Support both old and new transformers versions with layers/key_cache attributes
    * Improve assertion logic to check all model calls for required parameters
  - Update base_scheduler.py to use centralized DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE constant
    * Add import for DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE from general_schemas
    * Replace hardcoded value 100 with configurable constant (1000)

  All tests now pass successfully with proper version compatibility handling.
* feat: add a test_robustness execution to test thread pool execution
* feat: optimize scheduler configuration and API search functionality
  - Add DEFAULT_TOP_K and DEFAULT_CONTEXT_WINDOW_SIZE global constants in general_schemas.py
  - Update base_scheduler.py to use global default values instead of hardcoded numbers
  - Fix SchedulerConfigFactory initialization issue by using keyword argument expansion
  - Resolve UnboundLocalError variable conflict in search_memories_ws function
  - Fix indentation and parameter issues in OptimizedScheduler search_for_api method
  - Improve code standardization and maintainability
* feat: Add Redis auto-initialization with fallback strategies
  - Add auto_initialize_redis() with config/env/local fallback
  - Move Redis logic from dispatcher_monitor to redis_service
  - Update base_scheduler to use auto initialization
  - Add proper resource cleanup and error handling
* feat: add database connection management to ORM module
  - Add MySQL engine loading from environment variables in BaseDBManager
  - Add Redis connection loading from environment variables in BaseDBManager
  - Enhance database configuration validation and error handling
  - Complete database adapter infrastructure for ORM module
  - Provide unified database connection management interface

  This update provides comprehensive database connection management capabilities for the mem_scheduler module, supporting dynamic MySQL and Redis configuration loading from environment variables, establishing a reliable data persistence foundation for scheduling services and API services.
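The config/env/local fallback chain behind `auto_initialize_redis()` can be sketched as follows; `resolve_redis_url` and the `REDIS_URL` variable name are illustrative assumptions, not the module's actual API:

```python
import os


def resolve_redis_url(config=None):
    """Illustrative fallback chain: explicit config -> environment -> local default."""
    # 1) explicit configuration wins
    if config and config.get("redis_url"):
        return config["redis_url"]
    # 2) fall back to an environment variable (e.g. injected by deployment tooling)
    if os.environ.get("REDIS_URL"):
        return os.environ["REDIS_URL"]
    # 3) last resort: a local Redis instance
    return "redis://localhost:6379/0"
```

Ordering the sources this way lets a deployment override a checked-in config file without code changes, which matches the commit's intent of making Redis usable out of the box.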
* remove part of test
* feat: add Redis-based ORM with multiprocess synchronization
  - Add RedisDBManager and RedisLockableORM classes
  - Implement atomic locking mechanism for concurrent access
  - Add merge functionality for different object types
  - Include comprehensive test suite and examples
  - Fix Redis key type conflicts in lock operations
* fix: resolve scheduler module import and Redis integration issues
* revise naive memcube creation in server router
* remove long-running tests in test_scheduler
* remove Redis test which needs .env
* refactor all code about mixture search with scheduler
* fix: resolve Redis API synchronization issues and implement search API with reranker
  - Fix running_entries to running_task_ids migration across codebase
  - Update sync_search_data method to properly handle TaskRunningStatus
  - Correct variable naming and logic in API synchronization flow
  - Implement search API endpoint with reranker functionality
  - Update test files to reflect new running_task_ids convention
  - Ensure proper Redis state management for concurrent tasks
* remove a test for the api module
* revise to pass the test suite
* address some bugs to make mix_search run normally
* modify code according to evaluation logs
* feat: Optimize mixture search and enhance API client
* feat: Add conversation_turn tracking for session-based memory search
  - Add conversation_turn field to APIMemoryHistoryEntryItem schema with default value 0
  - Implement session counter in OptimizedScheduler to track turn count per session_id
  - Update sync_search_data method to accept and store conversation_turn parameter
  - Maintain session history with LRU eviction (max 5 sessions)
  - Rename conversation_id to session_id for consistency with the request object
  - Enable direct access to session_id from search requests

  This feature allows tracking conversation turns within the same session, providing better context for memory retrieval and search history management.
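The atomic locking that `RedisLockableORM` needs is conventionally built on Redis's `SET key value NX` semantics. A sketch against an in-memory stub (the class and function names here are hypothetical; real code would call redis-py's `client.set(key, token, nx=True, ex=ttl)` and use a Lua script for the check-and-delete on release):

```python
import uuid


class FakeRedis:
    """Minimal in-memory stand-in for the two Redis commands the lock needs."""

    def __init__(self):
        self.store = {}

    def set(self, key, value, nx=False):
        if nx and key in self.store:
            return None  # redis-py returns None when SET ... NX fails
        self.store[key] = value
        return True

    def delete(self, key):
        return self.store.pop(key, None) is not None


def acquire_lock(client, name):
    """Atomically claim the lock; returns a token on success, None if held."""
    token = str(uuid.uuid4())
    return token if client.set(f"lock:{name}", token, nx=True) else None


def release_lock(client, name, token):
    """Release only if we still own the lock (real code: Lua check-and-delete)."""
    if client.store.get(f"lock:{name}") == token:
        return client.delete(f"lock:{name}")
    return False
```

The unique token per acquirer is what makes release safe: a process that lost the lock (e.g. after a TTL expiry) cannot delete another process's lock by accident.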
* address time bug in monitor
* revise simple tree
* add mode to evaluation client; rewrite print to logger.info in db files
* feat: 1. add redis queue for scheduler 2. finish the code related to mix search and fine search
* debug the working memory code
* addressed a range of bugs to make the scheduler run correctly
* remove test_dispatch_parallel test
* change print to logger.info
* adjusted the core code related to fine and mixture apis
* feat: create task queue to wrap local queue and redis queue; the queue now splits the FIFO into multiple queues for different users; addressed a range of bugs
* fix bugs: debug bugs about internet trigger
* debug get searcher mode
* feat: add manual internet
* Fix: fix code format
* feat: add strategy for fine search
* debug redis queue
* debug redis queue
* fix bugs: completely addressed bugs about redis queue
* refactor: add searcher to handler_init; remove info log from task_queue
* refactor: modify analyzer
* refactor: revise locomo_eval to make it support LLMs other than gpt-4o-mini
* feat: develop advanced searcher with deep search
* feat: finish a complete version of deep search
* refactor: refactor deep search feature, now only allowing one-round deep search
* feat: implement the get_tasks_status feature, but completed tasks are not recorded yet; waiting to be developed
* debugging merged code; searching memories has bugs
* change logging level
* debug api evaluation
* fix bugs: change top to top_k
* change log
* refactor: rewrite deep search to make it work better
* change num_users
* feat: developed and tested task broker and orchestrator
* Fix: Include task_id in ScheduleMessageItem serialization
* Fix(Scheduler): Correct event log creation and task_id serialization
* Feat(Scheduler): Add conditional detailed logging for KB updates; Fix(Scheduler): Correct create_event_log indentation
* Fix(Scheduler): Correct create_event_log call sites

  Reverts the previous incorrect fix to scheduler_logger.py and correctly fixes the TypeError at the call sites in general_scheduler.py by removing the invalid 'log_content' kwarg and adding the missing memory_type kwargs.
* Fix(Scheduler): Deserialize task_id in ScheduleMessageItem.from_dict

  This completes the fix for the task_id loss. The 'to_dict' method was previously fixed to serialize the task_id, but the corresponding 'from_dict' method was not updated to deserialize it, causing the value to be lost when messages were read from the queue.
* Refactor(Config): Centralize RabbitMQ config override logic

  Moves all environment variable override logic into initialize_rabbitmq for a single source of truth. This ensures Nacos-provided environment variables for all RabbitMQ settings are respected over file configurations. Also removes now-redundant logging from the publish method.
* Revert "Refactor(Config): Centralize RabbitMQ config override logic"

  This reverts commit b8cc42a.
* Fix(Redis): Convert None task_id to empty string during serialization

  Resolves a DataError in Redis Streams when task_id is None by ensuring it is serialized as an empty string instead of None, which Redis does not support. Applies to the ScheduleMessageItem.to_dict method.
* Feat(Log): Add diagnostic log to /product/add endpoint

  Adds an INFO level diagnostic log message at the beginning of the create_memory function to help verify code deployment.
* Feat(Log): Add comprehensive diagnostic logs for /product/add flow

  Introduces detailed INFO level diagnostic logs across the entire call chain for the /product/add API endpoint. These logs include relevant context, such as full request bodies, message items before scheduler submission, and messages before RabbitMQ publication, to aid in debugging deployment discrepancies and tracing data flow, especially concerning task_id propagation.
  Logs added/enhanced in:
  - src/memos/api/routers/product_router.py
  - src/memos/api/handlers/add_handler.py
  - src/memos/multi_mem_cube/single_cube.py
  - src/memos/mem_os/core.py
  - src/memos/mem_scheduler/general_scheduler.py
  - src/memos/mem_scheduler/base_scheduler.py
  - src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py
* Feat(Log): Add comprehensive diagnostic logs for /product/add flow and apply ruff formatting

  Introduces detailed INFO level diagnostic logs across the entire call chain for the /product/add API endpoint. These logs include relevant context, such as full request bodies, message items before scheduler submission, and messages before RabbitMQ publication, to aid in debugging deployment discrepancies and tracing data flow, especially concerning task_id propagation. Also applies automatic code formatting using ruff format to all modified files.

  Logs added/enhanced in:
  - src/memos/api/routers/product_router.py
  - src/memos/api/handlers/add_handler.py
  - src/memos/multi_mem_cube/single_cube.py
  - src/memos/mem_os/core.py
  - src/memos/mem_scheduler/general_scheduler.py
  - src/memos/mem_scheduler/base_scheduler.py
  - src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py
* Fix(rabbitmq): Use env vars for KB updates and improve logging
* Fix(rabbitmq): Explicitly use MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME and empty routing key for KB updates
* Fix(add_handler): Update diagnostic log timestamp
* Fix(add_handler): Update diagnostic log timestamp again (auto-updated)
* Update default scheduler redis stream prefix
* Update diagnostic timestamp in add handler
* Allow optional log_content in scheduler event log
* feat: new examples to test scheduler
* feat: fair scheduler and refactor of search function
* fix bugs: address bugs caused by outdated test code
* feat: add task_schedule_monitor
* fix: handle nil mem_cube in scheduler message consumers
* fix bugs: response message changed in memos code
* refactor: revise task queue to allow it to deal with pending tasks when no task remains
* refactor: revise mixture search and scheduler logger
* Fix scheduler task tracking
* fix bugs: address ai review issues
* fix bugs: address rabbitmq initialization failing under pytest
* fix(scheduler): Correct dispatcher task and future tracking
* Remove dump.rdb
* fix bugs: revised message ack logic; refactor add log function
* fix bugs: change Chinese comments to English
* fix indent error in logger
* fix bugs: addressed the issue of multiprocessing code obtaining the same pending tasks
* addMemory/updateMemory log
* fix bugs: modify redis queue logic to make it run as expected
* feat: add a default mem cube initialization for scheduler
* address scheduler init bug
* feat(scheduler): Propagate trace_id across process boundaries for mem… (#592)

  feat(scheduler): Propagate trace_id across process boundaries for mem_scheduler logs

  This commit addresses the issue where 'trace_id' was missing from logs generated by the 'mem_scheduler' module, especially when tasks were executed in separate processes. The changes implement manual propagation of 'trace_id' from the message producer to the consumer:

  1. **Schema Update**: Added an optional 'trace_id' field to 'ScheduleMessageItem' in 'src/memos/mem_scheduler/schemas/message_schemas.py' to allow 'trace_id' to be carried within messages.
  2. **Producer-side Capture**: Modified 'src/memos/mem_scheduler/task_schedule_modules/task_queue.py' to capture the current 'trace_id' and embed it into the 'ScheduleMessageItem' before messages are enqueued.
  3. **Consumer-side Context Re-establishment**: Updated 'src/memos/mem_scheduler/task_schedule_modules/dispatcher.py' to extract the 'trace_id' from incoming messages and re-establish the logging context using 'RequestContext' for each task's execution.

  This ensures all logs within a task's scope correctly include its associated 'trace_id', even when crossing process boundaries.
  This approach ensures robust and accurate tracing of tasks within the scheduler, enhancing observability and debugging capabilities.

  Co-authored-by: [email protected] <>
* fix bugs: redis queue allows re-fetching pending tasks that exceed the idle time
* fix(scheduler): Correct lazy-loading logic for mem_cube property
* Add MONITOR_EVENT logs for scheduler lifecycle
* fix: Resolve Ruff linting and formatting issues
* Handle dequeue timestamp without pydantic errors
* feat: orchestrator adds task priority; move task labels into task_schemas; add synchronous execution option in dispatcher
* feat: more logs for debug
* fix bugs: address some bugs
* refactor: remove logger info in pref add function

---------

Co-authored-by: fridayL <[email protected]>
Co-authored-by: [email protected] <>
Co-authored-by: Zehao Lin <[email protected]>
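The producer-capture / consumer-re-establish pattern for trace_id described in the #592 commit can be sketched with stdlib `contextvars` (the queue and function names are illustrative; the real code carries the field on `ScheduleMessageItem` and restores it via `RequestContext`):

```python
import contextvars

# Context variable holding the current request's trace id.
trace_id_var = contextvars.ContextVar("trace_id", default=None)


def enqueue(queue, payload):
    """Producer side: capture the current trace_id into the message."""
    queue.append({"payload": payload, "trace_id": trace_id_var.get()})


def consume(queue):
    """Consumer side: re-establish the logging context before handling."""
    msg = queue.pop(0)
    token = trace_id_var.set(msg["trace_id"])
    try:
        # Anything logged here would see the producer's trace_id.
        return f"[trace={trace_id_var.get()}] handled {msg['payload']}"
    finally:
        trace_id_var.reset(token)  # don't leak context into the next task
```

Note that `contextvars` alone does not cross process boundaries, which is exactly why the commit serializes the id into the message itself: the consumer process starts with an empty context and must rebuild it from the payload.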
1 parent b327ea7 commit 3f99afd

File tree

22 files changed: +296 additions, -281 deletions


examples/mem_scheduler/memos_w_scheduler.py

Lines changed: 13 additions & 13 deletions
@@ -13,15 +13,15 @@
 from memos.mem_cube.general import GeneralMemCube
 from memos.mem_os.main import MOS
 from memos.mem_scheduler.general_scheduler import GeneralScheduler
-from memos.mem_scheduler.schemas.general_schemas import (
-    ADD_LABEL,
-    ANSWER_LABEL,
-    MEM_ARCHIVE_LABEL,
-    MEM_ORGANIZE_LABEL,
-    MEM_UPDATE_LABEL,
-    QUERY_LABEL,
-)
 from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem
+from memos.mem_scheduler.schemas.task_schemas import (
+    ADD_TASK_LABEL,
+    ANSWER_TASK_LABEL,
+    MEM_ARCHIVE_TASK_LABEL,
+    MEM_ORGANIZE_TASK_LABEL,
+    MEM_UPDATE_TASK_LABEL,
+    QUERY_TASK_LABEL,
+)
 from memos.mem_scheduler.utils.filter_utils import transform_name_to_key

@@ -118,24 +118,24 @@ def _first_content() -> str:
         return memcube_content[0].get("content", "") or content
         return content

-    if label in ("addMessage", QUERY_LABEL, ANSWER_LABEL):
+    if label in ("addMessage", QUERY_TASK_LABEL, ANSWER_TASK_LABEL):
         target_cube = cube_display.replace("MemCube", "")
         title = _format_title(item.timestamp, f"addMessages to {target_cube} MemCube")
         return title, _truncate_with_rules(_first_content())

-    if label in ("addMemory", ADD_LABEL):
+    if label in ("addMemory", ADD_TASK_LABEL):
         title = _format_title(item.timestamp, f"{cube_display} added {memory_len} memories")
         return title, _truncate_with_rules(_first_content())

-    if label in ("updateMemory", MEM_UPDATE_LABEL):
+    if label in ("updateMemory", MEM_UPDATE_TASK_LABEL):
         title = _format_title(item.timestamp, f"{cube_display} updated {memory_len} memories")
         return title, _truncate_with_rules(_first_content())

-    if label in ("archiveMemory", MEM_ARCHIVE_LABEL):
+    if label in ("archiveMemory", MEM_ARCHIVE_TASK_LABEL):
         title = _format_title(item.timestamp, f"{cube_display} archived {memory_len} memories")
         return title, _truncate_with_rules(_first_content())

-    if label in ("mergeMemory", MEM_ORGANIZE_LABEL):
+    if label in ("mergeMemory", MEM_ORGANIZE_TASK_LABEL):
         title = _format_title(item.timestamp, f"{cube_display} merged {memory_len} memories")
         merged = [c for c in memcube_content if c.get("type") == "merged"]
         post = [c for c in memcube_content if c.get("type") == "postMerge"]

examples/mem_scheduler/redis_example.py

Lines changed: 2 additions & 2 deletions
@@ -9,8 +9,8 @@
 from memos.configs.mem_scheduler import SchedulerConfigFactory
 from memos.mem_cube.general import GeneralMemCube
 from memos.mem_scheduler.scheduler_factory import SchedulerFactory
-from memos.mem_scheduler.schemas.general_schemas import QUERY_LABEL
 from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
+from memos.mem_scheduler.schemas.task_schemas import QUERY_TASK_LABEL


 if TYPE_CHECKING:
@@ -55,7 +55,7 @@ def service_run():
     message_item = ScheduleMessageItem(
         user_id=user_id,
         mem_cube_id="mem_cube_2",
-        label=QUERY_LABEL,
+        label=QUERY_TASK_LABEL,
         mem_cube=mem_cube,
         content=query,
         timestamp=datetime.now(),

examples/mem_scheduler/try_schedule_modules.py

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@
 from memos.mem_cube.general import GeneralMemCube
 from memos.mem_scheduler.analyzer.mos_for_test_scheduler import MOSForTestScheduler
 from memos.mem_scheduler.general_scheduler import GeneralScheduler
-from memos.mem_scheduler.schemas.general_schemas import (
+from memos.mem_scheduler.schemas.task_schemas import (
     NOT_APPLICABLE_TYPE,
 )

src/memos/api/handlers/chat_handler.py

Lines changed: 7 additions & 7 deletions
@@ -30,11 +30,11 @@
     prepare_reference_data,
     process_streaming_references_complete,
 )
-from memos.mem_scheduler.schemas.general_schemas import (
-    ANSWER_LABEL,
-    QUERY_LABEL,
-)
 from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
+from memos.mem_scheduler.schemas.task_schemas import (
+    ANSWER_TASK_LABEL,
+    QUERY_TASK_LABEL,
+)
 from memos.templates.mos_prompts import (
     FURTHER_SUGGESTION_PROMPT,
     get_memos_prompt,
@@ -244,7 +244,7 @@ def generate_chat_response() -> Generator[str, None, None]:
             user_id=chat_req.user_id,
             mem_cube_id=scheduler_cube_id,
             query=chat_req.query,
-            label=QUERY_LABEL,
+            label=QUERY_TASK_LABEL,
         )
         # Extract memories from search results
         memories_list = []
@@ -423,7 +423,7 @@ def generate_chat_response() -> Generator[str, None, None]:
             user_id=chat_req.user_id,
             mem_cube_id=scheduler_cube_id,
             query=chat_req.query,
-            label=QUERY_LABEL,
+            label=QUERY_TASK_LABEL,
         )
         # Extract memories from search results
         memories_list = []
@@ -1034,7 +1034,7 @@ async def _post_chat_processing(

         # Send answer to scheduler
         self._send_message_to_scheduler(
-            user_id=user_id, mem_cube_id=cube_id, query=clean_response, label=ANSWER_LABEL
+            user_id=user_id, mem_cube_id=cube_id, query=clean_response, label=ANSWER_TASK_LABEL
         )

         self.logger.info(f"Post-chat processing completed for user {user_id}")

src/memos/configs/mem_reader.py

Lines changed: 4 additions & 1 deletion
@@ -1,7 +1,7 @@
 from datetime import datetime
 from typing import Any, ClassVar

-from pydantic import Field, field_validator, model_validator
+from pydantic import ConfigDict, Field, field_validator, model_validator

 from memos.configs.base import BaseConfig
 from memos.configs.chunker import ChunkerConfigFactory
@@ -44,6 +44,9 @@ def parse_datetime(cls, value):
 class SimpleStructMemReaderConfig(BaseMemReaderConfig):
     """SimpleStruct MemReader configuration class."""

+    # Allow passing additional fields without raising validation errors
+    model_config = ConfigDict(extra="allow", strict=True)
+

 class MultiModalStructMemReaderConfig(BaseMemReaderConfig):
     """MultiModalStruct MemReader configuration class."""
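The `extra="allow"` change above alters how the model treats unrecognized fields. A standalone sketch, assuming pydantic v2 is installed (note that pydantic's default is `extra="ignore"`; `"forbid"` rejects unknown fields, while `"allow"` keeps them on the instance):

```python
from pydantic import BaseModel, ConfigDict


class ForbidConfig(BaseModel):
    # Unknown fields raise a ValidationError at construction time.
    model_config = ConfigDict(extra="forbid")
    name: str


class AllowConfig(BaseModel):
    # Mirrors the change above: unknown fields are stored instead of rejected.
    model_config = ConfigDict(extra="allow")
    name: str
```

With `extra="allow"`, the extra values become attributes on the instance and are also collected in `model_extra`, which is what lets a config class accept forward-compatible fields without schema changes.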

src/memos/mem_os/core.py

Lines changed: 15 additions & 15 deletions
@@ -15,14 +15,14 @@
 from memos.mem_reader.factory import MemReaderFactory
 from memos.mem_scheduler.general_scheduler import GeneralScheduler
 from memos.mem_scheduler.scheduler_factory import SchedulerFactory
-from memos.mem_scheduler.schemas.general_schemas import (
-    ADD_LABEL,
-    ANSWER_LABEL,
-    MEM_READ_LABEL,
-    PREF_ADD_LABEL,
-    QUERY_LABEL,
-)
 from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
+from memos.mem_scheduler.schemas.task_schemas import (
+    ADD_TASK_LABEL,
+    ANSWER_TASK_LABEL,
+    MEM_READ_TASK_LABEL,
+    PREF_ADD_TASK_LABEL,
+    QUERY_TASK_LABEL,
+)
 from memos.mem_user.user_manager import UserManager, UserRole
 from memos.memories.activation.item import ActivationMemoryItem
 from memos.memories.parametric.item import ParametricMemoryItem
@@ -283,7 +283,7 @@ def chat(self, query: str, user_id: str | None = None, base_prompt: str | None =
         message_item = ScheduleMessageItem(
             user_id=target_user_id,
             mem_cube_id=mem_cube_id,
-            label=QUERY_LABEL,
+            label=QUERY_TASK_LABEL,
             content=query,
             timestamp=datetime.utcnow(),
         )
@@ -343,7 +343,7 @@ def chat(self, query: str, user_id: str | None = None, base_prompt: str | None =
         message_item = ScheduleMessageItem(
             user_id=target_user_id,
             mem_cube_id=mem_cube_id,
-            label=ANSWER_LABEL,
+            label=ANSWER_TASK_LABEL,
             content=response,
             timestamp=datetime.utcnow(),
         )
@@ -771,7 +771,7 @@ def process_textual_memory():
         message_item = ScheduleMessageItem(
             user_id=target_user_id,
             mem_cube_id=mem_cube_id,
-            label=MEM_READ_LABEL,
+            label=MEM_READ_TASK_LABEL,
             content=json.dumps(mem_ids),
             timestamp=datetime.utcnow(),
             task_id=task_id,
@@ -783,7 +783,7 @@ def process_textual_memory():
         message_item = ScheduleMessageItem(
             user_id=target_user_id,
             mem_cube_id=mem_cube_id,
-            label=ADD_LABEL,
+            label=ADD_TASK_LABEL,
             content=json.dumps(mem_ids),
             timestamp=datetime.utcnow(),
             task_id=task_id,
@@ -824,7 +824,7 @@ def process_preference_memory():
             user_id=target_user_id,
             session_id=target_session_id,
             mem_cube_id=mem_cube_id,
-            label=PREF_ADD_LABEL,
+            label=PREF_ADD_TASK_LABEL,
             content=json.dumps(messages_list),
             timestamp=datetime.utcnow(),
         )
@@ -878,7 +878,7 @@ def process_preference_memory():
         message_item = ScheduleMessageItem(
             user_id=target_user_id,
             mem_cube_id=mem_cube_id,
-            label=MEM_READ_LABEL,
+            label=MEM_READ_TASK_LABEL,
             content=json.dumps(mem_ids),
             timestamp=datetime.utcnow(),
         )
@@ -889,7 +889,7 @@ def process_preference_memory():
         message_item = ScheduleMessageItem(
             user_id=target_user_id,
             mem_cube_id=mem_cube_id,
-            label=ADD_LABEL,
+            label=ADD_TASK_LABEL,
             content=json.dumps(mem_ids),
             timestamp=datetime.utcnow(),
         )
@@ -920,7 +920,7 @@ def process_preference_memory():
         message_item = ScheduleMessageItem(
             user_id=target_user_id,
             mem_cube_id=mem_cube_id,
-            label=ADD_LABEL,
+            label=ADD_TASK_LABEL,
             content=json.dumps(mem_ids),
             timestamp=datetime.utcnow(),
         )

src/memos/mem_os/product.py

Lines changed: 6 additions & 6 deletions
@@ -29,11 +29,11 @@
     prepare_reference_data,
     process_streaming_references_complete,
 )
-from memos.mem_scheduler.schemas.general_schemas import (
-    ANSWER_LABEL,
-    QUERY_LABEL,
-)
 from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
+from memos.mem_scheduler.schemas.task_schemas import (
+    ANSWER_TASK_LABEL,
+    QUERY_TASK_LABEL,
+)
 from memos.mem_user.persistent_factory import PersistentUserManagerFactory
 from memos.mem_user.user_manager import UserRole
 from memos.memories.textual.item import (
@@ -710,7 +710,7 @@ async def _post_chat_processing(
             logger.warning(f"Failed to send chat notification (async): {e}")

         self._send_message_to_scheduler(
-            user_id=user_id, mem_cube_id=cube_id, query=clean_response, label=ANSWER_LABEL
+            user_id=user_id, mem_cube_id=cube_id, query=clean_response, label=ANSWER_TASK_LABEL
         )

         self.add(
@@ -1151,7 +1151,7 @@ def chat_with_references(
             f"time chat: search text_mem time user_id: {user_id} time is: {search_time_end - time_start}"
         )
         self._send_message_to_scheduler(
-            user_id=user_id, mem_cube_id=cube_id, query=query, label=QUERY_LABEL
+            user_id=user_id, mem_cube_id=cube_id, query=query, label=QUERY_TASK_LABEL
         )
         if memories_result:
             memories_list = memories_result[0]["memories"]

src/memos/mem_scheduler/analyzer/mos_for_test_scheduler.py

Lines changed: 6 additions & 4 deletions
@@ -4,11 +4,13 @@
 from memos.log import get_logger
 from memos.mem_os.main import MOS
 from memos.mem_scheduler.schemas.general_schemas import (
-    ANSWER_LABEL,
     MONITOR_WORKING_MEMORY_TYPE,
-    QUERY_LABEL,
 )
 from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
+from memos.mem_scheduler.schemas.task_schemas import (
+    ANSWER_TASK_LABEL,
+    QUERY_TASK_LABEL,
+)


 logger = get_logger(__name__)
@@ -427,7 +429,7 @@ def chat(self, query: str, user_id: str | None = None) -> str:
         message_item = ScheduleMessageItem(
             user_id=target_user_id,
             mem_cube_id=mem_cube_id,
-            label=QUERY_LABEL,
+            label=QUERY_TASK_LABEL,
             content=query,
             timestamp=datetime.now(),
         )
@@ -517,7 +519,7 @@ def chat(self, query: str, user_id: str | None = None) -> str:
         message_item = ScheduleMessageItem(
             user_id=target_user_id,
             mem_cube_id=mem_cube_id,
-            label=ANSWER_LABEL,
+            label=ANSWER_TASK_LABEL,
             content=response,
             timestamp=datetime.now(),
         )

src/memos/mem_scheduler/analyzer/scheduler_for_eval.py

Lines changed: 2 additions & 2 deletions
@@ -7,10 +7,10 @@

 from memos.log import get_logger
 from memos.mem_scheduler.general_scheduler import GeneralScheduler
-from memos.mem_scheduler.schemas.general_schemas import (
+from memos.mem_scheduler.schemas.monitor_schemas import QueryMonitorItem
+from memos.mem_scheduler.schemas.task_schemas import (
     DEFAULT_MAX_QUERY_KEY_WORDS,
 )
-from memos.mem_scheduler.schemas.monitor_schemas import QueryMonitorItem


 if TYPE_CHECKING:

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 18 additions & 14 deletions
@@ -43,6 +43,7 @@
 )
 from memos.mem_scheduler.schemas.monitor_schemas import MemoryMonitorItem
 from memos.mem_scheduler.task_schedule_modules.dispatcher import SchedulerDispatcher
+from memos.mem_scheduler.task_schedule_modules.orchestrator import SchedulerOrchestrator
 from memos.mem_scheduler.task_schedule_modules.task_queue import ScheduleTaskQueue
 from memos.mem_scheduler.utils import metrics
 from memos.mem_scheduler.utils.db_utils import get_utc_now
@@ -121,10 +122,12 @@ def __init__(self, config: BaseSchedulerConfig):
         self.max_internal_message_queue_size = self.config.get(
             "max_internal_message_queue_size", DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE
         )
+        self.orchestrator = SchedulerOrchestrator()
         self.memos_message_queue = ScheduleTaskQueue(
             use_redis_queue=self.use_redis_queue,
             maxsize=self.max_internal_message_queue_size,
             disabled_handlers=self.disabled_handlers,
+            orchestrator=self.orchestrator,
         )
         self.searcher: Searcher | None = None
         self.retriever: SchedulerRetriever | None = None
@@ -143,6 +146,7 @@ def __init__(self, config: BaseSchedulerConfig):
             status_tracker=self.status_tracker,
             metrics=self.metrics,
             submit_web_logs=self._submit_web_logs,
+            orchestrator=self.orchestrator,
         )
         # Task schedule monitor: initialize with underlying queue implementation
         self.get_status_parallel = self.config.get("get_status_parallel", True)
@@ -697,22 +701,22 @@ def get_web_log_messages(self) -> list[dict]:
                 break

         def _map_label(label: str) -> str:
-            from memos.mem_scheduler.schemas.general_schemas import (
-                ADD_LABEL,
-                ANSWER_LABEL,
-                MEM_ARCHIVE_LABEL,
-                MEM_ORGANIZE_LABEL,
-                MEM_UPDATE_LABEL,
-                QUERY_LABEL,
+            from memos.mem_scheduler.schemas.task_schemas import (
+                ADD_TASK_LABEL,
+                ANSWER_TASK_LABEL,
+                MEM_ARCHIVE_TASK_LABEL,
+                MEM_ORGANIZE_TASK_LABEL,
+                MEM_UPDATE_TASK_LABEL,
+                QUERY_TASK_LABEL,
             )

             mapping = {
-                QUERY_LABEL: "addMessage",
-                ANSWER_LABEL: "addMessage",
-                ADD_LABEL: "addMemory",
-                MEM_UPDATE_LABEL: "updateMemory",
-                MEM_ORGANIZE_LABEL: "mergeMemory",
-                MEM_ARCHIVE_LABEL: "archiveMemory",
+                QUERY_TASK_LABEL: "addMessage",
+                ANSWER_TASK_LABEL: "addMessage",
+                ADD_TASK_LABEL: "addMemory",
+                MEM_UPDATE_TASK_LABEL: "updateMemory",
+                MEM_ORGANIZE_TASK_LABEL: "mergeMemory",
+                MEM_ARCHIVE_TASK_LABEL: "archiveMemory",
             }
             return mapping.get(label, label)
@@ -785,7 +789,7 @@ def _message_consumer(self) -> None:
             if enqueue_epoch is not None:
                 queue_wait_ms = max(0.0, now - enqueue_epoch) * 1000

-            # Avoid pydantic attribute enforcement
+            # Avoid pydantic field enforcement by using object.__setattr__
            object.__setattr__(msg, "_dequeue_ts", now)
             emit_monitor_event(
                 "dequeue",
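The `object.__setattr__` call in the last hunk writes to the instance while bypassing the class's own `__setattr__` guard. The same escape hatch works for any attribute-enforcing class; here is a stdlib sketch with a frozen dataclass standing in for the pydantic model (an assumption for illustration only):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    """Stand-in for a model that blocks normal attribute assignment."""
    content: str


msg = Message("hello")

try:
    msg.content = "changed"  # frozen dataclass raises FrozenInstanceError
    blocked = False
except AttributeError:  # FrozenInstanceError subclasses AttributeError
    blocked = True

# The escape hatch used in _message_consumer: write directly to the
# instance, skipping the class's __setattr__ override entirely.
object.__setattr__(msg, "_dequeue_ts", 1234.5)
```

This is deliberate rule-breaking for internal bookkeeping fields like `_dequeue_ts` that are not part of the declared schema; validated, user-facing fields should still go through the model's normal assignment path.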
