Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
11b63e6
debug an error function name
tangg555 Oct 20, 2025
72e8f39
feat: Add DynamicCache compatibility for different transformers versions
tangg555 Oct 20, 2025
5702870
feat: implement APIAnalyzerForScheduler for memory operations
tangg555 Oct 21, 2025
4655b41
feat: Add search_ws API endpoint and enhance API analyzer functionality
tangg555 Oct 21, 2025
c20736c
fix: resolve test failures and warnings in test suite
tangg555 Oct 21, 2025
da72e7e
feat: add a test_robustness execution to test thread pool execution
tangg555 Oct 21, 2025
5b9b1e4
feat: optimize scheduler configuration and API search functionality
tangg555 Oct 22, 2025
6dac11e
feat: Add Redis auto-initialization with fallback strategies
tangg555 Oct 22, 2025
a207bf4
feat: add database connection management to ORM module
tangg555 Oct 24, 2025
8c1cc04
remove part of test
tangg555 Oct 24, 2025
f2b0da4
feat: add Redis-based ORM with multiprocess synchronization
tangg555 Oct 24, 2025
f0e8aab
fix: resolve scheduler module import and Redis integration issues
tangg555 Oct 24, 2025
731f00d
revise naive memcube creation in server router
tangg555 Oct 25, 2025
6d442fb
remove long-time tests in test_scheduler
tangg555 Oct 25, 2025
157f858
remove redis test which needs .env
tangg555 Oct 25, 2025
c483011
refactor all codes about mixture search with scheduler
tangg555 Oct 25, 2025
b81b82e
fix: resolve Redis API synchronization issues and implement search AP…
tangg555 Oct 26, 2025
90d1a0b
remove a test for api module
tangg555 Oct 26, 2025
1de72cf
revise to pass the test suite
tangg555 Oct 26, 2025
c72858e
addressed all conflicts
tangg555 Oct 27, 2025
3245376
address some bugs to make mix_search normally running
tangg555 Oct 27, 2025
57482cf
modify codes according to evaluation logs
tangg555 Oct 27, 2025
e4b8313
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 27, 2025
011d248
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 28, 2025
8c8d672
feat: Optimize mixture search and enhance API client
tangg555 Oct 28, 2025
aabad8d
feat: Add conversation_turn tracking for session-based memory search
tangg555 Oct 28, 2025
3faa5c3
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 28, 2025
c6376cd
adress time bug in monitor
tangg555 Oct 29, 2025
bd0b234
revise simple tree
tangg555 Oct 29, 2025
5332d12
add mode to evaluation client; rewrite print to logger.info in db files
tangg555 Oct 29, 2025
aee13ba
feat: 1. add redis queue for scheduler 2. finish the code related to …
tangg555 Nov 5, 2025
f957967
debug the working memory code
tangg555 Nov 5, 2025
f520cca
addressed conflicts to merge
tangg555 Nov 5, 2025
a3f6636
addressed a range of bugs to make scheduler running correctly
tangg555 Nov 5, 2025
47e9851
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Nov 5, 2025
161af12
remove test_dispatch_parallel test
tangg555 Nov 5, 2025
1d8d14b
print change to logger.info
tangg555 Nov 5, 2025
00e3a75
addressed conflicts
tangg555 Nov 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions examples/mem_scheduler/api_w_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from memos.api.routers.server_router import mem_scheduler
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem


# Debug: Print scheduler configuration
print("=== Scheduler Configuration Debug ===")
print(f"Scheduler type: {type(mem_scheduler).__name__}")
print(f"Config: {mem_scheduler.config}")
print(f"use_redis_queue: {mem_scheduler.use_redis_queue}")
print(f"Queue type: {type(mem_scheduler.memos_message_queue).__name__}")
print(f"Queue maxsize: {getattr(mem_scheduler.memos_message_queue, 'maxsize', 'N/A')}")

# Check if Redis queue is connected
if hasattr(mem_scheduler.memos_message_queue, "_is_connected"):
print(f"Redis connected: {mem_scheduler.memos_message_queue._is_connected}")
if hasattr(mem_scheduler.memos_message_queue, "_redis_conn"):
print(f"Redis connection: {mem_scheduler.memos_message_queue._redis_conn}")
print("=====================================\n")

queue = mem_scheduler.memos_message_queue
queue.clear()


