Skip to content

Commit c8d4925

Browse files
fix(ci): Reformat code to comply with ruff standards
1 parent e4390a3 commit c8d4925

File tree

8 files changed

+68
-42
lines changed

8 files changed

+68
-42
lines changed

examples/mem_scheduler/api_w_scheduler.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
2727
print(f"My test handler received {len(messages)} messages:")
2828
for msg in messages:
2929
print(f" my_test_handler - {msg.item_id}: {msg.content}")
30-
user_status_running = handle_scheduler_status(user_id=msg.user_id, status_tracker=status_tracker)
30+
user_status_running = handle_scheduler_status(
31+
user_id=msg.user_id, status_tracker=status_tracker
32+
)
3133
print("[Monitor] Status after submit:", user_status_running)
3234

3335

@@ -37,9 +39,7 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
3739
mem_scheduler.register_handlers({TEST_HANDLER_LABEL: my_test_handler})
3840

3941
# 2.1 Monitor global scheduler status before submitting tasks
40-
global_status_before = handle_scheduler_status(
41-
user_id=TEST_USER_ID, status_tracker=status_tracker
42-
)
42+
global_status_before = handle_scheduler_status(user_id=TEST_USER_ID, status_tracker=status_tracker)
4343
print("[Monitor] Global status before submit:", global_status_before)
4444

4545
# 3. Create messages
@@ -78,9 +78,7 @@ def my_test_handler(messages: list[ScheduleMessageItem]):
7878
print(f"[Monitor] Wait result for {USER_MEM_CUBE}:", wait_result)
7979

8080
# 6.2 Monitor global scheduler status after processing
81-
global_status_after = handle_scheduler_status(
82-
user_id=TEST_USER_ID, status_tracker=status_tracker
83-
)
81+
global_status_after = handle_scheduler_status(user_id=TEST_USER_ID, status_tracker=status_tracker)
8482
print("[Monitor] Global status after processing:", global_status_after)
8583

8684
# 7. Stop the scheduler

src/memos/api/handlers/component_init.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,17 @@ def init_server() -> dict[str, Any]:
131131
# Initialize Redis client first as it is a core dependency for features like scheduler status tracking
132132
try:
133133
from memos.mem_scheduler.orm_modules.api_redis_model import APIRedisDBManager
134+
134135
redis_client = APIRedisDBManager.load_redis_engine_from_env()
135136
if redis_client:
136137
logger.info("Redis client initialized successfully.")
137138
else:
138-
logger.error("Failed to initialize Redis client. Check REDIS_HOST etc. in environment variables.")
139+
logger.error(
140+
"Failed to initialize Redis client. Check REDIS_HOST etc. in environment variables."
141+
)
139142
except Exception as e:
140143
logger.error(f"Failed to initialize Redis client: {e}", exc_info=True)
141-
redis_client = None # Ensure redis_client exists even on failure
144+
redis_client = None # Ensure redis_client exists even on failure
142145

143146
# Get default cube configuration
144147
default_cube_config = APIConfig.get_default_cube_config()

src/memos/api/routers/server_router.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,12 @@ def add_memories(add_req: APIADDRequest):
112112
# =============================================================================
113113

114114

