Skip to content

Commit adff41f

Browse files
Add pending metrics and age filter to scheduler allstatus
1 parent 3bc37ff commit adff41f

File tree

3 files changed

+32
-5
lines changed

3 files changed

+32
-5
lines changed

src/memos/api/handlers/scheduler_handler.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77

88
import json
9+
from datetime import datetime, timezone
910
import time
1011
import traceback
1112
from collections import Counter
@@ -59,18 +60,22 @@ def _summarize_tasks(task_details: list[dict[str, Any]]) -> TaskSummary:
5960
waiting=counter.get("waiting", 0),
6061
in_progress=counter.get("in_progress", 0),
6162
completed=counter.get("completed", 0),
63+
pending=counter.get("pending", counter.get("in_progress", 0)),
6264
failed=counter.get("failed", 0),
6365
cancelled=counter.get("cancelled", 0),
6466
total=total,
6567
)
6668

67-
def _aggregate_counts_from_redis(tracker: TaskStatusTracker) -> TaskSummary | None:
69+
def _aggregate_counts_from_redis(
70+
tracker: TaskStatusTracker, max_age_seconds: float = 86400
71+
) -> TaskSummary | None:
6872
"""Stream status counts directly from Redis to avoid loading all task payloads."""
6973
redis_client = getattr(tracker, "redis", None)
7074
if not redis_client:
7175
return None
7276

7377
counter = Counter()
78+
now = datetime.now(timezone.utc).timestamp()
7479

7580
# Scan task_meta keys, then hscan each hash in batches
7681
cursor: int | str = 0
@@ -83,6 +88,16 @@ def _aggregate_counts_from_redis(tracker: TaskStatusTracker) -> TaskSummary | No
8388
for value in fields.values():
8489
try:
8590
payload = json.loads(value.decode("utf-8") if isinstance(value, bytes) else value)
91+
# Skip stale entries to reduce noise and load
92+
ts = payload.get("submitted_at") or payload.get("started_at")
93+
if ts:
94+
try:
95+
ts_dt = datetime.fromisoformat(ts)
96+
ts_seconds = ts_dt.timestamp()
97+
except Exception:
98+
ts_seconds = None
99+
if ts_seconds and (now - ts_seconds) > max_age_seconds:
100+
continue
86101
status = payload.get("status")
87102
if status:
88103
counter[status] += 1
@@ -101,6 +116,7 @@ def _aggregate_counts_from_redis(tracker: TaskStatusTracker) -> TaskSummary | No
101116
waiting=counter.get("waiting", 0),
102117
in_progress=counter.get("in_progress", 0),
103118
completed=counter.get("completed", 0),
119+
pending=counter.get("pending", counter.get("in_progress", 0)),
104120
failed=counter.get("failed", 0),
105121
cancelled=counter.get("cancelled", 0),
106122
total=total,
@@ -120,6 +136,7 @@ def _aggregate_counts_from_redis(tracker: TaskStatusTracker) -> TaskSummary | No
120136
# Scheduler view: assume tracker contains scheduler tasks; overlay queue monitor for live queue depth
121137
sched_waiting = all_tasks_summary.waiting
122138
sched_in_progress = all_tasks_summary.in_progress
139+
sched_pending = all_tasks_summary.pending
123140
sched_completed = all_tasks_summary.completed
124141
sched_failed = all_tasks_summary.failed
125142
sched_cancelled = all_tasks_summary.cancelled
@@ -129,17 +146,21 @@ def _aggregate_counts_from_redis(tracker: TaskStatusTracker) -> TaskSummary | No
129146
queue_status_data = mem_scheduler.task_schedule_monitor.get_tasks_status() or {}
130147
scheduler_waiting = 0
131148
scheduler_in_progress = 0
149+
scheduler_pending = 0
132150
for key, value in queue_status_data.items():
133151
if not key.startswith("scheduler:"):
134152
continue
135153
scheduler_in_progress += int(value.get("running", 0) or 0)
154+
scheduler_pending += int(value.get("pending", value.get("running", 0)) or 0)
136155
scheduler_waiting += int(value.get("remaining", 0) or 0)
137156
sched_waiting = scheduler_waiting
138157
sched_in_progress = scheduler_in_progress
158+
sched_pending = scheduler_pending
139159

140160
scheduler_summary = TaskSummary(
141161
waiting=sched_waiting,
142162
in_progress=sched_in_progress,
163+
pending=sched_pending,
143164
completed=sched_completed,
144165
failed=sched_failed,
145166
cancelled=sched_cancelled,

src/memos/api/product_models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -872,6 +872,7 @@ class TaskSummary(BaseModel):
872872

873873
waiting: int = Field(0, description="Number of tasks waiting to run")
874874
in_progress: int = Field(0, description="Number of tasks currently running")
875+
pending: int = Field(0, description="Number of tasks fetched by workers but not yet acknowledged")
875876
completed: int = Field(0, description="Number of tasks completed")
876877
failed: int = Field(0, description="Number of tasks failed")
877878
cancelled: int = Field(0, description="Number of tasks cancelled")

src/memos/mem_scheduler/monitors/task_schedule_monitor.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def __init__(
2929

3030
@staticmethod
3131
def init_task_status() -> dict:
32-
return {"running": 0, "remaining": 0}
32+
return {"running": 0, "remaining": 0, "pending": 0}
3333

3434
def get_tasks_status(self) -> dict:
3535
if isinstance(self.queue, SchedulerRedisQueue):
@@ -158,6 +158,7 @@ def _get_local_tasks_status(self) -> dict:
158158
# running from dispatcher if available
159159
if self.dispatcher and hasattr(self.dispatcher, "get_running_task_count"):
160160
task_status["running"] = int(self.dispatcher.get_running_task_count())
161+
task_status["pending"] = task_status["running"]
161162
except Exception as e:
162163
logger.warning(f"Failed to collect local queue status: {e}")
163164
return task_status
@@ -200,11 +201,13 @@ async def _collect_async() -> dict:
200201
if group.get("name") == self.queue.consumer_group:
201202
pending = int(group.get("pending", 0))
202203
break
203-
# Remaining = total messages (xlen) - pending for our group
204-
remaining = max(0, int(xlen_val or 0))
204+
total_messages = max(0, int(xlen_val or 0))
205+
remaining = max(0, total_messages - pending)
205206
local[stream_key]["running"] += pending
207+
local[stream_key]["pending"] += pending
206208
local[stream_key]["remaining"] += remaining
207209
local["running"] += pending
210+
local["pending"] += pending
208211
local["remaining"] += remaining
209212
return local
210213

@@ -234,10 +237,12 @@ async def _collect_async() -> dict:
234237
for group in groups_info:
235238
if group.get("name") == self.queue.consumer_group:
236239
pending = int(group.get("pending", 0))
237-
remaining = max(0, xlen_val)
240+
remaining = max(0, xlen_val - pending)
238241
task_status[stream_key]["running"] += pending
242+
task_status[stream_key]["pending"] += pending
239243
task_status[stream_key]["remaining"] += remaining
240244
task_status["running"] += pending
245+
task_status["pending"] += pending
241246
task_status["remaining"] += remaining
242247
break
243248

0 commit comments

Comments
 (0)