Skip to content

Commit 53a131c

Browse files
Handle RabbitMQ publish when offline and avoid duplicate init
1 parent 35bc424 commit 53a131c

File tree

2 files changed

+80
-7
lines changed

2 files changed

+80
-7
lines changed

src/memos/mem_scheduler/base_scheduler.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,9 @@ def _submit_web_logs(
844844
f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}"
845845
)
846846
if self.rabbitmq_config is None:
847+
logger.info(
848+
"[DIAGNOSTIC] base_scheduler._submit_web_logs: RabbitMQ config not loaded; skipping publish."
849+
)
847850
return
848851

849852
if isinstance(messages, ScheduleLogForWebItem):
@@ -859,9 +862,9 @@ def _submit_web_logs(
859862
message_info = message.debug_info()
860863
logger.debug(f"Submitted Scheduling log for web: {message_info}")
861864

862-
if self.is_rabbitmq_connected():
863-
logger.info(f"Submitted Scheduling log to rabbitmq: {message_info}")
864-
self.rabbitmq_publish_message(message=message.to_dict())
865+
# Always call publish; the publisher now caches when offline and flushes after reconnect
866+
logger.info(f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}")
867+
self.rabbitmq_publish_message(message=message.to_dict())
865868
logger.debug(
866869
f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}"
867870
)

src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import threading
55
import time
66

7+
from queue import Empty
78
from pathlib import Path
89

910
from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig
@@ -44,6 +45,11 @@ def __init__(self):
4445
self.rabbitmq_message_cache = AutoDroppingQueue(
4546
maxsize=self.rabbitmq_message_cache_max_size
4647
)
48+
# Pending outgoing messages to avoid loss when connection is not ready
49+
self.rabbitmq_publish_cache_max_size = 50
50+
self.rabbitmq_publish_cache = AutoDroppingQueue(
51+
maxsize=self.rabbitmq_publish_cache_max_size
52+
)
4753
self.rabbitmq_connection_attempts = 3 # Max retry attempts on connection failure
4854
self.rabbitmq_retry_delay = 5 # Delay (seconds) between retries
4955
self.rabbitmq_heartbeat = 60 # Heartbeat interval (seconds) for connection
@@ -54,6 +60,7 @@ def __init__(self):
5460
self._rabbitmq_io_loop_thread = None # For IOLoop execution
5561
self._rabbitmq_stop_flag = False # Graceful shutdown flag
5662
self._rabbitmq_lock = threading.Lock() # Ensure thread safety
63+
self._rabbitmq_initializing = False # Avoid duplicate concurrent initializations
5764