115-
@router.get( # Changed from post to get
115+
@router.get( # Changed from post to get
116116
"/scheduler/status", summary="Get scheduler running status", response_model=StatusResponse
117117
)
118118
def scheduler_status(
119119
user_id: str = Query(..., description="User ID"),
120-
task_id: str | None = Query(None, description="Optional Task ID to query a specific task")
120+
task_id: str | None = Query(None, description="Optional Task ID to query a specific task"),
121121
):
122122
"""Get scheduler running status."""
123123
return handlers.scheduler_handler.handle_scheduler_status(

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -731,14 +731,14 @@ def _monitor_loop(self):
731731
if len(parts) >= 3:
732732
user_id = parts[2]
733733
self.metrics.update_queue_length(queue_length, user_id)
734-
elif not self.use_redis_queue: # local queue
735-
user_id = stream_key
736-
self.metrics.update_queue_length(queue_length, user_id)
734+
elif not self.use_redis_queue: # local queue
735+
user_id = stream_key
736+
self.metrics.update_queue_length(queue_length, user_id)
737737

738738
except Exception as e:
739739
logger.error(f"Error in metrics monitor loop: {e}", exc_info=True)
740740

741-
time.sleep(15) # 每 15 秒采样一次
741+
time.sleep(15) # 每 15 秒采样一次
742742

743743
def start(self) -> None:
744744
"""
@@ -760,7 +760,9 @@ def start(self) -> None:
760760
def start_background_monitor(self):
761761
if self._monitor_thread and self._monitor_thread.is_alive():
762762
return
763-
self._monitor_thread = ContextThread(target=self._monitor_loop, daemon=True, name="SchedulerMetricsMonitor")
763+
self._monitor_thread = ContextThread(
764+
target=self._monitor_loop, daemon=True, name="SchedulerMetricsMonitor"
765+
)
764766
self._monitor_thread.start()
765767
logger.info("Scheduler metrics monitor thread started.")
766768

src/memos/mem_scheduler/schemas/message_schemas.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,9 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin):
141141
)
142142
memcube_name: str | None = Field(default=None, description="Display name for memcube")
143143
memory_len: int | None = Field(default=None, description="Count of items involved in the event")
144-
status: str | None = Field(default=None, description="Completion status of the task (e.g., 'completed', 'failed')")
144+
status: str | None = Field(
145+
default=None, description="Completion status of the task (e.g., 'completed', 'failed')"
146+
)
145147

146148
def debug_info(self) -> dict[str, Any]:
147149
"""Return structured debug information for logging purposes."""

src/memos/mem_scheduler/task_schedule_modules/dispatcher.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def __init__(
4646
config=None,
4747
status_tracker: TaskStatusTracker | None = None,
4848
metrics: Any | None = None,
49-
submit_web_logs: Callable | None = None, # ADDED
49+
submit_web_logs: Callable | None = None, # ADDED
5050
):
5151
super().__init__()
5252
self.config = config
@@ -98,7 +98,7 @@ def __init__(
9898

9999
self.metrics = metrics
100100
self.status_tracker = status_tracker
101-
self.submit_web_logs = submit_web_logs # ADDED
101+
self.submit_web_logs = submit_web_logs # ADDED
102102

103103
def on_messages_enqueued(self, msgs: list[ScheduleMessageItem]) -> None:
104104
if not msgs:
@@ -120,11 +120,13 @@ def _create_task_wrapper(self, handler: Callable, task_item: RunningTaskItem):
120120
def wrapped_handler(messages: list[ScheduleMessageItem]):
121121
start_time = time.time()
122122
if self.status_tracker:
123-
self.status_tracker.task_started(task_id=task_item.item_id, user_id=task_item.user_id)
123+
self.status_tracker.task_started(
124+
task_id=task_item.item_id, user_id=task_item.user_id
125+
)
124126
try:
125127
# --- mark start: record queuing time(now - enqueue_ts)---
126128
now = time.time()
127-
m = messages[0] # All messages in this batch have same user and type
129+
m = messages[0] # All messages in this batch have same user and type
128130
enq_ts = getattr(m, "timestamp", None)
129131

130132
# Path 1: epoch seconds (preferred)
@@ -145,26 +147,29 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
145147
wait_sec = max(0.0, now - enq_epoch)
146148
self.metrics.observe_task_wait_duration(wait_sec, m.user_id, m.label)
147149

148-
149150
# Execute the original handler
150151
result = handler(messages)
151152

152153
# --- mark done ---
153154
duration = time.time() - start_time
154155
self.metrics.observe_task_duration(duration, m.user_id, m.label)
155156
if self.status_tracker:
156-
self.status_tracker.task_completed(task_id=task_item.item_id, user_id=task_item.user_id)
157+
self.status_tracker.task_completed(
158+
task_id=task_item.item_id, user_id=task_item.user_id
159+
)
157160
self.metrics.task_completed(user_id=m.user_id, task_type=m.label)
158161

159-
is_cloud_env = os.getenv('MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME') == 'memos-memory-change'
162+
is_cloud_env = (
163+
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
164+
)
160165
if self.submit_web_logs and is_cloud_env:
161166
status_log = ScheduleLogForWebItem(
162167
user_id=task_item.user_id,
163168
mem_cube_id=task_item.mem_cube_id,
164169
item_id=task_item.item_id,
165170
label=m.label,
166171
log_content=f"Task {task_item.item_id} completed successfully for user {task_item.user_id}.",
167-
status="completed"
172+
status="completed",
168173
)
169174
self.submit_web_logs([status_log])
170175

@@ -194,7 +199,9 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
194199
m = messages[0]
195200
self.metrics.task_failed(m.user_id, m.label, type(e).__name__)
196201
if self.status_tracker:
197-
self.status_tracker.task_failed(task_id=task_item.item_id, user_id=task_item.user_id, error_message=str(e))
202+
self.status_tracker.task_failed(
203+
task_id=task_item.item_id, user_id=task_item.user_id, error_message=str(e)
204+
)
198205
# Mark task as failed and remove from tracking
199206
with self._task_lock:
200207
if task_item.item_id in self._running_tasks:
@@ -204,7 +211,9 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
204211
self._completed_tasks.pop(0)
205212
logger.error(f"Task failed: {task_item.get_execution_info()}, Error: {e}")
206213

207-
is_cloud_env = os.getenv('MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME') == 'memos-memory-change'
214+
is_cloud_env = (
215+
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
216+
)
208217
if self.submit_web_logs and is_cloud_env:
209218
status_log = ScheduleLogForWebItem(
210219
user_id=task_item.user_id,
@@ -213,7 +222,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
213222
label=m.label,
214223
log_content=f"Task {task_item.item_id} failed for user {task_item.user_id} with error: {e!s}.",
215224
status="failed",
216-
exception=str(e)
225+
exception=str(e),
217226
)
218227
self.submit_web_logs([status_log])
219228
raise

src/memos/mem_scheduler/utils/metrics.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,31 +11,31 @@
1111
TASKS_ENQUEUED_TOTAL = Counter(
1212
"memos_scheduler_tasks_enqueued_total",
1313
"Total number of tasks enqueued",
14-
["user_id", "task_type"]
14+
["user_id", "task_type"],
1515
)
1616

1717
TASKS_DEQUEUED_TOTAL = Counter(
1818
"memos_scheduler_tasks_dequeued_total",
1919
"Total number of tasks dequeued",
20-
["user_id", "task_type"]
20+
["user_id", "task_type"],
2121
)
2222

2323
TASK_DURATION_SECONDS = Summary(
2424
"memos_scheduler_task_duration_seconds",
2525
"Task processing duration in seconds",
26-
["user_id", "task_type"]
26+
["user_id", "task_type"],
2727
)
2828

2929
TASK_WAIT_DURATION_SECONDS = Summary(
3030
"memos_scheduler_task_wait_duration_seconds",
3131
"Task waiting duration in seconds",
32-
["user_id", "task_type"]
32+
["user_id", "task_type"],
3333
)
3434

3535
TASKS_FAILED_TOTAL = Counter(
3636
"memos_scheduler_tasks_failed_total",
3737
"Total number of failed tasks",
38-
["user_id", "task_type", "error_type"]
38+
["user_id", "task_type", "error_type"],
3939
)
4040

4141
TASKS_COMPLETED_TOTAL = Counter(
@@ -45,47 +45,56 @@
4545
)
4646

4747
QUEUE_LENGTH = Gauge(
48-
"memos_scheduler_queue_length",
49-
"Current length of the task queue",
50-
["user_id"]
48+
"memos_scheduler_queue_length", "Current length of the task queue", ["user_id"]
5149
)
5250

5351
INTERNAL_SPAN_DURATION = Histogram(
54-
'memos_scheduler_internal_span_duration_seconds',
55-
'Duration of internal operations',
56-
['span_name', 'user_id', 'task_id']
52+
"memos_scheduler_internal_span_duration_seconds",
53+
"Duration of internal operations",
54+
["span_name", "user_id", "task_id"],
5755
)
5856

5957

6058
# --- Instrumentation Functions ---
6159

60+
6261
def task_enqueued(user_id: str, task_type: str, count: int = 1):
6362
TASKS_ENQUEUED_TOTAL.labels(user_id=user_id, task_type=task_type).inc(count)
6463

64+
6565
def task_dequeued(user_id: str, task_type: str, count: int = 1):
6666
TASKS_DEQUEUED_TOTAL.labels(user_id=user_id, task_type=task_type).inc(count)
6767

68+
6869
def observe_task_duration(duration: float, user_id: str, task_type: str):
6970
TASK_DURATION_SECONDS.labels(user_id=user_id, task_type=task_type).observe(duration)
7071

72+
7173
def observe_task_wait_duration(duration: float, user_id: str, task_type: str):
7274
TASK_WAIT_DURATION_SECONDS.labels(user_id=user_id, task_type=task_type).observe(duration)
7375

76+
7477
def task_failed(user_id: str, task_type: str, error_type: str):
7578
TASKS_FAILED_TOTAL.labels(user_id=user_id, task_type=task_type, error_type=error_type).inc()
7679

80+
7781
def task_completed(user_id: str, task_type: str, count: int = 1):
7882
TASKS_COMPLETED_TOTAL.labels(user_id=user_id, task_type=task_type).inc(count)
7983

84+
8085
def update_queue_length(length: int, user_id: str):
8186
QUEUE_LENGTH.labels(user_id=user_id).set(length)
8287

88+
8389
def observe_internal_span(duration: float, span_name: str, user_id: str, task_id: str):
84-
INTERNAL_SPAN_DURATION.labels(span_name=span_name, user_id=user_id, task_id=task_id).observe(duration)
90+
INTERNAL_SPAN_DURATION.labels(span_name=span_name, user_id=user_id, task_id=task_id).observe(
91+
duration
92+
)
8593

8694

8795
# --- TimingSpan Context Manager ---
8896

97+
8998
class TimingSpan(ContextDecorator):
9099
"""
91100
A context manager/decorator to measure the duration of a code block and record it
@@ -100,6 +109,7 @@ def my_function():
100109
with TimingSpan("another_op", user_id="user456", task_id="t1"):
101110
...
102111
"""
112+
103113
def __init__(self, span_name: str, user_id: str = "unknown", task_id: str = "unknown"):
104114
self.span_name = span_name
105115
self.user_id = user_id
@@ -112,4 +122,4 @@ def __enter__(self):
112122

113123
def __exit__(self, exc_type, exc_val, exc_tb):
114124
duration = time.perf_counter() - self.start_time
115-
observe_internal_span(duration, self.span_name, self.user_id, self.task_id)
125+
observe_internal_span(duration, self.span_name, self.user_id, self.task_id)

tests/mem_scheduler/test_dispatcher.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,9 @@ def test_register_handlers(self):
156156
def test_dispatch_serial(self):
157157
"""Test dispatching messages in serial mode."""
158158
# Create a new dispatcher with parallel dispatch disabled
159-
serial_dispatcher = SchedulerDispatcher(max_workers=2, enable_parallel_dispatch=False, metrics=MagicMock())
159+
serial_dispatcher = SchedulerDispatcher(
160+
max_workers=2, enable_parallel_dispatch=False, metrics=MagicMock()
161+
)
160162

161163
# Create fresh mock handlers for this test
162164
mock_handler1 = MagicMock()

0 commit comments

Comments
 (0)