Skip to content

Commit 7ee9f36

Browse files
authored
Merge branch 'dev' into feat/file_parser_debug
2 parents 4cef2c3 + 69bad38 commit 7ee9f36

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1213
-849
lines changed

evaluation/scripts/locomo/locomo_eval.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ async def main(frame, version="default", options=None, num_runs=1, max_workers=4
311311
with open(response_path) as file:
312312
locomo_responses = json.load(file)
313313

314-
num_users = 2
314+
num_users = 10
315315
all_grades = {}
316316

317317
total_responses_count = sum(

evaluation/scripts/utils/client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,9 @@ def search(self, query, user_id, top_k):
189189
)
190190
response = requests.request("POST", url, data=payload, headers=self.headers)
191191
assert response.status_code == 200, response.text
192-
assert json.loads(response.text)["message"] == "Memory searched successfully", response.text
192+
assert json.loads(response.text)["message"] == "Search completed successfully", (
193+
response.text
194+
)
193195
return json.loads(response.text)["data"]
194196

195197

examples/data/config/mem_scheduler/general_scheduler_config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ config:
44
act_mem_update_interval: 30
55
context_window_size: 10
66
thread_pool_max_workers: 5
7-
consume_interval_seconds: 1
7+
consume_interval_seconds: 0.01
88
working_mem_monitor_capacity: 20
99
activation_mem_monitor_capacity: 5
1010
enable_parallel_dispatch: true

examples/data/config/mem_scheduler/memos_config_w_optimized_scheduler.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ mem_scheduler:
3838
act_mem_update_interval: 30
3939
context_window_size: 10
4040
thread_pool_max_workers: 10
41-
consume_interval_seconds: 1
41+
consume_interval_seconds: 0.01
4242
working_mem_monitor_capacity: 20
4343
activation_mem_monitor_capacity: 5
4444
enable_parallel_dispatch: true

examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ mem_scheduler:
3838
act_mem_update_interval: 30
3939
context_window_size: 10
4040
thread_pool_max_workers: 10
41-
consume_interval_seconds: 1
41+
consume_interval_seconds: 0.01
4242
working_mem_monitor_capacity: 20
4343
activation_mem_monitor_capacity: 5
4444
enable_parallel_dispatch: true
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from memos.chunkers import ChunkerFactory
from memos.configs.chunker import ChunkerConfigFactory


# Build a markdown-aware chunker: ~1000-char chunks, 100-char overlap,
# recursing into nested sections.
chunker_config = ChunkerConfigFactory.model_validate(
    {
        "backend": "markdown",
        "config": {
            "chunk_size": 1000,
            "chunk_overlap": 100,
            "recursive": True,
        },
    }
)
chunker = ChunkerFactory.from_config(chunker_config)

# Sample markdown document with three headed sections.
text = """
# Header 1
This is the first sentence. This is the second sentence.
And here's a third one with some additional context.

# Header 2
This is the fourth sentence. This is the fifth sentence.
And here's a sixth one with some additional context.

# Header 3
This is the seventh sentence. This is the eighth sentence.
And here's a ninth one with some additional context.
"""

# Split the document and print every resulting chunk.
for chunk in chunker.chunk(text):
    print("doc:", chunk)

examples/mem_scheduler/api_w_scheduler.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
print(f"Queue maxsize: {getattr(mem_scheduler.memos_message_queue, 'maxsize', 'N/A')}")
1818
print("=====================================\n")
1919

20-
mem_scheduler.memos_message_queue.debug_mode_on()
2120
queue = mem_scheduler.memos_message_queue
2221
queue.clear()
2322

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import sys
2+
3+
from collections import defaultdict
4+
from pathlib import Path
5+
6+
from memos.api.routers.server_router import mem_scheduler
7+
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
8+
9+
10+
# Make the repository root (three directory levels above this file) importable.
# NOTE(review): this runs after the `memos` imports above, so it cannot affect
# how those were resolved — confirm it is still needed.
FILE_PATH = Path(__file__).absolute()
BASE_DIR = FILE_PATH.parents[2]
sys.path.insert(0, str(BASE_DIR))
13+
14+
15+
def make_message(user_id: str, mem_cube_id: str, label: str, idx: int | str) -> ScheduleMessageItem:
    """Build a demo ScheduleMessageItem with a deterministic item_id and content."""
    stream = f"{user_id}:{mem_cube_id}:{label}"
    return ScheduleMessageItem(
        item_id=f"{stream}:{idx}",
        user_id=user_id,
        mem_cube_id=mem_cube_id,
        label=label,
        content=f"msg-{idx} for {user_id}/{mem_cube_id}/{label}",
    )
23+
24+
25+
def seed_messages_for_test_fairness(queue, combos, per_stream):
    """Seed the queue: flood the first stream, then add `per_stream` messages to each stream.

    The burst on combos[0] simulates one user overwhelming the queue so the
    caller can observe whether batch retrieval stays fair across streams.
    """
    # Burst: 100 extra messages on the first (user, cube, label) stream only.
    first_user, first_cube, first_label = combos[0]
    task_target = 100
    print(f"{first_user}:{first_cube}:{first_label} submit {task_target} messages")
    for i in range(task_target):
        queue.submit_messages(make_message(first_user, first_cube, first_label, f"overwhelm_{i}"))

    # Even seeding: the same number of messages on every stream.
    for user_id, cube_id, label in combos:
        print(f"{user_id}:{cube_id}:{label} submit {per_stream} messages")
        for i in range(per_stream):
            queue.submit_messages(make_message(user_id, cube_id, label, i))
    print("======= seed_messages Done ===========")
40+
41+
42+
def count_by_stream(messages):
    """Tally messages per stream, keyed by "user_id:mem_cube_id:label"."""
    tally = defaultdict(int)
    for item in messages:
        tally[f"{item.user_id}:{item.mem_cube_id}:{item.label}"] += 1
    return tally
48+
49+
50+
def run_fair_redis_schedule(batch_size: int = 3):
    """Demo: verify the Redis-backed queue distributes batches fairly across streams."""
    print("=== Redis Fairness Demo ===")
    print(f"use_redis_queue: {mem_scheduler.use_redis_queue}")
    mem_scheduler.consume_batch = batch_size
    queue = mem_scheduler.memos_message_queue

    # Start from an empty queue so the per-stream counts below are meaningful.
    queue.clear()

    # A stream is identified by the triple (user_id, mem_cube_id, task_label).
    combos = [
        ("u1", "u1", "labelX"),
        ("u1", "u1", "labelY"),
        ("u2", "u2", "labelX"),
        ("u2", "u2", "labelY"),
    ]
    per_stream = 5

    # One stream gets flooded, then every stream receives per_stream messages.
    seed_messages_for_test_fairness(queue, combos, per_stream)

    print(f"Request batch_size={batch_size} for {len(combos)} streams")

    for _ in range(len(combos)):
        # Pull one batch and report how many messages each stream contributed.
        batch = queue.get_messages(batch_size=batch_size)
        print(f"Fetched {len(batch)} messages in first pack")

        per_stream_counts = count_by_stream(batch)
        for stream_key in sorted(per_stream_counts):
            print(f"{stream_key}: {per_stream_counts[stream_key]}")


if __name__ == "__main__":
    # task 1 fair redis schedule
    run_fair_redis_schedule()
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
from pathlib import Path
2+
from time import sleep
3+
4+
# Note: we skip API handler status/wait utilities in this demo
5+
from memos.api.routers.server_router import mem_scheduler
6+
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
7+
8+
9+
# Debug: dump the scheduler configuration up front so runs are easy to diagnose.
print("=== Scheduler Configuration Debug ===")
print(f"Scheduler type: {type(mem_scheduler).__name__}")
print(f"Config: {mem_scheduler.config}")
print(f"use_redis_queue: {mem_scheduler.use_redis_queue}")
print(f"Queue type: {type(mem_scheduler.memos_message_queue).__name__}")
print(f"Queue maxsize: {getattr(mem_scheduler.memos_message_queue, 'maxsize', 'N/A')}")
print("=====================================\n")

# NOTE(review): `queue` is not referenced again below in this script — confirm
# this alias is still needed.
queue = mem_scheduler.memos_message_queue
19+
20+
21+
# Handler: writes one marker file per received message.
def my_test_handler(messages: list[ScheduleMessageItem]):
    """Write a `<item_id>.txt` marker file into the module-level `tmp_dir` per message."""
    print(f"My test handler received {len(messages)} messages: {[one.item_id for one in messages]}")
    for message in messages:
        # item_id doubles as the task id (the demo uses "0".."99").
        task_id = str(message.item_id)
        target = tmp_dir / f"{task_id}.txt"
        try:
            print(f"writing {target}...")
            target.write_text(f"Task {task_id} processed.\n")
        except Exception as exc:
            # Best-effort: report the failure but keep processing the batch.
            print(f"Failed to write {target}: {exc}")
33+
34+
35+
def submit_tasks():
    """Clear the queue and submit 100 file-creation tasks, alternating two users."""
    mem_scheduler.memos_message_queue.clear()

    # Tasks 0..99; even ids go to user_A, odd ids to user_B.
    users = ["user_A", "user_B"]
    messages_to_send = [
        ScheduleMessageItem(
            item_id=str(task_id),
            user_id=users[task_id % 2],
            mem_cube_id="test_mem_cube",
            label=TEST_HANDLER_LABEL,
            content=f"Create file for task {task_id}",
        )
        for task_id in range(100)
    ]

    # One batched submit, then report the queue status.
    print(f"Submitting {len(messages_to_send)} messages to the scheduler...")
    mem_scheduler.memos_message_queue.submit_messages(messages_to_send)
    print(f"Task submission done! tasks in queue: {mem_scheduler.get_tasks_status()}")
54+
55+
56+
# Register the handler under a dedicated label so submitted messages route to it.
TEST_HANDLER_LABEL = "test_handler"
mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler})


