Skip to content

Commit 7c4db5c

Browse files
committed
refactor: revise _submit_web_logs to address log missing issue
1 parent 32af4eb commit 7c4db5c

File tree

2 files changed

+23
-23
lines changed

2 files changed

+23
-23
lines changed

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -838,27 +838,26 @@ def _submit_web_logs(
838838
Args:
839839
messages: Single log message or list of log messages
840840
"""
841-
messages_list = [messages] if isinstance(messages, ScheduleLogForWebItem) else messages
842-
for message in messages_list:
841+
if isinstance(messages, ScheduleLogForWebItem):
842+
messages = [messages] # transform single message to list
843+
844+
for message in messages:
843845
logger.info(
844846
f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}"
845847
)
848+
846849
if self.rabbitmq_config is None:
847850
logger.info(
848851
"[DIAGNOSTIC] base_scheduler._submit_web_logs: RabbitMQ config not loaded; skipping publish."
849852
)
850853
return
851854

852-
if isinstance(messages, ScheduleLogForWebItem):
853-
messages = [messages] # transform single message to list
854-
855855
for message in messages:
856-
if not isinstance(message, ScheduleLogForWebItem):
857-
error_msg = f"Invalid message type: {type(message)}, expected ScheduleLogForWebItem"
858-
logger.error(error_msg)
859-
raise TypeError(error_msg)
856+
try:
857+
self._web_log_message_queue.put(message)
858+
except Exception as e:
859+
logger.warning(f"Failed to put message to web log queue: {e}", stack_info=True)
860860

861-
self._web_log_message_queue.put(message)
862861
message_info = message.debug_info()
863862
logger.debug(f"Submitted Scheduling log for web: {message_info}")
864863

src/memos/mem_scheduler/general_modules/misc.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -217,19 +217,20 @@ def put(self, item: T, block: bool = False, timeout: float | None = None) -> Non
217217
block: Ignored (kept for compatibility with Queue interface)
218218
timeout: Ignored (kept for compatibility with Queue interface)
219219
"""
220-
try:
221-
# First try non-blocking put
222-
super().put(item, block=block, timeout=timeout)
223-
except Full:
224-
# Remove the oldest item and mark it done to avoid leaking unfinished_tasks
225-
with suppress(Empty):
226-
_ = self.get_nowait()
227-
# If the removed item had previously incremented unfinished_tasks,
228-
# we must decrement here since it will never be processed.
229-
with suppress(ValueError):
230-
self.task_done()
231-
# Retry putting the new item
232-
super().put(item, block=block, timeout=timeout)
220+
while True:
221+
try:
222+
# First try non-blocking put
223+
super().put(item, block=block, timeout=timeout)
224+
return
225+
except Full:
226+
# Remove the oldest item and mark it done to avoid leaking unfinished_tasks
227+
with suppress(Empty):
228+
_ = self.get_nowait()
229+
# If the removed item had previously incremented unfinished_tasks,
230+
# we must decrement here since it will never be processed.
231+
with suppress(ValueError):
232+
self.task_done()
233+
# Continue loop to retry putting the item
233234

234235
def get(
235236
self, block: bool = True, timeout: float | None = None, batch_size: int | None = None

0 commit comments

Comments
 (0)