Skip to content
38 changes: 17 additions & 21 deletions src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,27 +699,23 @@ def _batch_claim_pending_messages(
results = []
try:
results = pipe.execute()
except Exception as e:
err_msg = str(e).lower()
if "nogroup" in err_msg or "no such key" in err_msg:
# Fallback: attempt sequential xautoclaim for robustness
for stream_key, need_count, label in claims_spec:
try:
self._ensure_consumer_group(stream_key=stream_key)
res = self._redis_conn.xautoclaim(
name=stream_key,
groupname=self.consumer_group,
consumername=self.consumer_name,
min_idle_time=self.orchestrator.get_task_idle_min(task_label=label),
start_id="0-0",
count=need_count,
justid=False,
)
results.append(res)
except Exception:
continue
else:
logger.error(f"Pipeline xautoclaim failed: {e}")
except Exception:
# Fallback: attempt sequential xautoclaim for robustness
for stream_key, need_count, label in claims_spec:
try:
self._ensure_consumer_group(stream_key=stream_key)
res = self._redis_conn.xautoclaim(
name=stream_key,
groupname=self.consumer_group,
consumername=self.consumer_name,
min_idle_time=self.orchestrator.get_task_idle_min(task_label=label),
start_id="0-0",
count=need_count,
justid=False,
)
results.append(res)
except Exception:
continue

claimed_pairs: list[tuple[str, list[tuple[str, dict]]]] = []
for (stream_key, _need_count, _label), claimed_result in zip(
Expand Down