Skip to content

Commit 0911ced

Browse files
committed
debug redis queue
1 parent 127fdc7 commit 0911ced

File tree

2 files changed

+24
-2
lines changed

2 files changed

+24
-2
lines changed

src/memos/mem_scheduler/task_schedule_modules/redis_queue.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ def put(
150150
logger.error(f"Failed to add message to Redis queue: {e}")
151151
raise
152152

153-
def ack_message(self, user_id, mem_cube_id, redis_message_id):
153+
def ack_message(self, user_id, mem_cube_id, redis_message_id) -> None:
154154
stream_key = self.get_stream_key(user_id=user_id, mem_cube_id=mem_cube_id)
155155

156156
self.redis.xack(stream_key, self.consumer_group, redis_message_id)
@@ -296,7 +296,13 @@ def get_stream_keys(self) -> list[str]:
296296
return []
297297

298298
try:
299-
return self._redis_conn.scan_iter(f"{self.stream_key_prefix}:*")
299+
# Use match parameter and decode byte strings to regular strings
300+
stream_keys = [
301+
key.decode("utf-8") if isinstance(key, bytes) else key
302+
for key in self._redis_conn.scan_iter(match=f"{self.stream_key_prefix}:*")
303+
]
304+
logger.debug(f"get stream_keys from redis: {stream_keys}")
305+
return stream_keys
300306
except Exception as e:
301307
logger.error(f"Failed to list Redis stream keys: {e}")
302308
return []

src/memos/mem_scheduler/task_schedule_modules/task_queue.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,22 @@ def __init__(
3535

3636
self.disabled_handlers = disabled_handlers
3737

38+
def ack_message(
39+
self,
40+
user_id,
41+
mem_cube_id,
42+
redis_message_id,
43+
) -> None:
44+
if not isinstance(self.memos_message_queue, SchedulerRedisQueue):
45+
logger.warning("ack_message is only supported for Redis queues")
46+
return
47+
48+
self.memos_message_queue.ack_message(
49+
user_id=user_id,
50+
mem_cube_id=mem_cube_id,
51+
redis_message_id=redis_message_id,
52+
)
53+
3854
def debug_mode_on(self):
3955
self.memos_message_queue.stream_key_prefix = (
4056
f"debug_mode:{self.memos_message_queue.stream_key_prefix}"

0 commit comments

Comments
 (0)