5865
def is_rabbitmq_connected(self) -> bool:
5966
"""Check if RabbitMQ connection is alive"""
@@ -70,11 +77,20 @@ def initialize_rabbitmq(
7077
"""
7178
Establish connection to RabbitMQ using pika.
7279
"""
80+
with self._rabbitmq_lock:
81+
if self._rabbitmq_initializing:
82+
logger.info("[DIAGNOSTIC] initialize_rabbitmq: initialization already in progress; skipping duplicate call.")
83+
return
84+
self._rabbitmq_initializing = True
7385
try:
7486
# Skip remote initialization in CI/pytest unless explicitly enabled
7587
enable_env = os.getenv("MEMOS_ENABLE_RABBITMQ", "").lower() == "true"
7688
in_ci = os.getenv("CI", "").lower() == "true"
7789
in_pytest = os.getenv("PYTEST_CURRENT_TEST") is not None
90+
logger.info(
91+
f"[DIAGNOSTIC] initialize_rabbitmq called. in_ci={in_ci}, in_pytest={in_pytest}, "
92+
f"MEMOS_ENABLE_RABBITMQ={enable_env}, config_path={config_path}"
93+
)
7894
if (in_ci or in_pytest) and not enable_env:
7995
logger.info(
8096
"Skipping RabbitMQ initialization in CI/test environment. Set MEMOS_ENABLE_RABBITMQ=true to enable."
@@ -131,6 +147,9 @@ def initialize_rabbitmq(
131147
logger.info("RabbitMQ connection process started")
132148
except Exception:
133149
logger.error("Fail to initialize auth_config", exc_info=True)
150+
finally:
151+
with self._rabbitmq_lock:
152+
self._rabbitmq_initializing = False
134153

135154
def get_rabbitmq_queue_size(self) -> int:
136155
"""Get the current number of messages in the queue.
@@ -197,7 +216,7 @@ def get_rabbitmq_connection_param(self):
197216
# Connection lifecycle callbacks
198217
def on_rabbitmq_connection_open(self, connection):
199218
"""Called when connection is established."""
200-
logger.debug("Connection opened")
219+
logger.info("[DIAGNOSTIC] RabbitMQ connection opened")
201220
connection.channel(on_open_callback=self.on_rabbitmq_channel_open)
202221

203222
def on_rabbitmq_connection_error(self, connection, error):
@@ -215,7 +234,7 @@ def on_rabbitmq_connection_closed(self, connection, reason):
215234
def on_rabbitmq_channel_open(self, channel):
216235
"""Called when channel is ready."""
217236
self.rabbitmq_channel = channel
218-
logger.debug("Channel opened")
237+
logger.info("[DIAGNOSTIC] RabbitMQ channel opened")
219238

220239
# Setup exchange and queue
221240
channel.exchange_declare(
@@ -243,6 +262,8 @@ def on_rabbitmq_queue_declared(self, frame):
243262
def on_rabbitmq_bind_ok(self, frame):
    """Finish RabbitMQ setup once the queue bind has been confirmed.

    Logs that setup is complete and then replays any outgoing messages that
    were cached while no connection was available.

    Args:
        frame: Bind confirmation handed to this callback; not used here.
    """
    logger.info("RabbitMQ setup completed")
    # Setup is done at this point, so drain the offline publish backlog.
    self._flush_cached_publish_messages()
246267

247268
def on_rabbitmq_message(self, channel, method, properties, body):
248269
"""Handle incoming messages. Only for test."""
@@ -311,11 +332,24 @@ def rabbitmq_publish_message(self, message: dict):
311332
logger.info(f" - Message Content: {json.dumps(message, indent=2)}")
312333

313334
with self._rabbitmq_lock:
335+
logger.info(
336+
f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message invoked. "
337+
f"is_connected={self.is_rabbitmq_connected()}, exchange={exchange_name}, "
338+
f"routing_key='{routing_key}', label={label}"
339+
)
314340
if not self.is_rabbitmq_connected():
315-
logger.error("Cannot publish - no active connection")
341+
logger.error(
342+
"[DIAGNOSTIC] Cannot publish - no active connection. Caching message for retry. "
343+
f"connection_exists={bool(self.rabbitmq_connection)}, "
344+
f"channel_exists={bool(self.rabbitmq_channel)}, "
345+
f"config_loaded={self.rabbitmq_config is not None}"
346+
)
347+
self.rabbitmq_publish_cache.put(message)
348+
# Best-effort to connect
349+
self.initialize_rabbitmq(config=self.rabbitmq_config)
316350
return False
317351

318-
logger.info(
352+
logger.warning(
319353
f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message: Attempting to publish message. Exchange: {exchange_name}, Routing Key: {routing_key}, Message Content: {json.dumps(message, indent=2, ensure_ascii=False)}"
320354
)
321355
try:
@@ -332,6 +366,8 @@ def rabbitmq_publish_message(self, message: dict):
332366
return True
333367
except Exception as e:
334368
logger.error(f"Failed to publish message: {e}")
369+
# Cache message for retry on next connection
370+
self.rabbitmq_publish_cache.put(message)
335371
self.rabbit_reconnect()
336372
return False
337373

@@ -379,3 +415,37 @@ def rabbitmq_close(self):
379415
logger.warning("IOLoop thread did not terminate cleanly")
380416

381417
logger.info("RabbitMQ connection closed")
418+
419+
def _flush_cached_publish_messages(self):
    """Publish the messages held in the offline publish cache.

    Invoked from ``on_rabbitmq_bind_ok`` once setup has completed. The cache
    is drained into a local list and each message is handed to
    ``rabbitmq_publish_message`` in the order it was cached.

    If a publish fails, flushing stops so a broken connection is not
    hammered in a tight loop. The failed message is re-cached by
    ``rabbitmq_publish_message`` itself (both of its visible failure paths
    do so); the messages that were drained but not yet attempted are put
    back here, after it, so nothing that was pending is silently dropped.

    NOTE: ``rabbitmq_publish_message`` acquires ``self._rabbitmq_lock``,
    which is a plain ``threading.Lock``; do not call this method while
    holding that lock.

    Returns:
        None.
    """
    if self.rabbitmq_publish_cache.empty():
        return

    if not self.is_rabbitmq_connected():
        logger.info(
            "[DIAGNOSTIC] _flush_cached_publish_messages: connection still down; "
            f"pending={self.rabbitmq_publish_cache.qsize()}"
        )
        return

    # Drain first, publish afterwards: a failed publish re-caches its
    # message, so iterating the live queue could loop on the same entry.
    drained: list[dict] = []
    while True:
        try:
            drained.append(self.rabbitmq_publish_cache.get_nowait())
        except Empty:
            break

    if not drained:
        return

    logger.info(
        f"[DIAGNOSTIC] Flushing {len(drained)} cached RabbitMQ messages after reconnect."
    )
    for position, cached_msg in enumerate(drained):
        if self.rabbitmq_publish_message(cached_msg):
            continue

        # The failed message is already back in the cache; return the
        # not-yet-attempted tail as well, preserving its order.
        for pending_msg in drained[position + 1 :]:
            self.rabbitmq_publish_cache.put(pending_msg)

        logger.error(
            "[DIAGNOSTIC] Failed to flush cached message; re-queued for next attempt."
        )
        break

0 commit comments

Comments
 (0)