# 1. Define a handler function
def my_test_handler(messages: list[ScheduleMessageItem]):
print(f"My test handler received {len(messages)} messages:")
for msg in messages:
print(f" my_test_handler - {msg.item_id}: {msg.content}")
print(
f"{queue._redis_conn.xinfo_groups(queue.stream_name)} qsize: {queue.qsize()} messages:{messages}"
)


# 2. Register the handler
TEST_HANDLER_LABEL = "test_handler"
mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler})

# 3. Create messages
messages_to_send = [
ScheduleMessageItem(
item_id=f"test_item_{i}",
user_id="test_user",
mem_cube_id="test_mem_cube",
label=TEST_HANDLER_LABEL,
content=f"This is test message {i}",
)
for i in range(5)
]

# 5. Submit messages
for mes in messages_to_send:
print(f"Submitting message {mes.item_id} to the scheduler...")
mem_scheduler.submit_messages([mes])

# 6. Wait for messages to be processed (limited to 100 checks)
print("Waiting for messages to be consumed (max 100 checks)...")
mem_scheduler.mem_scheduler_wait()


# 7. Stop the scheduler
print("Stopping the scheduler...")
mem_scheduler.stop()
85 changes: 0 additions & 85 deletions examples/mem_scheduler/memos_w_optimized_scheduler.py

This file was deleted.

87 changes: 0 additions & 87 deletions examples/mem_scheduler/memos_w_optimized_scheduler_for_test.py

This file was deleted.

73 changes: 37 additions & 36 deletions examples/mem_scheduler/memos_w_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,48 @@ def init_task():
return conversations, questions


def show_web_logs(mem_scheduler: GeneralScheduler):
"""Display all web log entries from the scheduler's log queue.

Args:
mem_scheduler: The scheduler instance containing web logs to display
"""
if mem_scheduler._web_log_message_queue.empty():
print("Web log queue is currently empty.")
return

print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50)

# Create a temporary queue to preserve the original queue contents
temp_queue = Queue()
log_count = 0

while not mem_scheduler._web_log_message_queue.empty():
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
temp_queue.put(log_item)
log_count += 1

# Print log entry details
print(f"\nLog Entry #{log_count}:")
print(f'- "{log_item.label}" log: {log_item}')

print("-" * 50)

# Restore items back to the original queue
while not temp_queue.empty():
mem_scheduler._web_log_message_queue.put(temp_queue.get())

print(f"\nTotal {log_count} web log entries displayed.")
print("=" * 110 + "\n")


def run_with_scheduler_init():
print("==== run_with_automatic_scheduler_init ====")
conversations, questions = init_task()

# set configs
mos_config = MOSConfig.from_yaml_file(
f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml"
f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_optimized_scheduler.yaml"
)

mem_cube_config = GeneralMemCubeConfig.from_yaml_file(
Expand Down Expand Up @@ -118,6 +153,7 @@ def run_with_scheduler_init():
)

mos.add(conversations, user_id=user_id, mem_cube_id=mem_cube_id)
mos.mem_scheduler.current_mem_cube = mem_cube

for item in questions:
print("===== Chat Start =====")
Expand All @@ -131,40 +167,5 @@ def run_with_scheduler_init():
mos.mem_scheduler.stop()


def show_web_logs(mem_scheduler: GeneralScheduler):
"""Display all web log entries from the scheduler's log queue.

Args:
mem_scheduler: The scheduler instance containing web logs to display
"""
if mem_scheduler._web_log_message_queue.empty():
print("Web log queue is currently empty.")
return

print("\n" + "=" * 50 + " WEB LOGS " + "=" * 50)

# Create a temporary queue to preserve the original queue contents
temp_queue = Queue()
log_count = 0

while not mem_scheduler._web_log_message_queue.empty():
log_item: ScheduleLogForWebItem = mem_scheduler._web_log_message_queue.get()
temp_queue.put(log_item)
log_count += 1

# Print log entry details
print(f"\nLog Entry #{log_count}:")
print(f'- "{log_item.label}" log: {log_item}')

print("-" * 50)

# Restore items back to the original queue
while not temp_queue.empty():
mem_scheduler._web_log_message_queue.put(temp_queue.get())

print(f"\nTotal {log_count} web log entries displayed.")
print("=" * 110 + "\n")


if __name__ == "__main__":
run_with_scheduler_init()
Loading
Loading