tmp_dir = Path("./tmp")
tmp_dir.mkdir(exist_ok=True)

# Test stop-and-restart: if tmp already has >1 files, skip submission and print info
existing_count = len(list(Path("tmp").glob("*.txt"))) if Path("tmp").exists() else 0
if existing_count > 1:
    print(f"Skip submission: found {existing_count} files in tmp (>1), continue processing")
else:
    submit_tasks()

# 6. Poll until the scheduler reports no remaining tasks, printing progress.
poll_interval = 0.01
expected = 100
tmp_dir = Path("tmp")  # NOTE(review): re-binds to "tmp" — same directory as "./tmp" above
while mem_scheduler.get_tasks_status()["remaining"] != 0:
    count = len(list(tmp_dir.glob("*.txt"))) if tmp_dir.exists() else 0
    tasks_status = mem_scheduler.get_tasks_status()
    mem_scheduler.print_tasks_status(tasks_status=tasks_status)
    print(f"[Monitor] Files in tmp: {count}/{expected}")
    sleep(poll_interval)
# Bug fix: the original literal contained a stray ')' after the closing brace,
# printing e.g. "[Result] Final files in tmp: 100)".
print(f"[Result] Final files in tmp: {len(list(tmp_dir.glob('*.txt')))}")

# 7. Stop the scheduler
print("Stopping the scheduler...")
mem_scheduler.stop()

0 commit comments

Comments
 (0)