Skip to content

Commit 127fdc7

Browse files
committed
debug redis queue
1 parent 87b5358 commit 127fdc7

File tree

3 files changed

+9
-11
lines changed

3 files changed

+9
-11
lines changed

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from memos.mem_scheduler.schemas.general_schemas import DEFAULT_STOP_WAIT
1515
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
1616
from memos.mem_scheduler.schemas.task_schemas import RunningTaskItem
17-
from memos.mem_scheduler.task_schedule_modules.redis_queue import SchedulerRedisQueue
1817
from memos.mem_scheduler.utils.metrics import MetricsRegistry
1918
from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube
2019

@@ -152,15 +151,15 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
152151

153152
# acknowledge redis messages
154153

155-
if (
156-
self.use_redis_queue
157-
and self.memos_message_queue is not None
158-
and isinstance(self.memos_message_queue, SchedulerRedisQueue)
159-
):
154+
if self.use_redis_queue and self.memos_message_queue is not None:
160155
for msg in messages:
161156
redis_message_id = msg.redis_message_id
162157
# Acknowledge message processing
163-
self.memos_message_queue.ack_message(redis_message_id=redis_message_id)
158+
self.memos_message_queue.ack_message(
159+
user_id=msg.user_id,
160+
mem_cube_id=msg.mem_cube_id,
161+
redis_message_id=redis_message_id,
162+
)
164163

165164
# Mark task as completed and remove from tracking
166165
with self._task_lock:

src/memos/mem_scheduler/task_schedule_modules/redis_queue.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ def put(
151151
raise
152152

153153
def ack_message(self, user_id, mem_cube_id, redis_message_id):
154-
stream_key = f"{self.stream_key_prefix}:{user_id}:{mem_cube_id}"
154+
stream_key = self.get_stream_key(user_id=user_id, mem_cube_id=mem_cube_id)
155155

156156
self.redis.xack(stream_key, self.consumer_group, redis_message_id)
157157

src/memos/mem_scheduler/task_schedule_modules/task_queue.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,8 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]:
8585
# Discover all active streams via queue API
8686
streams: list[tuple[str, str]] = []
8787

88-
keys = self.get_stream_keys()
89-
for stream_key in keys:
90-
# stream_key example: "{prefix}:{user_id}:{mem_cube_id}"
88+
stream_keys = self.get_stream_keys()
89+
for stream_key in stream_keys:
9190
try:
9291
parts = stream_key.split(":")
9392
if len(parts) >= 3:

0 commit comments

Comments
 (0)